Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Middleware 管道系统分析

Middleware 管道系统分析

概述

Microsoft Agent Framework 的中间件系统采用三级分层架构,在 Agent、Function、Chat 三个层面提供拦截和处理能力。这种设计借鉴了 Web 框架(如 Express、Koa)的中间件模式,但针对 AI 应用的特殊需求进行了深度定制。

核心设计哲学:

  • 分层拦截:在数据流转的关键节点设置拦截点
  • 统一上下文:通过 Context 对象传递状态和元数据
  • 链式执行:责任链模式构建可插拔的处理流程
  • 流式支持:原生支持 Streaming 场景的特殊处理
┌─────────────────────────────────────────────────────────────────┐
│                     Middleware Pipeline Flow                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────────┐  │
│   │Middleware│───▶│Middleware│───▶│Middleware│───▶│Final Handler│  │
│   │   #1    │    │   #2    │    │   #3    │    │   (Agent)   │  │
│   └────┬────┘    └────┬────┘    └────┬────┘    └──────┬──────┘  │
│        │              │              │                 │        │
│        ▼              ▼              ▼                 ▼        │
│   ┌──────────────────────────────────────────────────────────┐  │
│   │              Shared Context Object                        │  │
│   │  • messages    • metadata    • result    • terminate     │  │
│   └──────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Context 对象设计

为什么需要三种 Context?

三种 Context 分别对应系统架构中的三个不同抽象层次:

┌─────────────────────────────────────────────────────────────────┐
│                    Three-Layer Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  Agent Layer (AgentRunContext)                             │  │
│  │  ├── Agent 配置、指令、工具集成                              │  │
│  │  ├── 线程管理 (AgentThread)                                 │  │
│  │  └── 高层业务逻辑编排                                        │  │
│  └───────────────────────────────────────────────────────────┘  │
│                           ▼                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  Function Layer (FunctionInvocationContext)                │  │
│  │  ├── 工具函数调用                                           │  │
│  │  ├── 参数验证 (Pydantic Model)                              │  │
│  │  └── 函数级缓存/日志/权限控制                                 │  │
│  └───────────────────────────────────────────────────────────┘  │
│                           ▼                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  Chat Layer (ChatContext)                                  │  │
│  │  ├── 原始 LLM 调用                                          │  │
│  │  ├── 消息格式转换                                           │  │
│  │  └── 模型参数 (temperature, max_tokens...)                  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

1. AgentRunContext(第 61-137 行)

class AgentRunContext(SerializationMixin):
    INJECTABLE: ClassVar[set[str]] = {"agent", "thread", "result"}
    
    def __init__(
        self,
        agent: "AgentProtocol",
        messages: list[ChatMessage],
        thread: "AgentThread | None" = None,
        is_streaming: bool = False,
        metadata: dict[str, Any] | None = None,
        result: AgentResponse | AsyncIterable[AgentResponseUpdate] | None = None,
        terminate: bool = False,
        kwargs: dict[str, Any] | None = None,
    ) -> None:

设计特点:

  • INJECTABLE 标记:支持依赖注入的字段(agent/thread/result 可被序列化)
  • 双模式支持:result 可以是 AgentResponse(非流式)或 AsyncIterable[AgentResponseUpdate](流式)
  • 线程追踪:包含 AgentThread 用于状态持久化

2. FunctionInvocationContext(第 139-203 行)

class FunctionInvocationContext(SerializationMixin):
    INJECTABLE: ClassVar[set[str]] = {"function", "arguments", "result"}
    
    def __init__(
        self,
        function: "FunctionTool[Any, Any]",
        arguments: "BaseModel",  # Pydantic 验证后的参数
        metadata: dict[str, Any] | None = None,
        result: Any = None,
        terminate: bool = False,
        kwargs: dict[str, Any] | None = None,
    ) -> None:

