Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • 核心 Agent 架构分析

核心 Agent 架构分析

概述

Microsoft Agent Framework 的 Agent 架构采用协议驱动 + 分层设计的模式,通过三层核心抽象(AgentProtocol、BaseAgent、ChatAgent)构建了一个灵活、可扩展的 AI Agent 系统。

架构核心设计理念:

  • 协议优先:使用 Python Protocol 实现结构化子类型,支持鸭子类型
  • 分层职责:从接口协议 → 基础能力 → 主要实现,逐层增强
  • 可组合性:Agent 可转为 Tool,支持 MCP 协议,实现 Agent 编排
  • 生命周期管理:完整的 async context manager 支持,确保资源正确释放

AgentProtocol:协议驱动设计

定义与核心作用

# _agents.py:154-270
@runtime_checkable
class AgentProtocol(Protocol):
    """A protocol for an agent that can be invoked.
    
    This protocol defines the interface that all agents must implement,
    including properties for identification and methods for execution.
    
    Note:
        Protocols use structural subtyping (duck typing). Classes don't need
        to explicitly inherit from this protocol to be considered compatible.
    """
    
    id: str
    name: str | None
    description: str | None
    
    async def run(self, messages, *, thread=None, **kwargs) -> AgentResponse: ...
    
    def run_stream(self, messages, *, thread=None, **kwargs) -> AsyncIterable[AgentResponseUpdate]: ...
    
    def get_new_thread(self, **kwargs) -> AgentThread: ...

为什么使用 Protocol 而非抽象基类?

特性ProtocolABC (抽象基类)
类型检查运行时 + 静态类型检查运行时 + 静态类型检查
继承要求无需显式继承必须显式继承
鸭子类型支持(结构化子类型)不支持(名义子类型)
运行时检查isinstance(obj, AgentProtocol) 可用isinstance() 可用
灵活性允许完全自定义实现强制继承框架类

核心优势:

  1. 零侵入集成:任何实现了 run()、run_stream()、get_new_thread() 以及三个属性的类自动成为 Agent,无需继承框架类
  2. 第三方友好:用户可以不导入任何框架类,完全自定义 Agent 实现
  3. 渐进式采用:可以从简单实现开始,逐步采用框架功能

代码示例:自定义 Agent 无需继承

# 来自 _agents.py:167-204 的示例
class CustomAgent:
    def __init__(self):
        self.id = "custom-agent-001"
        self.name = "Custom Agent"
        self.description = "A fully custom agent implementation"

    async def run(self, messages=None, *, thread=None, **kwargs):
        return AgentResponse(messages=[], response_id="custom-response")

    def run_stream(self, messages=None, *, thread=None, **kwargs):
        async def _stream():
            yield AgentResponseUpdate()
        return _stream()

    def get_new_thread(self, **kwargs):
        return {"id": "custom-thread", "messages": []}

# 自动满足协议 - 无需继承 AgentProtocol
instance = CustomAgent()
assert isinstance(instance, AgentProtocol)  # True!

BaseAgent:基础能力层

核心属性

