Middleware 管道系统分析
概述
Microsoft Agent Framework 的中间件系统采用三级分层架构,在 Agent、Function、Chat 三个层面提供拦截和处理能力。这种设计借鉴了 Web 框架(如 Express、Koa)的中间件模式,但针对 AI 应用的特殊需求进行了深度定制。
核心设计哲学:
- 分层拦截:在数据流转的关键节点设置拦截点
- 统一上下文:通过 Context 对象传递状态和元数据
- 链式执行:责任链模式构建可插拔的处理流程
- 流式支持:原生支持 Streaming 场景的特殊处理
┌─────────────────────────────────────────────────────────────────┐
│ Middleware Pipeline Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │
│ │Middleware│───▶│Middleware│───▶│Middleware│───▶│Final Handler│ │
│ │ #1 │ │ #2 │ │ #3 │ │ (Agent) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └──────┬──────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Shared Context Object │ │
│ │ • messages • metadata • result • terminate │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Context 对象设计
为什么需要三种 Context?
三种 Context 分别对应系统架构中的三个不同抽象层次:
┌─────────────────────────────────────────────────────────────────┐
│ Three-Layer Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Agent Layer (AgentRunContext) │ │
│ │ ├── Agent 配置、指令、工具集成 │ │
│ │ ├── 线程管理 (AgentThread) │ │
│ │ └── 高层业务逻辑编排 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Function Layer (FunctionInvocationContext) │ │
│ │ ├── 工具函数调用 │ │
│ │ ├── 参数验证 (Pydantic Model) │ │
│ │ └── 函数级缓存/日志/权限控制 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Chat Layer (ChatContext) │ │
│ │ ├── 原始 LLM 调用 │ │
│ │ ├── 消息格式转换 │ │
│ │ └── 模型参数 (temperature, max_tokens...) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
1. AgentRunContext(第 61-137 行)
class AgentRunContext(SerializationMixin):
INJECTABLE: ClassVar[set[str]] = {"agent", "thread", "result"}
def __init__(
self,
agent: "AgentProtocol",
messages: list[ChatMessage],
thread: "AgentThread | None" = None,
is_streaming: bool = False,
metadata: dict[str, Any] | None = None,
result: AgentResponse | AsyncIterable[AgentResponseUpdate] | None = None,
terminate: bool = False,
kwargs: dict[str, Any] | None = None,
) -> None:
设计特点:
- INJECTABLE 标记:支持依赖注入的字段(agent/thread/result 可被序列化)
- 双模式支持:result 可以是
AgentResponse(非流式)或AsyncIterable[AgentResponseUpdate](流式) - 线程追踪:包含
AgentThread用于状态持久化
2. FunctionInvocationContext(第 139-203 行)
class FunctionInvocationContext(SerializationMixin):
INJECTABLE: ClassVar[set[str]] = {"function", "arguments", "result"}
def __init__(
self,
function: "FunctionTool[Any, Any]",
arguments: "BaseModel", # Pydantic 验证后的参数
metadata: dict[str, Any] | None = None,
result: Any = None,
terminate: bool = False,
kwargs: dict[str, Any] | None = None,
) -> None:
设计特点:
- 强类型参数:
arguments是 Pydantic BaseModel,已验证类型安全 - 函数级粒度:针对单个工具函数的调用拦截
- 轻量级:不包含线程/流式等复杂状态
3. ChatContext(第 205-281 行)
class ChatContext(SerializationMixin):
INJECTABLE: ClassVar[set[str]] = {"chat_client", "result"}
def __init__(
self,
chat_client: "ChatClientProtocol",
messages: "MutableSequence[ChatMessage]", # 可修改的消息列表
options: Mapping[str, Any] | None, # LLM 参数
is_streaming: bool = False,
metadata: dict[str, Any] | None = None,
result: "ChatResponse | AsyncIterable[ChatResponseUpdate] | None" = None,
terminate: bool = False,
kwargs: dict[str, Any] | None = None,
) -> None:
设计特点:
- 可修改消息:
MutableSequence允许中间件修改消息列表(如添加系统提示) - Options 透传:LLM 特定参数(temperature, model_id 等)
- 客户端解耦:通过
ChatClientProtocol抽象不同 LLM 提供商
Context 对比表
| 特性 | AgentRunContext | FunctionInvocationContext | ChatContext |
|---|---|---|---|
| 核心对象 | agent | function | chat_client |
| 数据载体 | messages | arguments (BaseModel) | messages (Mutable) |
| 状态管理 | thread | - | options |
| 典型用途 | 业务编排 | 参数验证/缓存 | 模型调用/重试 |
Middleware 抽象层
1. 抽象基类设计(第 283-469 行)
class AgentMiddleware(ABC):
@abstractmethod
async def process(
self,
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]],
) -> None:
"""
关键设计:不返回值,所有数据通过 context 流转
- 设置 context.result 可覆盖执行结果
- 调用 next(context) 继续执行链
"""
统一签名模式:
async def process(self, context: TContext, next: NextFn) -> None
# 其中 NextFn = Callable[[TContext], Awaitable[None]]
这种设计的精妙之处:
- 无返回值:强制所有副作用通过 Context 传递
- 洋葱模型:
await next(context)将控制权交给下一个中间件 - 双向拦截:可在
next()前后分别处理(前置/后置逻辑)
2. 纯函数中间件支持(第 472-609 行)
# 类型别名定义简洁的函数签名
AgentMiddlewareCallable = Callable[
[AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]],
Awaitable[None]
]
class MiddlewareWrapper(Generic[TContext]):
"""将纯函数包装为类协议"""
def __init__(self, func: Callable[..., Awaitable[None]]) -> None:
self.func = func
async def process(self, context: T, next: NextFn) -> None:
await self.func(context, next)
为什么要支持纯函数?
# 类方式(有状态)
class LoggingMiddleware(AgentMiddleware):
def __init__(self, logger):
self.logger = logger
async def process(self, context, next):
self.logger.info(f"Agent: {context.agent.name}")
await next(context)
# 函数方式(无状态/简洁)
@agent_middleware
async def logging_middleware(context: AgentRunContext, next):
print(f"Before: {context.agent.name}")
await next(context)
print(f"After: {context.result}")
3. 装饰器系统(第 495-591 行)
def agent_middleware(func: AgentMiddlewareCallable) -> AgentMiddlewareCallable:
# 通过属性标记中间件类型,用于自动分类
func._middleware_type: MiddlewareType = MiddlewareType.AGENT
return func
def function_middleware(func: FunctionMiddlewareCallable) -> FunctionMiddlewareCallable:
func._middleware_type: MiddlewareType = MiddlewareType.FUNCTION
return func
def chat_middleware(func: ChatMiddlewareCallable) -> ChatMiddlewareCallable:
func._middleware_type: MiddlewareType = MiddlewareType.CHAT
return func
Pipeline 执行机制
1. 链式构建核心算法(第 659-745 行)
def _create_handler_chain(
self,
final_handler: Callable[[Any], Awaitable[Any]],
result_container: dict[str, Any],
result_key: str = "result",
) -> Callable[[Any], Awaitable[None]]:
"""
递归构建洋葱式调用链
核心思想:
- 从最后一个中间件向前构建
- 每个中间件包装下一个处理函数
- 最终包装实际业务逻辑
"""
def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
if index >= len(self._middleware):
# 链尾:实际执行逻辑
async def final_wrapper(c: Any) -> None:
result = await final_handler(c)
result_container[result_key] = result
c.result = result # 结果回写 Context
return final_wrapper
# 递归构建:当前中间件包装下一个 handler
middleware = self._middleware[index]
next_handler = create_next_handler(index + 1) # 递归!
async def current_handler(c: Any) -> None:
await middleware.process(c, next_handler)
return current_handler
return create_next_handler(0) # 从第一个中间件开始
可视化执行流程:
┌─────────────────────────────────────────────────────────────────────────┐
│ Handler Chain Construction │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 构建阶段(递归,从内到外): │
│ │
│ create_handler(0) │
│ └── Middleware#1.process(ctx, create_handler(1)) │
│ └── Middleware#2.process(ctx, create_handler(2)) │
│ └── Middleware#3.process(ctx, create_handler(3)) │
│ └── final_wrapper(ctx) ← 实际执行 │
│ │
│ 执行阶段(调用,从外到内): │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Middleware#1: before logic │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ Middleware#2: before logic │ │ │
│ │ │ ┌─────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Middleware#3: before logic │ │ │ │
│ │ │ │ ┌─────────────────────────────────────────┐ │ │ │ │
│ │ │ │ │ Final Handler: 执行业务逻辑 │ │ │ │ │
│ │ │ │ │ result = await actual_execution(ctx) │ │ │ │ │
│ │ │ │ └─────────────────────────────────────────┘ │ │ │ │
│ │ │ │ Middleware#3: after logic │ │ │ │
│ │ │ └─────────────────────────────────────────────────┘ │ │ │
│ │ │ Middleware#2: after logic │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ Middleware#1: after logic │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ◄── 时间流向 │
│ │
└─────────────────────────────────────────────────────────────────────────┘
2. Streaming 特殊处理(第 697-745 行)
def _create_streaming_handler_chain(
self,
final_handler: Callable[[Any], Any], # 返回 AsyncIterable
result_container: dict[str, Any],
result_key: str = "result_stream",
) -> Callable[[Any], Awaitable[None]]:
def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
if index >= len(self._middleware):
async def final_wrapper(c: Any) -> None:
if c.terminate:
return # 提前终止
# 处理非协程情况(如生成器函数)
try:
result = await final_handler(c)
except TypeError:
result = final_handler(c) # 同步生成器
result_container[result_key] = result
c.result = result
return final_wrapper
middleware = self._middleware[index]
next_handler = create_next_handler(index + 1)
async def current_handler(c: Any) -> None:
await middleware.process(c, next_handler)
if c.terminate: # 终止检查
return
return current_handler
流式 vs 非流式差异:
| 场景 | 返回值类型 | 终止检查位置 | 特殊处理 |
|---|---|---|---|
| 非流式 | AgentResponse | agent_final_handler | 直接返回 |
| 流式 | AsyncIterable[Update] | 每个中间件后 | try/except TypeError |
3. Pipeline 执行入口
# Agent 非流式执行(第 776-826 行)
async def execute(
self,
agent: "AgentProtocol",
messages: list[ChatMessage],
context: AgentRunContext,
final_handler: Callable[[AgentRunContext], Awaitable[AgentResponse]],
) -> AgentResponse | None:
# 1. 更新 Context
context.agent = agent
context.messages = messages
context.is_streaming = False
# 2. 短路:无中间件时直接执行
if not self._middleware:
return await final_handler(context)
# 3. 构建并执行链
result_container = {"result": None}
first_handler = self._create_handler_chain(agent_final_handler, result_container, "result")
await first_handler(context)
# 4. 结果处理(支持 override)
if context.result is not None:
return context.result # 中间件覆盖的结果
return result_container.get("result") or AgentResponse()
类装饰器实现
1. use_agent_middleware 装饰器(第 1153-1314 行)
这是系统最核心的装饰器,为 Agent 类注入中间件能力:
def use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]:
# 1. 保存原始方法
original_run = agent_class.run
original_run_stream = agent_class.run_stream
def _build_middleware_pipelines(
agent_level_middlewares: Sequence[Middleware] | None,
run_level_middlewares: Sequence[Middleware] | None = None,
) -> tuple[AgentMiddlewarePipeline, FunctionMiddlewarePipeline, list[ChatMiddleware]]:
"""
双层中间件架构:
- Agent 级:实例化时注册,全局生效
- Run 级:每次调用时传入,仅本次生效
"""
middleware = categorize_middleware(
*(agent_level_middlewares or ()),
*(run_level_middlewares or ())
)
return (
AgentMiddlewarePipeline(middleware["agent"]),
FunctionMiddlewarePipeline(middleware["function"]),
middleware["chat"],
)
async def middleware_enabled_run(self, messages, *, thread=None, middleware=None, **kwargs):
"""被装饰后的 run 方法"""
# 获取实例级中间件
agent_middleware = getattr(self, "middleware", None)
# 构建管道(合并实例级 + 调用级)
agent_pipeline, function_pipeline, chat_middlewares = _build_middleware_pipelines(
agent_middleware, middleware
)
# 传递 Function 中间件到下层(通过 kwargs 隐式传递)
if function_pipeline.has_middlewares:
kwargs["_function_middleware_pipeline"] = function_pipeline
# 传递 Chat 中间件到下层
if chat_middlewares:
kwargs["middleware"] = chat_middlewares
# 标准化消息格式
normalized_messages = normalize_messages(messages)
# 执行中间件管道或短路
if agent_pipeline.has_middlewares:
context = AgentRunContext(
agent=self,
messages=normalized_messages,
thread=thread,
is_streaming=False,
kwargs=kwargs,
)
async def _execute_handler(ctx):
return await original_run(self, ctx.messages, thread=thread, **ctx.kwargs)
return await agent_pipeline.execute(self, normalized_messages, context, _execute_handler)
# 无中间件:直接调用原始方法
return await original_run(self, normalized_messages, thread=thread, **kwargs)
关键机制:
┌─────────────────────────────────────────────────────────────────────┐
│ Middleware Propagation Chain │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Agent.run(messages, middleware=[...]) │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ use_agent_middleware 装饰器 │ │
│ │ 1. 分类中间件 │ │
│ │ - AgentMiddleware → AgentPipeline │ │
│ │ - FunctionMiddleware ─┐ │ │
│ │ - ChatMiddleware ─────┼──┐ │ │
│ └──────────────────────────┼───┼────────────┘ │
│ │ │ │
│ │ │ 通过 kwargs 传递 │
│ ▼ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ ChatClient.get_response() │ │
│ │ 接收 kwargs["middleware"] │ │
│ │ 接收 kwargs["_function_middleware_pipeline"] │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ use_chat_middleware 装饰器 │ │
│ │ 进一步分类并构建各自 Pipeline │ │
│ └──────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
2. 中间件分类器(第 1491-1534 行)
def categorize_middleware(
*middleware_sources: Middleware | None,
) -> MiddlewareDict:
"""
智能分类逻辑:
1. 类实例:通过 isinstance 判断
2. 函数:通过 _middleware_type 标记或类型注解推断
"""
result: MiddlewareDict = {"agent": [], "function": [], "chat": []}
for middleware in all_middleware:
if isinstance(middleware, AgentMiddleware):
result["agent"].append(middleware)
elif isinstance(middleware, FunctionMiddleware):
result["function"].append(middleware)
elif isinstance(middleware, ChatMiddleware):
result["chat"].append(middleware)
elif callable(middleware):
# 函数类型:标记 > 注解 > 异常
middleware_type = _determine_middleware_type(middleware)
# ... 按类型分发
3. 类型推断算法(第 1083-1149 行)
def _determine_middleware_type(middleware: Any) -> MiddlewareType:
"""
双重验证机制:
1. 装饰器标记(显式)
2. 参数类型注解(隐式)
"""
decorator_type = getattr(middleware, "_middleware_type", None)
# 通过签名推断
sig = inspect.signature(middleware)
params = list(sig.parameters.values())
first_param = params[0]
if first_param.annotation.__name__ == "AgentRunContext":
param_type = MiddlewareType.AGENT
# ... 其他类型判断
# 双重验证:装饰器和注解必须一致
if decorator_type and param_type:
if decorator_type != param_type:
raise MiddlewareException("Type mismatch...")
return decorator_type
return decorator_type or param_type # 优先级:显式 > 隐式
终止机制
context.terminate 的使用场景
class ValidationMiddleware(FunctionMiddleware):
async def process(self, context: FunctionInvocationContext, next):
# 参数验证失败,直接返回错误,不执行实际函数
if not self.validate(context.arguments):
context.result = {"error": "Validation failed"}
context.terminate = True # 终止后续执行
return # 不调用 next()
await next(context)
终止检查点:
┌─────────────────────────────────────────────────────────────────┐
│ Termination Checkpoints │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 非流式执行流程: │
│ ───────────── │
│ Middleware#1.process() │
│ │ │
│ ├── [可设置 terminate=True] │
│ ▼ │
│ Middleware#2.process() │
│ │ │
│ ▼ │
│ agent_final_handler() ← 检查 terminate │
│ │ if c.terminate: return c.result or empty │
│ ▼ │
│ 实际业务执行 │
│ │
│ 流式执行流程: │
│ ──────────── │
│ Middleware#1.process() │
│ │ │
│ ├── [设置 terminate=True] │
│ ├── await next(context) │
│ │ │
│ ▼ ← 每个中间件后检查 │
│ if c.terminate: return # 立即退出 │
│ │
│ 【关键差异】 │
│ • 非流式:仅在 final_handler 前检查一次 │
│ • 流式:每个中间件执行后都检查,支持快速短路 │
│ │
└─────────────────────────────────────────────────────────────────┘
深挖价值点
1. 设计亮点
A. 递归链式构建
# 优雅的递归构建洋葱模型
next_handler = create_next_handler(index + 1) # 递归构建内层
async def current_handler(c):
await middleware.process(c, next_handler) # 包装当前层
B. Context 作为数据总线
- 所有数据(输入、输出、元数据)通过 Context 流转
- 支持中间件覆盖结果(短路模式)
- 支持后置观察(日志/监控)
C. 双层中间件架构
- Agent 级:全局配置,实例复用
- Run 级:动态注入,单次生效
- 通过
categorize_middleware智能合并
D. 流式原生支持
_create_streaming_handler_chain专门处理异步迭代器try/except TypeError兼容同步生成器- 每个中间件后可终止,避免无效流处理
2. 是否值得深挖链式执行机制?
结论:非常值得,原因如下:
架构核心:链式执行是整个中间件系统的核心机制,理解它就能理解整个框架的数据流转
面试高频:这种洋葱模型/责任链模式是系统设计面试的经典题目
性能敏感点:
- 递归构建链的复杂度 O(n)
- 每个请求都重新构建(是否有优化空间?)
- 流式场景的终止检查开销
可扩展性:
- 如何支持条件中间件?(特定路由才启用)
- 如何支持并行中间件?(无依赖的可并发执行)
- 如何支持错误恢复?(当前无 try/catch 包装)
建议深挖方向:
┌─────────────────────────────────────────────────────────────────┐
│ Potential Deep Dive Areas │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. 性能优化: │
│ • 中间件链缓存(相同配置复用 Handler Chain) │
│ • 延迟构建(确定有中间件时才构建) │
│ │
│ 2. 错误处理: │
│ • 当前链中任一中间件异常会导致整个链失败 │
│ • 是否需要 try/catch 包装? │
│ │
│ 3. 条件执行: │
│ • 基于 Context 的条件判断(如:仅对特定 Agent 启用) │
│ │
│ 4. 并行中间件: │
│ • 无依赖的中间件并行执行(如:日志 + 统计) │
│ │
│ 5. 类型安全: │
│ • 当前大量使用 Any,是否可用泛型加强约束 │
│ │
└─────────────────────────────────────────────────────────────────┘
3. 源码精华片段
链式构建的递归之美(第 676-695 行):
def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
if index >= len(self._middleware):
# 基准情况:最终处理器
async def final_wrapper(c: Any) -> None:
result = await final_handler(c)
result_container[result_key] = result
c.result = result
return final_wrapper
# 递归步骤:当前中间件包装下一个
middleware = self._middleware[index]
next_handler = create_next_handler(index + 1) # 递归调用
async def current_handler(c: Any) -> None:
await middleware.process(c, next_handler)
return current_handler
return create_next_handler(0) # 从链头开始
双层中间件合并逻辑(第 1193-1209 行):
def _build_middleware_pipelines(
agent_level_middlewares: Sequence[Middleware] | None,
run_level_middlewares: Sequence[Middleware] | None = None,
) -> tuple[...]:
"""
设计巧思:使用 None 作为哨兵值,避免空列表的歧义
*(agent_level_middlewares or ()) # 解包语法确保参数展开正确
"""
middleware = categorize_middleware(
*(agent_level_middlewares or ()),
*(run_level_middlewares or ())
)
# ...