设计特点:

  • 强类型参数:arguments 是 Pydantic BaseModel,已验证类型安全
  • 函数级粒度:针对单个工具函数的调用拦截
  • 轻量级:不包含线程/流式等复杂状态

3. ChatContext(第 205-281 行)

class ChatContext(SerializationMixin):
    INJECTABLE: ClassVar[set[str]] = {"chat_client", "result"}
    
    def __init__(
        self,
        chat_client: "ChatClientProtocol",
        messages: "MutableSequence[ChatMessage]",  # 可修改的消息列表
        options: Mapping[str, Any] | None,  # LLM 参数
        is_streaming: bool = False,
        metadata: dict[str, Any] | None = None,
        result: "ChatResponse | AsyncIterable[ChatResponseUpdate] | None" = None,
        terminate: bool = False,
        kwargs: dict[str, Any] | None = None,
    ) -> None:

设计特点:

  • 可修改消息:MutableSequence 允许中间件修改消息列表(如添加系统提示)
  • Options 透传:LLM 特定参数(temperature, model_id 等)
  • 客户端解耦:通过 ChatClientProtocol 抽象不同 LLM 提供商

Context 对比表

特性AgentRunContextFunctionInvocationContextChatContext
核心对象agentfunctionchat_client
数据载体messagesarguments (BaseModel)messages (Mutable)
状态管理thread-options
典型用途业务编排参数验证/缓存模型调用/重试

Middleware 抽象层

1. 抽象基类设计(第 283-469 行)

class AgentMiddleware(ABC):
    @abstractmethod
    async def process(
        self,
        context: AgentRunContext,
        next: Callable[[AgentRunContext], Awaitable[None]],
    ) -> None:
        """
        关键设计:不返回值,所有数据通过 context 流转
        - 设置 context.result 可覆盖执行结果
        - 调用 next(context) 继续执行链
        """

统一签名模式:

async def process(self, context: TContext, next: NextFn) -> None
# 其中 NextFn = Callable[[TContext], Awaitable[None]]

这种设计的精妙之处:

  1. 无返回值:强制所有副作用通过 Context 传递
  2. 洋葱模型:await next(context) 将控制权交给下一个中间件
  3. 双向拦截:可在 next() 前后分别处理(前置/后置逻辑)

2. 纯函数中间件支持(第 472-609 行)

# 类型别名定义简洁的函数签名
AgentMiddlewareCallable = Callable[
    [AgentRunContext, Callable[[AgentRunContext], Awaitable[None]]], 
    Awaitable[None]
]

class MiddlewareWrapper(Generic[TContext]):
    """将纯函数包装为类协议"""
    def __init__(self, func: Callable[..., Awaitable[None]]) -> None:
        self.func = func
    
    async def process(self, context: T, next: NextFn) -> None:
        await self.func(context, next)

为什么要支持纯函数?

# 类方式(有状态)
class LoggingMiddleware(AgentMiddleware):
    def __init__(self, logger):
        self.logger = logger
    
    async def process(self, context, next):
        self.logger.info(f"Agent: {context.agent.name}")
        await next(context)

# 函数方式(无状态/简洁)
@agent_middleware
async def logging_middleware(context: AgentRunContext, next):
    print(f"Before: {context.agent.name}")
    await next(context)
    print(f"After: {context.result}")

3. 装饰器系统(第 495-591 行)

def agent_middleware(func: AgentMiddlewareCallable) -> AgentMiddlewareCallable:
    # 通过属性标记中间件类型,用于自动分类
    func._middleware_type: MiddlewareType = MiddlewareType.AGENT
    return func

def function_middleware(func: FunctionMiddlewareCallable) -> FunctionMiddlewareCallable:
    func._middleware_type: MiddlewareType = MiddlewareType.FUNCTION
    return func

def chat_middleware(func: ChatMiddlewareCallable) -> ChatMiddlewareCallable:
    func._middleware_type: MiddlewareType = MiddlewareType.CHAT
    return func