# _agents.py:275-358
class BaseAgent(SerializationMixin):
    """Base class for all Agent Framework agents.
    
    This class provides core functionality for agent implementations, including
    context providers, middleware support, and thread management.
    """
    
    def __init__(
        self,
        *,
        id: str | None = None,
        name: str | None = None,
        description: str | None = None,
        context_provider: ContextProvider | None = None,
        middleware: Sequence[Middleware] | None = None,
        additional_properties: MutableMapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
属性类型作用
idstr唯一标识符,自动生成 UUID
namestr | NoneAgent 名称,用于消息归属标识
descriptionstr | None描述信息
context_providerContextProvider | None上下文提供者,用于动态注入上下文
middlewarelist[Middleware] | None中间件列表,用于拦截调用
additional_propertiesdict[str, Any]额外属性存储

as_tool() 方法:Agent → Tool 转换

# _agents.py:411-503
def as_tool(
    self,
    *,
    name: str | None = None,
    description: str | None = None,
    arg_name: str = "task",
    arg_description: str | None = None,
    stream_callback: Callable[[AgentResponseUpdate], None]
    | Callable[[AgentResponseUpdate], Awaitable[None]]
    | None = None,
) -> FunctionTool[BaseModel, str]:

实现原理:

  1. 协议验证(行 454-455):

    if not isinstance(self, AgentProtocol):
        raise TypeError(f"Agent {self.__class__.__name__} must implement AgentProtocol...")
    
  2. 动态 Pydantic 模型创建(行 463-466):

    field_info = Field(..., description=argument_description)
    model_name = f"{name or _sanitize_agent_name(self.name) or 'agent'}_task"
    input_model = create_model(model_name, **{arg_name: (str, field_info)})
    
    • 创建动态输入模型,支持 task 参数(可自定义)
    • 利用 Pydantic 的 create_model 实现运行时模型生成
  3. 回调类型检测(行 469):

    is_async_callback = stream_callback is not None and inspect.iscoroutinefunction(stream_callback)
    
    • 预先检测回调是否为 async,避免运行时重复判断
  4. Wrapper 函数(行 471-493):

    async def agent_wrapper(**kwargs: Any) -> str:
        input_text = kwargs.get(arg_name, "")
        forwarded_kwargs = {k: v for k, v in kwargs.items() if k not in (arg_name, "conversation_id")}
        
        if stream_callback is None:
            return (await self.run(input_text, **forwarded_kwargs)).text
        
        # Streaming mode
        response_updates: list[AgentResponseUpdate] = []
        async for update in self.run_stream(input_text, **forwarded_kwargs):
            response_updates.append(update)
            if is_async_callback:
                await stream_callback(update)
            else:
                stream_callback(update)
        
        return AgentResponse.from_agent_run_response_updates(response_updates).text
    

设计亮点:

  • 支持同步/异步回调
  • 运行时上下文转发(_forward_runtime_kwargs = True)
  • 流式与非流式双模式
  • 自动消息累积与转换

Thread 管理机制

# _agents.py:384-409
def get_new_thread(self, **kwargs: Any) -> AgentThread:
    """Return a new AgentThread instance that is compatible with the agent."""
    return AgentThread(**kwargs, context_provider=self.context_provider)

async def deserialize_thread(self, serialized_thread: Any, **kwargs: Any) -> AgentThread:
    """Deserialize a thread from its serialized state."""
    thread: AgentThread = self.get_new_thread()
    await thread.update_from_thread_state(serialized_thread, **kwargs)
    return thread

async def _notify_thread_of_new_messages(
    self,
    thread: AgentThread,
    input_messages: ChatMessage | Sequence[ChatMessage],
    response_messages: ChatMessage | Sequence[ChatMessage],
    **kwargs: Any,
) -> None:
    """Notify the thread of new messages."""
    if isinstance(input_messages, ChatMessage) or len(input_messages) > 0:
        await thread.on_new_messages(input_messages)
    if isinstance(response_messages, ChatMessage) or len(response_messages) > 0:
        await thread.on_new_messages(response_messages)
    if thread.context_provider:
        await thread.context_provider.invoked(input_messages, response_messages, **kwargs)

Thread 管理职责:

  • 创建与 Agent 关联的新 Thread
  • 支持 Thread 状态的序列化/反序列化
  • 消息变更通知机制
  • 与 ContextProvider 的联动

ChatAgent:主要实现层

类定义与泛型设计

# _agents.py:509-511
@use_agent_middleware
@use_agent_instrumentation(capture_usage=False)
class ChatAgent(BaseAgent, Generic[TOptions_co]):
    """A Chat Client Agent.
    
    This is the primary agent implementation that uses a chat client to interact
    with language models. It supports tools, context providers, middleware, and
    both streaming and non-streaming responses.
    """

装饰器:

  • @use_agent_middleware:自动应用中间件管道
  • @use_agent_instrumentation:自动添加可观测性埋点

泛型参数 TOptions_co:

  • 使用 TypedDict 约束
  • 协变(covariant)设计,支持更具体的选项类型
  • 默认值为 "ChatOptions"
  • 启用 IDE 自动补全和类型检查

与 ChatClientProtocol 的协作

# _agents.py:587-589
def __init__(
    self,
    chat_client: ChatClientProtocol[TOptions_co],
    ...
) -> None:

ChatAgent 通过 ChatClientProtocol 与底层 LLM 服务通信:

ChatAgent → ChatClientProtocol → Concrete Client (OpenAI/Azure/etc.)

ChatClientProtocol 核心接口(_clients.py:86-178):

@runtime_checkable
class ChatClientProtocol(Protocol[TOptions_contra]):
    additional_properties: dict[str, Any]
    
    async def get_response(self, messages, *, options=None, **kwargs) -> ChatResponse: ...
    
    def get_streaming_response(self, messages, *, options=None, **kwargs) -> AsyncIterable[ChatResponseUpdate]: ...

Options 合并策略(_merge_options)

# _agents.py:78-108
def _merge_options(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Merge two options dicts, with override values taking precedence."""
    result = dict(base)
    for key, value in override.items():
        if value is None:
            continue
        if key == "tools" and result.get("tools"):
            # Combine tool lists, avoiding duplicates by name
            existing_names = {getattr(t, "name", None) for t in result["tools"]}
            unique_new = [t for t in value if getattr(t, "name", None) not in existing_names]
            result["tools"] = list(result["tools"]) + unique_new
        elif key == "logit_bias" and result.get("logit_bias"):
            # Merge logit_bias dicts
            result["logit_bias"] = {**result["logit_bias"], **value}
        elif key == "metadata" and result.get("metadata"):
            # Merge metadata dicts
            result["metadata"] = {**result["metadata"], **value}
        elif key == "instructions" and result.get("instructions"):
            # Concatenate instructions
            result["instructions"] = f"{result['instructions']}\n{value}"
        else:
            result[key] = value
    return result

合并规则(优先级从高到低):

选项类型合并策略说明
tools按名称去重合并避免重复工具,运行时工具优先级高
logit_biasDict 合并运行时覆盖默认值
metadataDict 合并运行时覆盖默认值
instructions字符串拼接用换行符连接,形成完整指令
其他直接覆盖运行时值替换默认值

应用场景:

  • Agent 级别 default_options 与 Run 级别 options 合并
  • ContextProvider 提供的上下文与运行时选项合并
  • 确保配置层级清晰:默认值 → Agent配置 → 运行时参数

Tools 处理(包括 MCP Tools)

# _agents.py:676-679
# We ignore the MCP Servers here and store them separately,
# we add their functions to the tools list at runtime
normalized_tools: list[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]] = (
    [] if tools_ is None else tools_ if isinstance(tools_, list) else [tools_]
)
self.mcp_tools: list[MCPTool] = [tool for tool in normalized_tools if isinstance(tool, MCPTool)]
agent_tools = [tool for tool in normalized_tools if not isinstance(tool, MCPTool)]

工具分类处理:

  1. 普通工具(FunctionTool、HostedTool 等):直接加入 default_options["tools"]
  2. MCP 工具:单独存储到 self.mcp_tools,运行时动态展开

运行时 MCP 工具展开(_agents.py:846-859):

# Resolve final tool list (runtime provided tools + local MCP server tools)
final_tools: list[ToolProtocol | Callable[..., Any] | dict[str, Any]] = []
for tool in normalized_tools:
    if isinstance(tool, MCPTool):
        if not tool.is_connected:
            await self._async_exit_stack.enter_async_context(tool)
        final_tools.extend(tool.functions)
    else:
        final_tools.append(tool)

for mcp_server in self.mcp_tools:
    if not mcp_server.is_connected:
        await self._async_exit_stack.enter_async_context(mcp_server)
    final_tools.extend(mcp_server.functions)

关键设计:

  • MCP Tool 是惰性连接的(按需连接)
  • 使用 AsyncExitStack 管理 MCP 连接生命周期
  • MCP 工具展开为具体的 FunctionTool 列表

生命周期管理(async context manager)

# _agents.py:709-741
async def __aenter__(self) -> "Self":
    """Enter the async context manager.
    
    If any of the chat_client or local_mcp_tools are context managers,
    they will be entered into the async exit stack to ensure proper cleanup.
    """
    for context_manager in chain([self.chat_client], self.mcp_tools):
        if isinstance(context_manager, AbstractAsyncContextManager):
            await self._async_exit_stack.enter_async_context(context_manager)
    return self

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Exit the async context manager.
    
    Close the async exit stack to ensure all context managers are exited properly.
    """
    await self._async_exit_stack.aclose()

生命周期管理对象:

  • chat_client:如果实现了 async context manager
  • mcp_tools:每个 MCP 工具连接

使用模式:

async with agent:
    # MCP 连接已建立
    response = await agent.run("Hello")
# 自动清理所有连接

as_mcp_server() 方法:MCP 服务器创建

# _agents.py:1105-1212
def as_mcp_server(
    self,
    *,
    server_name: str = "Agent",
    version: str | None = None,
    instructions: str | None = None,
    lifespan: Callable[["Server[Any]"], AbstractAsyncContextManager[Any]] | None = None,
    **kwargs: Any,
) -> "Server[Any]":
    """Create an MCP server from an agent instance.
    
    This function automatically creates a MCP server from an agent instance...
    """

实现架构:

Agent → as_mcp_server() → MCP Server (with 3 handlers)
├── @server.list_tools() → _list_tools()
├── @server.call_tool() → _call_tool()  
└── @server.set_logging_level() → _set_logging_level()

核心机制:

  1. 工具列表(行 1153-1170):将 Agent 包装为单个 MCP Tool
  2. 工具调用(行 1172-1203):转发到 agent_tool.invoke()
  3. 日志集成(行 1143-1151):双向日志(本地 + MCP 会话)

执行流程详解

run() 方法完整执行流程

run_stream() 方法完整执行流程


架构类图


关键设计决策

1. 协议驱动架构

决策:使用 Python Protocol 而非抽象基类定义核心接口。

理由:

  • 支持鸭子类型,降低框架侵入性
  • 第三方可以不依赖框架类实现 Agent
  • runtime_checkable 提供运行时类型检查能力

权衡:

  • 优点:灵活性高、集成友好
  • 缺点:无法强制调用 super().__init__(),依赖文档约定

2. 分层职责设计

层级职责扩展方式
AgentProtocol定义接口契约实现协议方法
BaseAgent提供通用能力(Thread、序列化、as_tool)继承并添加属性
ChatAgent实现 LLM 交互逻辑继承并覆盖方法

价值:每层都有明确的单一职责,便于维护和测试。

3. Options 合并策略

关键决策:

  • Tools 按名称去重合并(而非简单替换)
  • Instructions 字符串拼接(形成完整指令)
  • logit_bias 和 metadata Dict 合并

设计意图:

  • 支持 ContextProvider 动态注入工具/指令
  • 避免运行时参数意外覆盖重要配置
  • 保留所有层级的配置信息

4. MCP 工具惰性展开

决策:MCP 工具在 run()/run_stream() 时才连接并展开。

理由:

  • 避免初始化时建立不必要的连接
  • 支持 AsyncExitStack 自动管理生命周期
  • 运行时展开可以获取最新的工具列表

5. 装饰器式横切关注点

@use_agent_middleware
@use_agent_instrumentation(capture_usage=False)
class ChatAgent(BaseAgent, Generic[TOptions_co]):

优势:

  • 中间件和可观测性与业务逻辑解耦
  • 通过装饰器链灵活组合功能
  • 实现 AOP 风格的横切关注点处理

6. Agent 即 Tool 哲学

核心设计:任何 Agent 都可以通过 as_tool() 转为 Tool。

意义:

  • 实现 Agent 编排(Agent 调用 Agent)
  • 统一 Tool 和 Agent 的抽象
  • 支持将复杂 Agent 作为单一步骤复用

深挖价值点

1. 协议设计的边界与权衡 🔍

待研究问题:

  • Protocol 如何平衡灵活性与类型安全?
  • @runtime_checkable 的性能开销如何?
  • 协议版本演进的最佳实践是什么?

代码位置:_agents.py:154-204

2. MCP 集成的架构深度 🔍

待研究问题:

  • MCP 连接池管理策略
  • MCP 工具缓存机制
  • 多 MCP Server 协调问题
  • 错误恢复和重连策略

代码位置:

  • _mcp.py - MCP 工具完整实现
  • _agents.py:846-859 - MCP 展开逻辑
  • _agents.py:1105-1212 - MCP Server 创建

3. Middleware 管道实现机制 🔍

待研究问题:

  • @use_agent_middleware 如何实现方法拦截?
  • Middleware 执行顺序如何控制?
  • 如何在 Middleware 中修改/替换执行结果?

代码位置:_middleware.py:300+(需继续阅读)

4. Thread 状态一致性保证 🔍

待研究问题:

  • 服务托管 Thread 与本地 Thread 的状态同步
  • 并发场景下的 Thread 安全
  • 序列化/反序列化的一致性保证

代码位置:_threads.py:293-506

5. 泛型 Options 的类型流 🔍

待研究问题:

  • ChatAgent[OpenAIOptions] 如何传递类型信息?
  • response_format 的类型推断机制
  • TypedDict 与 Pydantic 的互操作

代码位置:

  • _agents.py:63-76 - 泛型类型变量定义
  • _agents.py:587-589 - 构造函数泛型参数
  • _types.py - ChatOptions 定义

6. 可观测性埋点实现 🔍

待研究问题:

  • @use_agent_instrumentation 如何实现?
  • OpenTelemetry 集成机制
  • 性能指标的采集策略

代码位置:observability.py(需探索)


值得深挖的子课题

基于以上分析,以下子课题值得进一步深入研究:

  1. MCP 架构深度分析:理解 MCP 协议集成、连接管理、工具展开机制
  2. Middleware 管道架构:实现机制、上下文传递、拦截点设计
  3. Thread 状态管理:服务托管 vs 本地存储、序列化策略、一致性保证
  4. 泛型类型系统:TypeVar 协变/逆变、TypedDict 类型流、IDE 支持
  5. 可观测性架构:OTel 集成、装饰器埋点、性能监控

主要设计亮点和独特之处

1. 协议优先的架构哲学

不同于传统框架的继承模式,Agent Framework 使用 Protocol 定义契约,实现了真正的鸭子类型。这种设计让用户可以:

  • 不依赖框架类编写 Agent
  • 渐进式采用框架功能
  • 轻松集成第三方 Agent 实现

2. Agent 即 Tool 的统一抽象

as_tool() 方法将 Agent 和 Tool 统一为同一抽象,实现了:

  • Agent 编排(Agent 调用 Agent)
  • 层级化任务分解
  • Tool 生态的递归组合

3. 灵活的 Options 合并策略

精心设计的 _merge_options 函数,通过差异化合并策略(去重、拼接、覆盖),支持复杂的多层级配置场景,特别适合:

  • 多 Agent 协作时的配置继承
  • ContextProvider 动态上下文注入
  • 运行时参数覆盖

4. MCP 协议原生支持

内置 MCP(Model Context Protocol)支持,让 Agent 可以:

  • 连接外部 MCP Server
  • 自身作为 MCP Server 提供服务
  • 实现跨平台、跨语言的 Agent 互操作

5. 完整的生命周期管理

AsyncExitStack 的使用确保了:

  • MCP 连接自动管理
  • ChatClient 资源释放
  • 异常安全清理

6. 装饰器式横切关注点

通过装饰器实现中间件和可观测性,保持了:

  • 业务逻辑纯净
  • 功能可组合
  • 扩展点清晰

7. 线程模型的双模支持

支持服务托管 Thread 和本地 Thread 两种模式:

  • 服务托管:LLM 服务管理对话历史
  • 本地托管:应用层管理对话历史(支持自定义存储)

这种设计让框架可以适配不同的部署架构和合规要求。