核心 Agent 架构分析
概述
Microsoft Agent Framework 的 Agent 架构采用协议驱动 + 分层设计的模式,通过三层核心抽象(AgentProtocol、BaseAgent、ChatAgent)构建了一个灵活、可扩展的 AI Agent 系统。
架构核心设计理念:
- 协议优先:使用 Python Protocol 实现结构化子类型,支持鸭子类型
- 分层职责:从接口协议 → 基础能力 → 主要实现,逐层增强
- 可组合性:Agent 可转为 Tool,支持 MCP 协议,实现 Agent 编排
- 生命周期管理:完整的 async context manager 支持,确保资源正确释放
AgentProtocol:协议驱动设计
定义与核心作用
# _agents.py:154-270
@runtime_checkable
class AgentProtocol(Protocol):
"""A protocol for an agent that can be invoked.
This protocol defines the interface that all agents must implement,
including properties for identification and methods for execution.
Note:
Protocols use structural subtyping (duck typing). Classes don't need
to explicitly inherit from this protocol to be considered compatible.
"""
id: str
name: str | None
description: str | None
async def run(self, messages, *, thread=None, **kwargs) -> AgentResponse: ...
def run_stream(self, messages, *, thread=None, **kwargs) -> AsyncIterable[AgentResponseUpdate]: ...
def get_new_thread(self, **kwargs) -> AgentThread: ...
为什么使用 Protocol 而非抽象基类?
| 特性 | Protocol | ABC (抽象基类) |
|---|---|---|
| 类型检查 | 运行时 + 静态类型检查 | 运行时 + 静态类型检查 |
| 继承要求 | 无需显式继承 | 必须显式继承 |
| 鸭子类型 | 支持(结构化子类型) | 不支持(名义子类型) |
| 运行时检查 | isinstance(obj, AgentProtocol) 可用 | isinstance() 可用 |
| 灵活性 | 允许完全自定义实现 | 强制继承框架类 |
核心优势:
- 零侵入集成:任何实现了
run()、run_stream()、get_new_thread()以及三个属性的类自动成为 Agent,无需继承框架类 - 第三方友好:用户可以不导入任何框架类,完全自定义 Agent 实现
- 渐进式采用:可以从简单实现开始,逐步采用框架功能
代码示例:自定义 Agent 无需继承
# 来自 _agents.py:167-204 的示例
class CustomAgent:
def __init__(self):
self.id = "custom-agent-001"
self.name = "Custom Agent"
self.description = "A fully custom agent implementation"
async def run(self, messages=None, *, thread=None, **kwargs):
return AgentResponse(messages=[], response_id="custom-response")
def run_stream(self, messages=None, *, thread=None, **kwargs):
async def _stream():
yield AgentResponseUpdate()
return _stream()
def get_new_thread(self, **kwargs):
return {"id": "custom-thread", "messages": []}
# 自动满足协议 - 无需继承 AgentProtocol
instance = CustomAgent()
assert isinstance(instance, AgentProtocol) # True!
BaseAgent:基础能力层
核心属性
# _agents.py:275-358
class BaseAgent(SerializationMixin):
"""Base class for all Agent Framework agents.
This class provides core functionality for agent implementations, including
context providers, middleware support, and thread management.
"""
def __init__(
self,
*,
id: str | None = None,
name: str | None = None,
description: str | None = None,
context_provider: ContextProvider | None = None,
middleware: Sequence[Middleware] | None = None,
additional_properties: MutableMapping[str, Any] | None = None,
**kwargs: Any,
) -> None:
| 属性 | 类型 | 作用 |
|---|---|---|
id | str | 唯一标识符,自动生成 UUID |
name | str | None | Agent 名称,用于消息归属标识 |
description | str | None | 描述信息 |
context_provider | ContextProvider | None | 上下文提供者,用于动态注入上下文 |
middleware | list[Middleware] | None | 中间件列表,用于拦截调用 |
additional_properties | dict[str, Any] | 额外属性存储 |
as_tool() 方法:Agent → Tool 转换
# _agents.py:411-503
def as_tool(
self,
*,
name: str | None = None,
description: str | None = None,
arg_name: str = "task",
arg_description: str | None = None,
stream_callback: Callable[[AgentResponseUpdate], None]
| Callable[[AgentResponseUpdate], Awaitable[None]]
| None = None,
) -> FunctionTool[BaseModel, str]:
实现原理:
协议验证(行 454-455):
if not isinstance(self, AgentProtocol): raise TypeError(f"Agent {self.__class__.__name__} must implement AgentProtocol...")动态 Pydantic 模型创建(行 463-466):
field_info = Field(..., description=argument_description) model_name = f"{name or _sanitize_agent_name(self.name) or 'agent'}_task" input_model = create_model(model_name, **{arg_name: (str, field_info)})- 创建动态输入模型,支持
task参数(可自定义) - 利用 Pydantic 的
create_model实现运行时模型生成
- 创建动态输入模型,支持
回调类型检测(行 469):
is_async_callback = stream_callback is not None and inspect.iscoroutinefunction(stream_callback)- 预先检测回调是否为 async,避免运行时重复判断
Wrapper 函数(行 471-493):
async def agent_wrapper(**kwargs: Any) -> str: input_text = kwargs.get(arg_name, "") forwarded_kwargs = {k: v for k, v in kwargs.items() if k not in (arg_name, "conversation_id")} if stream_callback is None: return (await self.run(input_text, **forwarded_kwargs)).text # Streaming mode response_updates: list[AgentResponseUpdate] = [] async for update in self.run_stream(input_text, **forwarded_kwargs): response_updates.append(update) if is_async_callback: await stream_callback(update) else: stream_callback(update) return AgentResponse.from_agent_run_response_updates(response_updates).text
设计亮点:
- 支持同步/异步回调
- 运行时上下文转发(
_forward_runtime_kwargs = True) - 流式与非流式双模式
- 自动消息累积与转换
Thread 管理机制
# _agents.py:384-409
def get_new_thread(self, **kwargs: Any) -> AgentThread:
"""Return a new AgentThread instance that is compatible with the agent."""
return AgentThread(**kwargs, context_provider=self.context_provider)
async def deserialize_thread(self, serialized_thread: Any, **kwargs: Any) -> AgentThread:
"""Deserialize a thread from its serialized state."""
thread: AgentThread = self.get_new_thread()
await thread.update_from_thread_state(serialized_thread, **kwargs)
return thread
async def _notify_thread_of_new_messages(
self,
thread: AgentThread,
input_messages: ChatMessage | Sequence[ChatMessage],
response_messages: ChatMessage | Sequence[ChatMessage],
**kwargs: Any,
) -> None:
"""Notify the thread of new messages."""
if isinstance(input_messages, ChatMessage) or len(input_messages) > 0:
await thread.on_new_messages(input_messages)
if isinstance(response_messages, ChatMessage) or len(response_messages) > 0:
await thread.on_new_messages(response_messages)
if thread.context_provider:
await thread.context_provider.invoked(input_messages, response_messages, **kwargs)
Thread 管理职责:
- 创建与 Agent 关联的新 Thread
- 支持 Thread 状态的序列化/反序列化
- 消息变更通知机制
- 与 ContextProvider 的联动
ChatAgent:主要实现层
类定义与泛型设计
# _agents.py:509-511
@use_agent_middleware
@use_agent_instrumentation(capture_usage=False)
class ChatAgent(BaseAgent, Generic[TOptions_co]):
"""A Chat Client Agent.
This is the primary agent implementation that uses a chat client to interact
with language models. It supports tools, context providers, middleware, and
both streaming and non-streaming responses.
"""
装饰器:
@use_agent_middleware:自动应用中间件管道@use_agent_instrumentation:自动添加可观测性埋点
泛型参数 TOptions_co:
- 使用
TypedDict约束 - 协变(covariant)设计,支持更具体的选项类型
- 默认值为
"ChatOptions" - 启用 IDE 自动补全和类型检查
与 ChatClientProtocol 的协作
# _agents.py:587-589
def __init__(
self,
chat_client: ChatClientProtocol[TOptions_co],
...
) -> None:
ChatAgent 通过 ChatClientProtocol 与底层 LLM 服务通信:
ChatAgent → ChatClientProtocol → Concrete Client (OpenAI/Azure/etc.)
ChatClientProtocol 核心接口(_clients.py:86-178):
@runtime_checkable
class ChatClientProtocol(Protocol[TOptions_contra]):
additional_properties: dict[str, Any]
async def get_response(self, messages, *, options=None, **kwargs) -> ChatResponse: ...
def get_streaming_response(self, messages, *, options=None, **kwargs) -> AsyncIterable[ChatResponseUpdate]: ...
Options 合并策略(_merge_options)
# _agents.py:78-108
def _merge_options(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Merge two options dicts, with override values taking precedence."""
result = dict(base)
for key, value in override.items():
if value is None:
continue
if key == "tools" and result.get("tools"):
# Combine tool lists, avoiding duplicates by name
existing_names = {getattr(t, "name", None) for t in result["tools"]}
unique_new = [t for t in value if getattr(t, "name", None) not in existing_names]
result["tools"] = list(result["tools"]) + unique_new
elif key == "logit_bias" and result.get("logit_bias"):
# Merge logit_bias dicts
result["logit_bias"] = {**result["logit_bias"], **value}
elif key == "metadata" and result.get("metadata"):
# Merge metadata dicts
result["metadata"] = {**result["metadata"], **value}
elif key == "instructions" and result.get("instructions"):
# Concatenate instructions
result["instructions"] = f"{result['instructions']}\n{value}"
else:
result[key] = value
return result
合并规则(优先级从高到低):
| 选项类型 | 合并策略 | 说明 |
|---|---|---|
tools | 按名称去重合并 | 避免重复工具,运行时工具优先级高 |
logit_bias | Dict 合并 | 运行时覆盖默认值 |
metadata | Dict 合并 | 运行时覆盖默认值 |
instructions | 字符串拼接 | 用换行符连接,形成完整指令 |
| 其他 | 直接覆盖 | 运行时值替换默认值 |
应用场景:
- Agent 级别
default_options与 Run 级别options合并 - ContextProvider 提供的上下文与运行时选项合并
- 确保配置层级清晰:默认值 → Agent配置 → 运行时参数
Tools 处理(包括 MCP Tools)
# _agents.py:676-679
# We ignore the MCP Servers here and store them separately,
# we add their functions to the tools list at runtime
normalized_tools: list[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]] = (
[] if tools_ is None else tools_ if isinstance(tools_, list) else [tools_]
)
self.mcp_tools: list[MCPTool] = [tool for tool in normalized_tools if isinstance(tool, MCPTool)]
agent_tools = [tool for tool in normalized_tools if not isinstance(tool, MCPTool)]
工具分类处理:
- 普通工具(FunctionTool、HostedTool 等):直接加入
default_options["tools"] - MCP 工具:单独存储到
self.mcp_tools,运行时动态展开
运行时 MCP 工具展开(_agents.py:846-859):
# Resolve final tool list (runtime provided tools + local MCP server tools)
final_tools: list[ToolProtocol | Callable[..., Any] | dict[str, Any]] = []
for tool in normalized_tools:
if isinstance(tool, MCPTool):
if not tool.is_connected:
await self._async_exit_stack.enter_async_context(tool)
final_tools.extend(tool.functions)
else:
final_tools.append(tool)
for mcp_server in self.mcp_tools:
if not mcp_server.is_connected:
await self._async_exit_stack.enter_async_context(mcp_server)
final_tools.extend(mcp_server.functions)
关键设计:
- MCP Tool 是惰性连接的(按需连接)
- 使用
AsyncExitStack管理 MCP 连接生命周期 - MCP 工具展开为具体的 FunctionTool 列表
生命周期管理(async context manager)
# _agents.py:709-741
async def __aenter__(self) -> "Self":
"""Enter the async context manager.
If any of the chat_client or local_mcp_tools are context managers,
they will be entered into the async exit stack to ensure proper cleanup.
"""
for context_manager in chain([self.chat_client], self.mcp_tools):
if isinstance(context_manager, AbstractAsyncContextManager):
await self._async_exit_stack.enter_async_context(context_manager)
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Exit the async context manager.
Close the async exit stack to ensure all context managers are exited properly.
"""
await self._async_exit_stack.aclose()
生命周期管理对象:
chat_client:如果实现了 async context managermcp_tools:每个 MCP 工具连接
使用模式:
async with agent:
# MCP 连接已建立
response = await agent.run("Hello")
# 自动清理所有连接
as_mcp_server() 方法:MCP 服务器创建
# _agents.py:1105-1212
def as_mcp_server(
self,
*,
server_name: str = "Agent",
version: str | None = None,
instructions: str | None = None,
lifespan: Callable[["Server[Any]"], AbstractAsyncContextManager[Any]] | None = None,
**kwargs: Any,
) -> "Server[Any]":
"""Create an MCP server from an agent instance.
This function automatically creates a MCP server from an agent instance...
"""
实现架构:
Agent → as_mcp_server() → MCP Server (with 3 handlers)
├── @server.list_tools() → _list_tools()
├── @server.call_tool() → _call_tool()
└── @server.set_logging_level() → _set_logging_level()
核心机制:
- 工具列表(行 1153-1170):将 Agent 包装为单个 MCP Tool
- 工具调用(行 1172-1203):转发到
agent_tool.invoke() - 日志集成(行 1143-1151):双向日志(本地 + MCP 会话)
执行流程详解
run() 方法完整执行流程
run_stream() 方法完整执行流程
架构类图
关键设计决策
1. 协议驱动架构
决策:使用 Python Protocol 而非抽象基类定义核心接口。
理由:
- 支持鸭子类型,降低框架侵入性
- 第三方可以不依赖框架类实现 Agent
runtime_checkable提供运行时类型检查能力
权衡:
- 优点:灵活性高、集成友好
- 缺点:无法强制调用
super().__init__(),依赖文档约定
2. 分层职责设计
| 层级 | 职责 | 扩展方式 |
|---|---|---|
| AgentProtocol | 定义接口契约 | 实现协议方法 |
| BaseAgent | 提供通用能力(Thread、序列化、as_tool) | 继承并添加属性 |
| ChatAgent | 实现 LLM 交互逻辑 | 继承并覆盖方法 |
价值:每层都有明确的单一职责,便于维护和测试。
3. Options 合并策略
关键决策:
- Tools 按名称去重合并(而非简单替换)
- Instructions 字符串拼接(形成完整指令)
- logit_bias 和 metadata Dict 合并
设计意图:
- 支持 ContextProvider 动态注入工具/指令
- 避免运行时参数意外覆盖重要配置
- 保留所有层级的配置信息
4. MCP 工具惰性展开
决策:MCP 工具在 run()/run_stream() 时才连接并展开。
理由:
- 避免初始化时建立不必要的连接
- 支持
AsyncExitStack自动管理生命周期 - 运行时展开可以获取最新的工具列表
5. 装饰器式横切关注点
@use_agent_middleware
@use_agent_instrumentation(capture_usage=False)
class ChatAgent(BaseAgent, Generic[TOptions_co]):
优势:
- 中间件和可观测性与业务逻辑解耦
- 通过装饰器链灵活组合功能
- 实现 AOP 风格的横切关注点处理
6. Agent 即 Tool 哲学
核心设计:任何 Agent 都可以通过 as_tool() 转为 Tool。
意义:
- 实现 Agent 编排(Agent 调用 Agent)
- 统一 Tool 和 Agent 的抽象
- 支持将复杂 Agent 作为单一步骤复用
深挖价值点
1. 协议设计的边界与权衡 🔍
待研究问题:
- Protocol 如何平衡灵活性与类型安全?
@runtime_checkable的性能开销如何?- 协议版本演进的最佳实践是什么?
代码位置:_agents.py:154-204
2. MCP 集成的架构深度 🔍
待研究问题:
- MCP 连接池管理策略
- MCP 工具缓存机制
- 多 MCP Server 协调问题
- 错误恢复和重连策略
代码位置:
_mcp.py- MCP 工具完整实现_agents.py:846-859- MCP 展开逻辑_agents.py:1105-1212- MCP Server 创建
3. Middleware 管道实现机制 🔍
待研究问题:
@use_agent_middleware如何实现方法拦截?- Middleware 执行顺序如何控制?
- 如何在 Middleware 中修改/替换执行结果?
代码位置:_middleware.py:300+(需继续阅读)
4. Thread 状态一致性保证 🔍
待研究问题:
- 服务托管 Thread 与本地 Thread 的状态同步
- 并发场景下的 Thread 安全
- 序列化/反序列化的一致性保证
代码位置:_threads.py:293-506
5. 泛型 Options 的类型流 🔍
待研究问题:
ChatAgent[OpenAIOptions]如何传递类型信息?response_format的类型推断机制- TypedDict 与 Pydantic 的互操作
代码位置:
_agents.py:63-76- 泛型类型变量定义_agents.py:587-589- 构造函数泛型参数_types.py- ChatOptions 定义
6. 可观测性埋点实现 🔍
待研究问题:
@use_agent_instrumentation如何实现?- OpenTelemetry 集成机制
- 性能指标的采集策略
代码位置:observability.py(需探索)
值得深挖的子课题
基于以上分析,以下子课题值得进一步深入研究:
- MCP 架构深度分析:理解 MCP 协议集成、连接管理、工具展开机制
- Middleware 管道架构:实现机制、上下文传递、拦截点设计
- Thread 状态管理:服务托管 vs 本地存储、序列化策略、一致性保证
- 泛型类型系统:TypeVar 协变/逆变、TypedDict 类型流、IDE 支持
- 可观测性架构:OTel 集成、装饰器埋点、性能监控
主要设计亮点和独特之处
1. 协议优先的架构哲学
不同于传统框架的继承模式,Agent Framework 使用 Protocol 定义契约,实现了真正的鸭子类型。这种设计让用户可以:
- 不依赖框架类编写 Agent
- 渐进式采用框架功能
- 轻松集成第三方 Agent 实现
2. Agent 即 Tool 的统一抽象
as_tool() 方法将 Agent 和 Tool 统一为同一抽象,实现了:
- Agent 编排(Agent 调用 Agent)
- 层级化任务分解
- Tool 生态的递归组合
3. 灵活的 Options 合并策略
精心设计的 _merge_options 函数,通过差异化合并策略(去重、拼接、覆盖),支持复杂的多层级配置场景,特别适合:
- 多 Agent 协作时的配置继承
- ContextProvider 动态上下文注入
- 运行时参数覆盖
4. MCP 协议原生支持
内置 MCP(Model Context Protocol)支持,让 Agent 可以:
- 连接外部 MCP Server
- 自身作为 MCP Server 提供服务
- 实现跨平台、跨语言的 Agent 互操作
5. 完整的生命周期管理
AsyncExitStack 的使用确保了:
- MCP 连接自动管理
- ChatClient 资源释放
- 异常安全清理
6. 装饰器式横切关注点
通过装饰器实现中间件和可观测性,保持了:
- 业务逻辑纯净
- 功能可组合
- 扩展点清晰
7. 线程模型的双模支持
支持服务托管 Thread 和本地 Thread 两种模式:
- 服务托管:LLM 服务管理对话历史
- 本地托管:应用层管理对话历史(支持自定义存储)
这种设计让框架可以适配不同的部署架构和合规要求。