Pipeline 执行机制

1. 链式构建核心算法(第 659-745 行)

def _create_handler_chain(
    self,
    final_handler: Callable[[Any], Awaitable[Any]],
    result_container: dict[str, Any],
    result_key: str = "result",
) -> Callable[[Any], Awaitable[None]]:
    """
    递归构建洋葱式调用链
    
    核心思想:
    - 从最后一个中间件向前构建
    - 每个中间件包装下一个处理函数
    - 最终包装实际业务逻辑
    """
    def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
        if index >= len(self._middleware):
            # 链尾:实际执行逻辑
            async def final_wrapper(c: Any) -> None:
                result = await final_handler(c)
                result_container[result_key] = result
                c.result = result  # 结果回写 Context
            return final_wrapper
        
        # 递归构建:当前中间件包装下一个 handler
        middleware = self._middleware[index]
        next_handler = create_next_handler(index + 1)  # 递归!
        
        async def current_handler(c: Any) -> None:
            await middleware.process(c, next_handler)
        
        return current_handler
    
    return create_next_handler(0)  # 从第一个中间件开始

可视化执行流程:

┌─────────────────────────────────────────────────────────────────────────┐
│                    Handler Chain Construction                           │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   构建阶段(递归,从内到外):                                             │
│                                                                          │
│   create_handler(0)                                                      │
│   └── Middleware#1.process(ctx, create_handler(1))                       │
│       └── Middleware#2.process(ctx, create_handler(2))                   │
│           └── Middleware#3.process(ctx, create_handler(3))               │
│               └── final_wrapper(ctx)  ← 实际执行                        │
│                                                                          │
│   执行阶段(调用,从外到内):                                             │
│                                                                          │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │ Middleware#1: before logic                                      │   │
│   │   ┌─────────────────────────────────────────────────────────┐   │   │
│   │   │ Middleware#2: before logic                              │   │   │
│   │   │   ┌─────────────────────────────────────────────────┐   │   │   │
│   │   │   │ Middleware#3: before logic                      │   │   │   │
│   │   │   │   ┌─────────────────────────────────────────┐   │   │   │   │
│   │   │   │   │  Final Handler: 执行业务逻辑              │   │   │   │   │
│   │   │   │   │  result = await actual_execution(ctx)   │   │   │   │   │
│   │   │   │   └─────────────────────────────────────────┘   │   │   │   │
│   │   │   │ Middleware#3: after logic                       │   │   │   │
│   │   │   └─────────────────────────────────────────────────┘   │   │   │
│   │   │ Middleware#2: after logic                               │   │   │
│   │   └─────────────────────────────────────────────────────────┘   │   │
│   │ Middleware#1: after logic                                       │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│   ◄── 时间流向                                                           │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

2. Streaming 特殊处理(第 697-745 行)

def _create_streaming_handler_chain(
    self,
    final_handler: Callable[[Any], Any],  # 返回 AsyncIterable
    result_container: dict[str, Any],
    result_key: str = "result_stream",
) -> Callable[[Any], Awaitable[None]]:
    
    def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
        if index >= len(self._middleware):
            async def final_wrapper(c: Any) -> None:
                if c.terminate:
                    return  # 提前终止
                
                # 处理非协程情况(如生成器函数)
                try:
                    result = await final_handler(c)
                except TypeError:
                    result = final_handler(c)  # 同步生成器
                
                result_container[result_key] = result
                c.result = result
            return final_wrapper
        
        middleware = self._middleware[index]
        next_handler = create_next_handler(index + 1)
        
        async def current_handler(c: Any) -> None:
            await middleware.process(c, next_handler)
            if c.terminate:  # 终止检查
                return
        
        return current_handler

流式 vs 非流式差异:

场景返回值类型终止检查位置特殊处理
非流式AgentResponseagent_final_handler直接返回
流式AsyncIterable[Update]每个中间件后try/except TypeError

