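Analysis of the Tools and MCP Integration Mechanism
Overview
The tool system in Microsoft Agent Framework is a flexible, extensible architecture. Its protocol-driven design unifies function tools, hosted tools, and MCP (Model Context Protocol) integration. The main design highlights are dynamic Pydantic model creation, function signature parsing, and async context management.
Key files:
- /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_tools.py - tool system core (2325 lines)
- /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_mcp.py - MCP integration
- /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_agents.py - interaction between agents and tools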
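ToolProtocol and the Tool Abstraction
ToolProtocol definition
@runtime_checkable
class ToolProtocol(Protocol):
    """Represents a generic tool."""

    name: str
    description: str
    additional_properties: dict[str, Any] | None

    def __str__(self) -> str:
        """Return a string representation of the tool."""
        ...
Design highlights:
- @runtime_checkable allows isinstance checks at runtime
- A protocol is used instead of a base class, which gives structural subtyping (duck typing)
- Users can write fully custom tools without inheriting from any framework class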
BaseTool base implementation
class BaseTool(SerializationMixin):
    DEFAULT_EXCLUDE: ClassVar[set[str]] = {"additional_properties"}

    def __init__(
        self,
        *,
        name: str,
        description: str = "",
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        self.name = name
        self.description = description
        self.additional_properties = additional_properties
        for key, value in kwargs.items():
            setattr(self, key, value)
BaseTool inherits from SerializationMixin, which provides the serialization support that lets tools be exchanged with external services.
FunctionTool in detail
Class definition and generic design
class FunctionTool(BaseTool, Generic[ArgsT, ReturnT]):
    """A tool that wraps a Python function to make it callable by AI models."""

    INJECTABLE: ClassVar[set[str]] = {"func"}
    DEFAULT_EXCLUDE: ClassVar[set[str]] = {"input_model", "_invocation_duration_histogram", "_cached_parameters"}
Generic parameters:
- ArgsT (bound=BaseModel): type of the input argument model
- ReturnT (default=Any): type of the return value
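Core attributes
| Attribute | Type | Description |
|---|---|---|
| func | Callable | The wrapped function |
| input_model | type[ArgsT] | Pydantic model used to validate arguments |
| approval_mode | Literal | Approval mode (always_require / never_require) |
| max_invocations | int \| None | Maximum number of invocations |
| max_invocation_exceptions | int \| None | Maximum number of exceptions |
| _forward_runtime_kwargs | bool | Whether runtime kwargs are forwarded to the function |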
Dynamic model creation flow

1. Initialization
   __init__()
   └─> _resolve_input_model(input_model)
       ├─> None: _create_input_model_from_func()
       ├─> BaseModel: used directly
       └─> Mapping: _create_model_from_json_schema()

2. Building the model from a function (_create_input_model_from_func)
   1. Read the parameter signature with inspect.signature(func)
   2. Iterate over parameters, skipping self/cls
   3. Skip VAR_POSITIONAL (*args) and VAR_KEYWORD (**kwargs)
   4. Resolve each type annotation with _parse_annotation()
   5. Call pydantic.create_model() to build the model dynamically

3. Building the model from JSON Schema (_build_pydantic_model_from_json_schema)
   1. Read the properties and required fields
   2. Resolve $ref references (recursively, for #/$defs/xxx)
   3. Handle nested objects (recursively creating nested models)
   4. Handle typed arrays (list[ItemType])
   5. Support Literal types (const/enum)
   6. Support oneOf + discriminator (polymorphic objects)
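Step 2 of the flow can be sketched with `inspect.signature` and `pydantic.create_model`. This is a simplified illustration that assumes Pydantic v2; `build_input_model` is a hypothetical helper, not the framework's `_create_input_model_from_func`, and it leaves out the annotation parsing done by `_parse_annotation`.

```python
import inspect
from typing import Any

from pydantic import BaseModel, create_model


def build_input_model(func: Any) -> type[BaseModel]:
    """Hypothetical helper: derive a Pydantic model from a function signature."""
    fields: dict[str, Any] = {}
    for name, param in inspect.signature(func).parameters.items():
        if name in {"self", "cls"}:
            continue  # bound-method receivers are not tool arguments
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue  # *args / **kwargs cannot be described as named fields
        annotation = param.annotation if param.annotation is not param.empty else Any
        # Ellipsis marks a required field in create_model
        default = param.default if param.default is not param.empty else ...
        fields[name] = (annotation, default)
    return create_model(f"{func.__name__}_input", **fields)


def get_weather(location: str, unit: str = "celsius", *args: Any, **kwargs: Any) -> str:
    return f"Weather in {location} ({unit})"


WeatherInput = build_input_model(get_weather)
parsed = WeatherInput(location="Seattle")
print(parsed.model_dump())  # {'location': 'Seattle', 'unit': 'celsius'}
```

Parameters without a default become required fields, so a call that omits `location` fails validation before the wrapped function runs.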
_parse_annotation implementation analysis
def _parse_annotation(annotation: Any) -> Any:
    """Parse a type annotation and return the corresponding type.

    If the second annotation (after the type) is a string,
    then we convert that to a Pydantic Field description.
    """
    origin = get_origin(annotation)
    if origin is not None:
        # Literal types should be returned as-is
        if origin is Literal:
            return annotation
        args = get_args(annotation)
        # For other generics, return the origin type
        if len(args) > 1 and isinstance(args[1], str):
            # Create a new Annotated type with the updated Field
            args_list = list(args)
            if len(args_list) == 2:
                return Annotated[args_list[0], Field(description=args_list[1])]
            return Annotated[args_list[0], Field(description=args_list[1]), tuple(args_list[2:])]
    return annotation
Usage example:
from typing import Annotated

# A string annotation is converted into a Field
def get_weather(
    location: Annotated[str, "The city name"],  # parsed as Field(description="The city name")
    unit: Annotated[str, "Temperature unit"] = "celsius",
) -> str:
    return f"Weather in {location}: 22°{unit[0].upper()}"
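To see what `_parse_annotation` has to work with, the sketch below reads the string metadata out of `Annotated` hints using only the standard library. `describe_parameters` is a hypothetical helper written for this illustration; it returns plain strings instead of building Pydantic `Field` objects.

```python
import inspect
from typing import Annotated, get_args, get_origin, get_type_hints


def describe_parameters(func) -> dict[str, str]:
    """Hypothetical helper: map parameter names to Annotated string descriptions."""
    hints = get_type_hints(func, include_extras=True)  # keep Annotated metadata
    descriptions: dict[str, str] = {}
    for name in inspect.signature(func).parameters:
        annotation = hints.get(name)
        if get_origin(annotation) is Annotated:
            args = get_args(annotation)  # (base_type, *metadata)
            if len(args) > 1 and isinstance(args[1], str):
                descriptions[name] = args[1]
    return descriptions


def get_weather(
    location: Annotated[str, "The city name"],
    unit: Annotated[str, "Temperature unit"] = "celsius",
    verbose: bool = False,
) -> str:
    return f"Weather in {location}: 22°{unit[0].upper()}"


print(describe_parameters(get_weather))
# {'location': 'The city name', 'unit': 'Temperature unit'}
```

Parameters without string metadata, such as `verbose` here, are simply left out of the result.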
_build_pydantic_model_from_json_schema code (abridged)
def _build_pydantic_model_from_json_schema(
    model_name: str,
    schema: Mapping[str, Any],
) -> type[BaseModel]:
    """Creates a Pydantic model from JSON Schema with support for $refs, nested objects, and typed arrays."""
    properties = schema.get("properties")
    required = schema.get("required", [])
    definitions = schema.get("$defs", {})

    if not properties:
        return create_model(f"{model_name}_input")

    def _resolve_literal_type(prop_details: dict[str, Any]) -> type | None:
        """Check if property should be a Literal type (const or enum)."""
        if "const" in prop_details:
            return Literal[prop_details["const"]]
        if "enum" in prop_details and isinstance(prop_details["enum"], list):
            enum_values = prop_details["enum"]
            if enum_values:
                return Literal[tuple(enum_values)]
        return None

    def _resolve_type(prop_details: dict[str, Any], parent_name: str = "") -> type:
        """Resolve JSON Schema type to Python type."""
        # Handle oneOf + discriminator (polymorphic objects)
        if "oneOf" in prop_details and "discriminator" in prop_details:
            ...  # polymorphic type resolution, elided in this excerpt

        # Handle $ref by resolving the reference
        if "$ref" in prop_details:
            ref = prop_details["$ref"]
            if ref.startswith("#/$defs/"):
                def_name = ref.split("/")[-1]
                if def_name in definitions:
                    resolved = definitions[def_name]
                    return _resolve_type(resolved, def_name)
            return dict

        # Map JSON Schema types to Python types
        json_type = prop_details.get("type", "string")
        match json_type:
            case "integer":
                return int
            case "number":
                return float
            case "boolean":
                return bool
            case "array":
                items_schema = prop_details.get("items")
                if items_schema and isinstance(items_schema, dict):
                    item_type = _resolve_type(items_schema, f"{parent_name}_item")
                    return list[item_type]
                return list
            case "object":
                # Handle nested objects by creating a nested Pydantic model
                nested_properties = prop_details.get("properties")
                nested_required = prop_details.get("required", [])
                if nested_properties and isinstance(nested_properties, dict):
                    nested_model_name = f"{parent_name}_nested" if parent_name else "NestedModel"
                    # Recursively build field definitions for the nested model
                    nested_field_definitions: dict[str, Any] = {}
                    for nested_prop_name, nested_prop_details in nested_properties.items():
                        ...  # recursive handling of nested properties, elided in this excerpt
                    return create_model(nested_model_name, **nested_field_definitions)
                return dict
            case _:
                return str

    field_definitions: dict[str, Any] = {}
    for prop_name, prop_details in properties.items():
        prop_details = json.loads(prop_details) if isinstance(prop_details, str) else prop_details
        literal_type = _resolve_literal_type(prop_details)
        if literal_type is not None:
            python_type = literal_type
        else:
            python_type = _resolve_type(prop_details, f"{model_name}_{prop_name}")
        description = prop_details.get("description", "")
        field_kwargs: dict[str, Any] = {}
        if description:
            field_kwargs["description"] = description
        if prop_name in required:
            if field_kwargs:
                field_definitions[prop_name] = (python_type, Field(**field_kwargs))
            else:
                field_definitions[prop_name] = (python_type, ...)
        else:
            default_value = prop_details.get("default", None)
            field_kwargs["default"] = default_value
            if field_kwargs and any(k != "default" for k in field_kwargs):
                field_definitions[prop_name] = (python_type, Field(**field_kwargs))
            else:
                field_definitions[prop_name] = (python_type, default_value)
    return create_model(f"{model_name}_input", **field_definitions)
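The type-resolution part of the function can be exercised without Pydantic. The sketch below is a reduced, standard-library-only variant: `resolve_type` is a hypothetical helper that folds the const/enum check into the resolver and maps every object to `dict`, so it does not build nested models or handle oneOf + discriminator.

```python
from typing import Any, Literal


def resolve_type(prop: dict[str, Any], definitions: dict[str, Any]) -> Any:
    """Hypothetical helper: map a JSON Schema fragment to a Python type."""
    if "const" in prop:
        return Literal[prop["const"]]
    if prop.get("enum"):
        # Subscripting with a tuple is the same as Literal["a", "b", ...]
        return Literal[tuple(prop["enum"])]
    if "$ref" in prop:
        ref = prop["$ref"]
        name = ref.split("/")[-1]
        if ref.startswith("#/$defs/") and name in definitions:
            return resolve_type(definitions[name], definitions)
        return dict  # unresolved references fall back to dict
    json_type = prop.get("type", "string")
    if json_type == "array":
        items = prop.get("items")
        if isinstance(items, dict) and items:
            return list[resolve_type(items, definitions)]
        return list
    simple = {"integer": int, "number": float, "boolean": bool, "object": dict}
    return simple.get(json_type, str)


schema = {
    "$defs": {"Tag": {"type": "string"}},
    "properties": {
        "count": {"type": "integer"},
        "tags": {"type": "array", "items": {"$ref": "#/$defs/Tag"}},
        "unit": {"enum": ["celsius", "fahrenheit"]},
    },
}
resolved = {
    name: resolve_type(prop, schema["$defs"])
    for name, prop in schema["properties"].items()
}
print(resolved)
```

Each resolved type could then be paired with a default or `Field` and passed to `create_model`, as the field-definition loop above does.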
Descriptor protocol support
def __get__(self, obj: Any, objtype: type | None = None) -> "FunctionTool[ArgsT, ReturnT]":
    """Implement the descriptor protocol to support bound methods."""
    if obj is None:
        return self  # Accessed from the class

    # Check if the wrapped function is a method (has 'self' parameter)
    if self.func is not None:
        sig = inspect.signature(self.func)
        params = list(sig.parameters.keys())
        if params and params[0] in {"self", "cls"}:
            # Create a new FunctionTool with the bound method
            import copy

            bound_func = copy.copy(self)
            bound_func._instance = obj
            return bound_func
    return self
Purpose: this lets FunctionTool decorate methods defined on a class and bind the instance automatically on attribute access.
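The binding behaviour can be reproduced with a small stand-alone descriptor. `BoundTool` and `Greeter` are hypothetical names, and the `__call__` method is an assumption about how a stored `_instance` might be used; the excerpt above does not show that part.

```python
import copy
import inspect


class BoundTool:
    """Hypothetical, simplified wrapper that binds itself like a method."""

    def __init__(self, func):
        self.func = func
        self._instance = None

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # accessed on the class, nothing to bind
        params = list(inspect.signature(self.func).parameters)
        if params and params[0] in {"self", "cls"}:
            bound = copy.copy(self)  # shallow copy keeps the shared tool untouched
            bound._instance = obj
            return bound
        return self

    def __call__(self, *args, **kwargs):
        # Assumption: the stored instance is passed as the first argument.
        if self._instance is not None:
            return self.func(self._instance, *args, **kwargs)
        return self.func(*args, **kwargs)


class Greeter:
    def __init__(self, greeting):
        self.greeting = greeting

    @BoundTool
    def greet(self, name):
        return f"{self.greeting}, {name}!"


print(Greeter("Hello").greet("Ada"))  # Hello, Ada!
```

Copying on every access means two instances never share a bound tool, at the cost of one shallow copy per attribute lookup.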
@tool decorator implementation
Two-overload design
@overload
def tool(
    func: Callable[..., ReturnT | Awaitable[ReturnT]],
    *,
    name: str | None = None,
    description: str | None = None,
    approval_mode: Literal["always_require", "never_require"] | None = None,
    max_invocations: int | None = None,
    max_invocation_exceptions: int | None = None,
    additional_properties: dict[str, Any] | None = None,
) -> FunctionTool[Any, ReturnT]: ...

@overload
def tool(
    func: None = None,
    *,
    name: str | None = None,
    # ... same as above
) -> Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], FunctionTool[Any, ReturnT]]: ...
Two calling modes:
- @tool - decorates the function directly
- @tool(name="custom") - decorates with arguments
Core implementation
def tool(
    func: Callable[..., ReturnT | Awaitable[ReturnT]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    approval_mode: Literal["always_require", "never_require"] | None = None,
    max_invocations: int | None = None,
    max_invocation_exceptions: int | None = None,
    additional_properties: dict[str, Any] | None = None,
) -> FunctionTool[Any, ReturnT] | Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], FunctionTool[Any, ReturnT]]:
    def decorator(func: Callable[..., ReturnT | Awaitable[ReturnT]]) -> FunctionTool[Any, ReturnT]:
        @wraps(func)
        def wrapper(f: Callable[..., ReturnT | Awaitable[ReturnT]]) -> FunctionTool[Any, ReturnT]:
            tool_name: str = name or getattr(f, "__name__", "unknown_function")
            tool_desc: str = description or (f.__doc__ or "")
            return FunctionTool[Any, ReturnT](
                name=tool_name,
                description=tool_desc,
                approval_mode=approval_mode,
                max_invocations=max_invocations,
                max_invocation_exceptions=max_invocation_exceptions,
                additional_properties=additional_properties or {},
                func=f,
            )

        return wrapper(func)

    return decorator(func) if func else decorator
Usage example
from agent_framework import tool
from typing import Annotated

# Basic usage
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, "The city name"],
    unit: Annotated[str, "Temperature unit"] = "celsius",
) -> str:
    '''Get the weather for a location.'''
    return f"Weather in {location}: 22°{unit[0].upper()}"

# Async support
@tool(approval_mode="never_require")
async def async_get_weather(location: str) -> str:
    '''Get weather asynchronously.'''
    return f"Weather in {location}"

# Custom name and description
@tool(name="custom_weather", description="Custom weather function")
def another_weather_func(location: str) -> str:
    return f"Weather in {location}"
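The dual-mode dispatch (`@tool` versus `@tool(...)`) does not depend on the framework and can be shown with a minimal stand-in. `SimpleTool` and `simple_tool` are hypothetical names used only for this sketch; they record a name and description and skip everything else FunctionTool does.

```python
from typing import Any, Callable


class SimpleTool:
    """Hypothetical stand-in for FunctionTool that only records metadata."""

    def __init__(self, func: Callable[..., Any], name: str, description: str) -> None:
        self.func = func
        self.name = name
        self.description = description

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)


def simple_tool(func=None, *, name=None, description=None):
    """Works both as @simple_tool and as @simple_tool(name=...)."""

    def decorator(f):
        return SimpleTool(f, name or f.__name__, description or (f.__doc__ or ""))

    # Bare usage passes the function directly; parameterized usage passes None.
    return decorator(func) if func is not None else decorator


@simple_tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


@simple_tool(name="multiply_numbers")
def mul(a: int, b: int) -> int:
    return a * b


print(add.name, add(2, 3))  # add 5
print(mul.name, mul(2, 3))  # multiply_numbers 6
```

Making every option keyword-only is what keeps the two modes unambiguous: the only positional argument the decorator can receive is the function itself.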
Hosted tools analysis
Hosted tool class hierarchy
BaseTool
├── HostedCodeInterpreterTool   # code execution
├── HostedWebSearchTool         # web search
├── HostedImageGenerationTool   # image generation
├── HostedMCPTool               # MCP service tool
└── HostedFileSearchTool        # file search
HostedCodeInterpreterTool
class HostedCodeInterpreterTool(BaseTool):
    """Represents a hosted tool for code execution.

    Note: This tool does not implement code interpretation itself.
    It serves as a marker to inform a service that it is allowed
    to execute generated code if the service is capable.
    """

    def __init__(
        self,
        *,
        inputs: "Content | dict[str, Any] | str | list[...] | None" = None,
        description: str | None = None,
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        if "name" in kwargs:
            raise ValueError("The 'name' argument is reserved...")
        self.inputs = _parse_inputs(inputs) if inputs else []
        super().__init__(
            name="code_interpreter",  # fixed name
            description=description or "",
            additional_properties=additional_properties,
            **kwargs,
        )
HostedMCPTool
class HostedMCPTool(BaseTool):
    """Represents a MCP tool that is managed and executed by the service."""

    def __init__(
        self,
        *,
        name: str,
        description: str | None = None,
        url: AnyUrl | str,
        approval_mode: Literal["always_require", "never_require"] | HostedMCPSpecificApproval | None = None,
        allowed_tools: Collection[str] | None = None,
        headers: dict[str, str] | None = None,
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        # Validate approval_mode
        if approval_mode is not None:
            if isinstance(approval_mode, str):
                if approval_mode not in ("always_require", "never_require"):
                    raise ValueError(...)
            elif isinstance(approval_mode, dict):
                for key, value in approval_mode.items():
                    if not isinstance(value, set):
                        approval_mode[key] = set(value)
        self.url = url if isinstance(url, AnyUrl) else AnyUrl(url)
        self.approval_mode = approval_mode
        self.allowed_tools = set(allowed_tools) if allowed_tools else None
        self.headers = headers
Per-tool approval mode:
class HostedMCPSpecificApproval(TypedDict, total=False):
    always_require_approval: Collection[str] | None
    never_require_approval: Collection[str] | None
MCP integration mechanism
MCP architecture overview

┌─────────────────┐         ┌──────────────────┐
│    ChatAgent    │────────▶│     MCPTool      │
└────────┬────────┘         └────────┬─────────┘
         │ tools                     │ get_mcp_client()
         ▼                           ▼
┌──────────────────────────────────────────────┐
│            MCP Server (External)             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │  Tools   │  │ Prompts  │  │ Resources│    │
│  └──────────┘  └──────────┘  └──────────┘    │
└──────────────────────────────────────────────┘

Connection types:
- MCPStdioTool: stdio (local process)
- MCPStreamableHTTPTool: HTTP/SSE
- MCPWebsocketTool: WebSocket
MCPTool core class
class MCPTool:
    """Main MCP class for connecting to Model Context Protocol servers."""

    def __init__(
        self,
        name: str,
        description: str | None = None,
        approval_mode: Literal["always_require", "never_require"] | HostedMCPSpecificApproval | None = None,
        allowed_tools: Collection[str] | None = None,
        load_tools: bool = True,
        parse_tool_results: Literal[True] | Callable[[types.CallToolResult], Any] | None = True,
        load_prompts: bool = True,
        parse_prompt_results: Literal[True] | Callable[[types.GetPromptResult], Any] | None = True,
        session: ClientSession | None = None,
        request_timeout: int | None = None,
        chat_client: "ChatClientProtocol | None" = None,
        additional_properties: dict[str, Any] | None = None,
    ) -> None:
        self._functions: list[FunctionTool[Any, Any]] = []
        self.is_connected: bool = False
        self.session = session
        self._exit_stack = AsyncExitStack()
Connection management
async def connect(self, *, reset: bool = False) -> None:
    """Connect to the MCP server."""
    if reset:
        await self._safe_close_exit_stack()
        self.session = None
        self.is_connected = False
        self._exit_stack = AsyncExitStack()
    if not self.session:
        try:
            transport = await self._exit_stack.enter_async_context(self.get_mcp_client())
            session = await self._exit_stack.enter_async_context(
                ClientSession(
                    read_stream=transport[0],
                    write_stream=transport[1],
                    read_timeout_seconds=timedelta(seconds=self.request_timeout) if self.request_timeout else None,
                    message_handler=self.message_handler,
                    logging_callback=self.logging_callback,
                    sampling_callback=self.sampling_callback,
                )
            )
            await session.initialize()
            self.session = session
        except ...  # error handling elided in this excerpt
    self.is_connected = True
    if self.load_tools_flag:
        await self.load_tools()
    if self.load_prompts_flag:
        await self.load_prompts()
Tool loading flow
async def load_tools(self) -> None:
    """Load tools from the MCP server."""
    existing_names = {func.name for func in self._functions}
    params: types.PaginatedRequestParams | None = None
    while True:
        await self._ensure_connected()
        tool_list = await self.session.list_tools(params=params)
        for tool in tool_list.tools:
            local_name = _normalize_mcp_name(tool.name)
            if local_name in existing_names:
                continue
            # Build the Pydantic input model
            input_model = _get_input_model_from_mcp_tool(tool)
            approval_mode = self._determine_approval_mode(local_name)
            # Create the FunctionTool
            func: FunctionTool = FunctionTool(
                func=partial(self.call_tool, tool.name),
                name=local_name,
                description=tool.description or "",
                approval_mode=approval_mode,
                input_model=input_model,
            )
            self._functions.append(func)
            existing_names.add(local_name)
        if not tool_list or not tool_list.nextCursor:
            break
        params = types.PaginatedRequestParams(cursor=tool_list.nextCursor)
MCP integration in ChatAgent
# _agents.py: ChatAgent.run()
# Resolve final tool list (runtime provided tools + local MCP server tools)
final_tools: list[ToolProtocol | Callable[..., Any] | dict[str, Any]] = []
normalized_tools = [] if tools_ is None else tools_ if isinstance(tools_, list) else [tools_]
for tool in normalized_tools:
    if isinstance(tool, MCPTool):
        if not tool.is_connected:
            await self._async_exit_stack.enter_async_context(tool)
        final_tools.extend(tool.functions)
    else:
        final_tools.append(tool)
for mcp_server in self.mcp_tools:
    if not mcp_server.is_connected:
        await self._async_exit_stack.enter_async_context(mcp_server)
    final_tools.extend(mcp_server.functions)
Async context management
# ChatAgent's context manager
async def __aenter__(self) -> "Self":
    """Enter the async context manager."""
    for context_manager in chain([self.chat_client], self.mcp_tools):
        if isinstance(context_manager, AbstractAsyncContextManager):
            await self._async_exit_stack.enter_async_context(context_manager)
    return self

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Exit the async context manager."""
    await self._async_exit_stack.aclose()
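The same enter-if-capable pattern can be run on its own to observe the cleanup order. `Resource` is a hypothetical class that only logs its lifecycle; the plain `object()` in the list stands in for a chat client that is not a context manager.

```python
import asyncio
from contextlib import AbstractAsyncContextManager, AsyncExitStack


class Resource(AbstractAsyncContextManager):
    """Hypothetical resource that records when it opens and closes."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def __aenter__(self) -> "Resource":
        self.log.append(f"open {self.name}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self.log.append(f"close {self.name}")


async def main() -> list[str]:
    log: list[str] = []
    candidates = [Resource("chat_client", log), object(), Resource("mcp_tool", log)]
    async with AsyncExitStack() as stack:
        for candidate in candidates:
            # Only objects that are async context managers are entered.
            if isinstance(candidate, AbstractAsyncContextManager):
                await stack.enter_async_context(candidate)
        log.append("running")
    return log


print(asyncio.run(main()))
# ['open chat_client', 'open mcp_tool', 'running', 'close mcp_tool', 'close chat_client']
```

The stack closes resources in reverse order of entry, so an MCP connection opened after the chat client is torn down before it.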
MCP type conversion
def _parse_content_from_mcp(
    mcp_type: types.ImageContent | types.TextContent | ...
) -> list[Content]:
    """Parse an MCP type into an Agent Framework type."""
    match mcp_type:
        case types.TextContent():
            return [Content.from_text(text=mcp_type.text, raw_representation=mcp_type)]
        case types.ImageContent() | types.AudioContent():
            data_bytes = base64.b64decode(mcp_type.data) if isinstance(mcp_type.data, str) else mcp_type.data
            return [Content.from_data(data=data_bytes, media_type=mcp_type.mimeType, ...)]
        case types.ResourceLink():
            return [Content.from_uri(uri=str(mcp_type.uri), media_type=mcp_type.mimeType, ...)]
        case types.ToolUseContent():
            return [Content.from_function_call(call_id=mcp_type.id, name=mcp_type.name, arguments=mcp_type.input, ...)]
        case types.ToolResultContent():
            return [Content.from_function_result(call_id=mcp_type.toolUseId, result=..., exception=...)]
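Points worth a deeper look
1. Engineering value of dynamic model creation
_build_pydantic_model_from_json_schema is a technique with high engineering value:
Advantages:
- No boilerplate: Pydantic models are generated straight from JSON Schema instead of being written by hand
- Complex types are supported: $ref, nested objects, typed arrays, polymorphic objects
- Flexible at runtime: the model structure can follow whatever an external API declares
Use cases:
- Importing tools from MCP servers
- Integrating OpenAPI specifications
- User-defined tool configuration
Code size: about 180 lines, yet it covers a practical subset of the JSON Schema specification, which is a very good return on effort.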
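2. Extensibility of the tool system
Layered architecture:
- Protocol layer: ToolProtocol defines the contract and supports duck typing
- Base layer: BaseTool provides serialization and the common attributes
- Implementation layer: FunctionTool and the hosted tools each handle their own concern
- Integration layer: MCP bridges to external services
Adding a new tool type is cheap:
- Inherit from BaseTool (about 20 lines)
- Implement the type-specific logic (about 50 lines)
- Serialization, validation, and execution come along automatically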
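3. Clean async context management
# Manage several contexts together (chat_client + mcp_tools)
for context_manager in chain([self.chat_client], self.mcp_tools):
    if isinstance(context_manager, AbstractAsyncContextManager):
        await self._async_exit_stack.enter_async_context(context_manager)
Design highlights:
- AsyncExitStack handles resource cleanup in one place
- Context-manager capability is detected with a single isinstance check against AbstractAsyncContextManager, so no per-type handling is needed
- A dropped MCP connection can be re-established: connect(reset=True) closes the old exit stack and creates a new session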
4. Function invocation lifecycle management
# Two independent limits guard every invocation
if self.max_invocations is not None and self.invocation_count >= self.max_invocations:
    raise ToolException("Function has reached its maximum invocation limit...")
if self.max_invocation_exceptions is not None and self.invocation_exception_count >= self.max_invocation_exceptions:
    raise ToolException("Function has reached its maximum exception limit...")
Design intent:
- Prevent unbounded call loops
- Keep a misbehaving tool from exhausting system resources
- Give a clear error message
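A stand-alone sketch of the two guards is shown below. `LimitedCallable` and `ToolLimitError` are hypothetical names, and the points at which the counters are incremented are an assumption, since the excerpt only shows the checks.

```python
class ToolLimitError(Exception):
    """Hypothetical stand-in for the framework's ToolException."""


class LimitedCallable:
    """Hypothetical wrapper that enforces call and exception budgets."""

    def __init__(self, func, max_invocations=None, max_invocation_exceptions=None):
        self.func = func
        self.max_invocations = max_invocations
        self.max_invocation_exceptions = max_invocation_exceptions
        self.invocation_count = 0
        self.invocation_exception_count = 0

    def __call__(self, *args, **kwargs):
        if self.max_invocations is not None and self.invocation_count >= self.max_invocations:
            raise ToolLimitError("maximum invocation limit reached")
        if (
            self.max_invocation_exceptions is not None
            and self.invocation_exception_count >= self.max_invocation_exceptions
        ):
            raise ToolLimitError("maximum exception limit reached")
        # Assumption: every attempt counts, and failures are counted separately.
        self.invocation_count += 1
        try:
            return self.func(*args, **kwargs)
        except Exception:
            self.invocation_exception_count += 1
            raise


double = LimitedCallable(lambda x: x * 2, max_invocations=2)
print(double(1), double(2))  # 2 4
try:
    double(3)
except ToolLimitError as exc:
    print(exc)  # maximum invocation limit reached
```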
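Conclusion
Is dynamic model creation worth a deeper look?
Yes, it is strongly recommended, for the following reasons:
- Moderate depth: at about 180 lines, the implementation can be understood in full
- Highly reusable: _build_pydantic_model_from_json_schema can be used on its own
- Instructive: it shows how JSON Schema maps onto the Python type system
- Widely applicable: API integration, configuration-driven development, dynamic forms, and similar scenarios
- Room to grow: it can be extended to more JSON Schema features (allOf, anyOf, conditional schemas, and so on)
Recommended order:
- Core logic of _build_pydantic_model_from_json_schema
- The recursive resolver _resolve_type
- Literal handling in _resolve_literal_type
- Nested objects and $ref resolution
- oneOf + discriminator polymorphism support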