3. Pipeline 执行入口

# Agent 非流式执行(第 776-826 行)
async def execute(
    self,
    agent: "AgentProtocol",
    messages: list[ChatMessage],
    context: AgentRunContext,
    final_handler: Callable[[AgentRunContext], Awaitable[AgentResponse]],
) -> AgentResponse | None:
    # 1. 更新 Context
    context.agent = agent
    context.messages = messages
    context.is_streaming = False
    
    # 2. 短路:无中间件时直接执行
    if not self._middleware:
        return await final_handler(context)
    
    # 3. 构建并执行链
    result_container = {"result": None}
    first_handler = self._create_handler_chain(agent_final_handler, result_container, "result")
    await first_handler(context)
    
    # 4. 结果处理(支持 override)
    if context.result is not None:
        return context.result  # 中间件覆盖的结果
    return result_container.get("result") or AgentResponse()

类装饰器实现

1. use_agent_middleware 装饰器(第 1153-1314 行)

这是系统最核心的装饰器,为 Agent 类注入中间件能力:

def use_agent_middleware(agent_class: type[TAgent]) -> type[TAgent]:
    # 1. 保存原始方法
    original_run = agent_class.run
    original_run_stream = agent_class.run_stream
    
    def _build_middleware_pipelines(
        agent_level_middlewares: Sequence[Middleware] | None,
        run_level_middlewares: Sequence[Middleware] | None = None,
    ) -> tuple[AgentMiddlewarePipeline, FunctionMiddlewarePipeline, list[ChatMiddleware]]:
        """
        双层中间件架构:
        - Agent 级:实例化时注册,全局生效
        - Run 级:每次调用时传入,仅本次生效
        """
        middleware = categorize_middleware(
            *(agent_level_middlewares or ()), 
            *(run_level_middlewares or ())
        )
        return (
            AgentMiddlewarePipeline(middleware["agent"]),
            FunctionMiddlewarePipeline(middleware["function"]),
            middleware["chat"],
        )
    
    async def middleware_enabled_run(self, messages, *, thread=None, middleware=None, **kwargs):
        """被装饰后的 run 方法"""
        # 获取实例级中间件
        agent_middleware = getattr(self, "middleware", None)
        
        # 构建管道(合并实例级 + 调用级)
        agent_pipeline, function_pipeline, chat_middlewares = _build_middleware_pipelines(
            agent_middleware, middleware
        )
        
        # 传递 Function 中间件到下层(通过 kwargs 隐式传递)
        if function_pipeline.has_middlewares:
            kwargs["_function_middleware_pipeline"] = function_pipeline
        
        # 传递 Chat 中间件到下层
        if chat_middlewares:
            kwargs["middleware"] = chat_middlewares
        
        # 标准化消息格式
        normalized_messages = normalize_messages(messages)
        
        # 执行中间件管道或短路
        if agent_pipeline.has_middlewares:
            context = AgentRunContext(
                agent=self,
                messages=normalized_messages,
                thread=thread,
                is_streaming=False,
                kwargs=kwargs,
            )
            
            async def _execute_handler(ctx):
                return await original_run(self, ctx.messages, thread=thread, **ctx.kwargs)
            
            return await agent_pipeline.execute(self, normalized_messages, context, _execute_handler)
        
        # 无中间件:直接调用原始方法
        return await original_run(self, normalized_messages, thread=thread, **kwargs)

关键机制:

┌─────────────────────────────────────────────────────────────────────┐
│              Middleware Propagation Chain                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   Agent.run(messages, middleware=[...])                              │
│        │                                                             │
│        ▼                                                             │
│   ┌──────────────────────────────────────────┐                      │
│   │ use_agent_middleware 装饰器              │                      │
│   │ 1. 分类中间件                             │                      │
│   │    - AgentMiddleware → AgentPipeline     │                      │
│   │    - FunctionMiddleware ─┐               │                      │
│   │    - ChatMiddleware ─────┼──┐            │                      │
│   └──────────────────────────┼───┼────────────┘                      │
│                              │   │                                   │
│                              │   │ 通过 kwargs 传递                   │
│                              ▼   ▼                                   │
│   ┌──────────────────────────────────────────┐                      │
│   │ ChatClient.get_response()                │                      │
│   │ 接收 kwargs["middleware"]               │                      │
│   │ 接收 kwargs["_function_middleware_pipeline"]                     │
│   └──────────────────────────────────────────┘                      │
│                              │                                       │
│                              ▼                                       │
│   ┌──────────────────────────────────────────┐                      │
│   │ use_chat_middleware 装饰器               │                      │
│   │ 进一步分类并构建各自 Pipeline            │                      │
│   └──────────────────────────────────────────┘                      │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

2. 中间件分类器(第 1491-1534 行)

def categorize_middleware(
    *middleware_sources: Middleware | None,
) -> MiddlewareDict:
    """
    智能分类逻辑:
    1. 类实例:通过 isinstance 判断
    2. 函数:通过 _middleware_type 标记或类型注解推断
    """
    result: MiddlewareDict = {"agent": [], "function": [], "chat": []}
    
    for middleware in all_middleware:
        if isinstance(middleware, AgentMiddleware):
            result["agent"].append(middleware)
        elif isinstance(middleware, FunctionMiddleware):
            result["function"].append(middleware)
        elif isinstance(middleware, ChatMiddleware):
            result["chat"].append(middleware)
        elif callable(middleware):
            # 函数类型:标记 > 注解 > 异常
            middleware_type = _determine_middleware_type(middleware)
            # ... 按类型分发

3. 类型推断算法(第 1083-1149 行)

def _determine_middleware_type(middleware: Any) -> MiddlewareType:
    """
    双重验证机制:
    1. 装饰器标记(显式)
    2. 参数类型注解(隐式)
    """
    decorator_type = getattr(middleware, "_middleware_type", None)
    
    # 通过签名推断
    sig = inspect.signature(middleware)
    params = list(sig.parameters.values())
    first_param = params[0]
    
    if first_param.annotation.__name__ == "AgentRunContext":
        param_type = MiddlewareType.AGENT
    # ... 其他类型判断
    
    # 双重验证:装饰器和注解必须一致
    if decorator_type and param_type:
        if decorator_type != param_type:
            raise MiddlewareException("Type mismatch...")
        return decorator_type
    
    return decorator_type or param_type  # 优先级:显式 > 隐式

终止机制

context.terminate 的使用场景

class ValidationMiddleware(FunctionMiddleware):
    async def process(self, context: FunctionInvocationContext, next):
        # 参数验证失败,直接返回错误,不执行实际函数
        if not self.validate(context.arguments):
            context.result = {"error": "Validation failed"}
            context.terminate = True  # 终止后续执行
            return  # 不调用 next()
        
        await next(context)

终止检查点:

┌─────────────────────────────────────────────────────────────────┐
│                    Termination Checkpoints                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   非流式执行流程:                                                │
│   ─────────────                                                  │
│   Middleware#1.process()                                         │
│        │                                                         │
│        ├── [可设置 terminate=True]                                │
│        ▼                                                         │
│   Middleware#2.process()                                         │
│        │                                                         │
│        ▼                                                         │
│   agent_final_handler()  ← 检查 terminate                         │
│        │  if c.terminate: return c.result or empty               │
│        ▼                                                         │
│   实际业务执行                                                   │
│                                                                  │
│   流式执行流程:                                                  │
│   ────────────                                                   │
│   Middleware#1.process()                                         │
│        │                                                         │
│        ├── [设置 terminate=True]                                  │
│        ├── await next(context)                                   │
│        │                                                         │
│        ▼  ← 每个中间件后检查                                      │
│   if c.terminate: return  # 立即退出                             │
│                                                                  │
│   【关键差异】                                                    │
│   • 非流式:仅在 final_handler 前检查一次                         │
│   • 流式:每个中间件执行后都检查,支持快速短路                    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

深挖价值点

1. 设计亮点

A. 递归链式构建

# 优雅的递归构建洋葱模型
next_handler = create_next_handler(index + 1)  # 递归构建内层
async def current_handler(c):
    await middleware.process(c, next_handler)  # 包装当前层

B. Context 作为数据总线

  • 所有数据(输入、输出、元数据)通过 Context 流转
  • 支持中间件覆盖结果(短路模式)
  • 支持后置观察(日志/监控)

C. 双层中间件架构

  • Agent 级:全局配置,实例复用
  • Run 级:动态注入,单次生效
  • 通过 categorize_middleware 智能合并

D. 流式原生支持

  • _create_streaming_handler_chain 专门处理异步迭代器
  • try/except TypeError 兼容同步生成器
  • 每个中间件后可终止,避免无效流处理

2. 是否值得深挖链式执行机制?

结论:非常值得,原因如下:

  1. 架构核心:链式执行是整个中间件系统的核心机制,理解它就能理解整个框架的数据流转

  2. 面试高频:这种洋葱模型/责任链模式是系统设计面试的经典题目

  3. 性能敏感点:

    • 递归构建链的复杂度 O(n)
    • 每个请求都重新构建(是否有优化空间?)
    • 流式场景的终止检查开销
  4. 可扩展性:

    • 如何支持条件中间件?(特定路由才启用)
    • 如何支持并行中间件?(无依赖的可并发执行)
    • 如何支持错误恢复?(当前无 try/catch 包装)

建议深挖方向:

┌─────────────────────────────────────────────────────────────────┐
│              Potential Deep Dive Areas                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  1. 性能优化:                                                    │
│     • 中间件链缓存(相同配置复用 Handler Chain)                  │
│     • 延迟构建(确定有中间件时才构建)                            │
│                                                                  │
│  2. 错误处理:                                                    │
│     • 当前链中任一中间件异常会导致整个链失败                      │
│     • 是否需要 try/catch 包装?                                   │
│                                                                  │
│  3. 条件执行:                                                    │
│     • 基于 Context 的条件判断(如:仅对特定 Agent 启用)          │
│                                                                  │
│  4. 并行中间件:                                                  │
│     • 无依赖的中间件并行执行(如:日志 + 统计)                   │
│                                                                  │
│  5. 类型安全:                                                    │
│     • 当前大量使用 Any,是否可用泛型加强约束                      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

3. 源码精华片段

链式构建的递归之美(第 676-695 行):

def create_next_handler(index: int) -> Callable[[Any], Awaitable[None]]:
    if index >= len(self._middleware):
        # 基准情况:最终处理器
        async def final_wrapper(c: Any) -> None:
            result = await final_handler(c)
            result_container[result_key] = result
            c.result = result
        return final_wrapper
    
    # 递归步骤:当前中间件包装下一个
    middleware = self._middleware[index]
    next_handler = create_next_handler(index + 1)  # 递归调用
    
    async def current_handler(c: Any) -> None:
        await middleware.process(c, next_handler)
    
    return current_handler

return create_next_handler(0)  # 从链头开始

双层中间件合并逻辑(第 1193-1209 行):

def _build_middleware_pipelines(
    agent_level_middlewares: Sequence[Middleware] | None,
    run_level_middlewares: Sequence[Middleware] | None = None,
) -> tuple[...]:
    """
    设计巧思:使用 None 作为哨兵值,避免空列表的歧义
    *(agent_level_middlewares or ())  # 解包语法确保参数展开正确
    """
    middleware = categorize_middleware(
        *(agent_level_middlewares or ()),
        *(run_level_middlewares or ())
    )
    # ...