Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Tools 与 MCP 集成机制分析

Tools 与 MCP 集成机制分析

概述

Microsoft Agent Framework 的工具系统是一个高度灵活、可扩展的架构,通过协议驱动设计将函数工具、托管工具和 MCP(Model Context Protocol)集成统一整合。核心设计亮点在于动态 Pydantic 模型创建、函数签名解析和异步上下文管理。

关键文件:

  • /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_tools.py - 工具系统核心(2325行)
  • /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_mcp.py - MCP 集成实现
  • /home/sujie/dev/github/agent-framework/python/packages/core/agent_framework/_agents.py - Agent 与工具交互

ToolProtocol 与工具抽象

ToolProtocol 协议定义

@runtime_checkable
class ToolProtocol(Protocol):
    """Represents a generic tool."""
    name: str
    description: str
    additional_properties: dict[str, Any] | None

    def __str__(self) -> str:
        """Return a string representation of the tool."""
        ...

设计亮点:

  1. 使用 @runtime_checkable 允许运行时类型检查
  2. 采用协议而非基类,实现结构子类型(鸭子类型)
  3. 允许用户创建完全自定义的工具,无需继承框架类

BaseTool 基础实现

class BaseTool(SerializationMixin):
    DEFAULT_EXCLUDE: ClassVar[set[str]] = {"additional_properties"}

    def __init__(
        self,
        *,
        name: str,
        description: str = "",
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        self.name = name
        self.description = description
        self.additional_properties = additional_properties
        for key, value in kwargs.items():
            setattr(self, key, value)

继承自 SerializationMixin,提供序列化支持,允许工具与外部服务通信。


FunctionTool 详解

类定义与泛型设计

class FunctionTool(BaseTool, Generic[ArgsT, ReturnT]):
    """A tool that wraps a Python function to make it callable by AI models."""

    INJECTABLE: ClassVar[set[str]] = {"func"}
    DEFAULT_EXCLUDE: ClassVar[set[str]] = {"input_model", "_invocation_duration_histogram", "_cached_parameters"}

泛型参数:

  • ArgsT (bound=BaseModel): 输入参数模型类型
  • ReturnT (default=Any): 返回值类型

核心属性

属性类型说明
funcCallable被包装的函数
input_modeltype[ArgsT]参数验证 Pydantic 模型
approval_modeLiteral审批模式(always_require/never_require)
max_invocationsint | None最大调用次数限制
max_invocation_exceptionsint | None最大异常次数限制
_forward_runtime_kwargsbool是否转发运行时参数

动态模型创建流程

┌─────────────────────────────────────────────────────────────────┐
│              FunctionTool 动态模型创建流程                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. 初始化阶段                                                   │
│     ┌─────────────────────────────────────────────────────┐    │
│     │ __init__()                                          │    │
│     │  └─> _resolve_input_model(input_model)              │    │
│     │       ├─> None: _create_input_model_from_func()     │    │
│     │       ├─> BaseModel: 直接使用                        │    │
│     │       └─> Mapping: _create_model_from_json_schema() │    │
│     └─────────────────────────────────────────────────────┘    │
│                                                                 │
│  2. 从函数创建模型 (_create_input_model_from_func)              │
│     ┌─────────────────────────────────────────────────────┐    │
│     │ 1. 使用 inspect.signature(func) 获取参数签名        │    │
│     │ 2. 遍历 parameters,跳过 self/cls                   │    │
│     │ 3. 跳过 VAR_POSITIONAL (*args) 和 VAR_KEYWORD       │    │
│     │ 4. 使用 _parse_annotation() 解析类型注解            │    │
│     │ 5. 调用 pydantic.create_model() 动态创建模型        │    │
│     └─────────────────────────────────────────────────────┘    │
│                                                                 │
│  3. 从 JSON Schema 创建模型 (_build_pydantic_model_from_json_schema)│
│     ┌─────────────────────────────────────────────────────┐    │
│     │ 1. 解析 properties 和 required 字段                 │    │
│     │ 2. 处理 $refs 引用(递归解析 #/$defs/xxx)          │    │
│     │ 3. 处理 nested objects(递归创建嵌套模型)          │    │
│     │ 4. 处理 typed arrays(list[ItemType])              │    │
│     │ 5. 支持 Literal 类型(const/enum)                  │    │
│     │ 6. 支持 oneOf + discriminator(多态对象)           │    │
│     └─────────────────────────────────────────────────────┘    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

_parse_annotation 实现分析

def _parse_annotation(annotation: Any) -> Any:
    """Parse a type annotation and return the corresponding type.
    
    If the second annotation (after the type) is a string, 
    then we convert that to a Pydantic Field description.
    """
    origin = get_origin(annotation)
    if origin is not None:
        # Literal types should be returned as-is
        if origin is Literal:
            return annotation

        args = get_args(annotation)
        # For other generics, return the origin type
        if len(args) > 1 and isinstance(args[1], str):
            # Create a new Annotated type with the updated Field
            args_list = list(args)
            if len(args_list) == 2:
                return Annotated[args_list[0], Field(description=args_list[1])]
            return Annotated[args_list[0], Field(description=args_list[1]), tuple(args_list[2:])]
    return annotation

使用示例:

from typing import Annotated

# 字符串注释转换为 Field
def get_weather(
    location: Annotated[str, "The city name"],  # 解析为 Field(description="The city name")
    unit: Annotated[str, "Temperature unit"] = "celsius",
) -> str:
    return f"Weather in {location}: 22°{unit[0].upper()}"

_build_pydantic_model_from_json_schema 完整代码

def _build_pydantic_model_from_json_schema(
    model_name: str,
    schema: Mapping[str, Any],
) -> type[BaseModel]:
    """Creates a Pydantic model from JSON Schema with support for $refs, nested objects, and typed arrays."""
    properties = schema.get("properties")
    required = schema.get("required", [])
    definitions = schema.get("$defs", {})

    if not properties:
        return create_model(f"{model_name}_input")

    def _resolve_literal_type(prop_details: dict[str, Any]) -> type | None:
        """Check if property should be a Literal type (const or enum)."""
        if "const" in prop_details:
            return Literal[prop_details["const"]]
        if "enum" in prop_details and isinstance(prop_details["enum"], list):
            enum_values = prop_details["enum"]
            if enum_values:
                return Literal[tuple(enum_values)]
        return None

    def _resolve_type(prop_details: dict[str, Any], parent_name: str = "") -> type:
        """Resolve JSON Schema type to Python type."""
        # Handle oneOf + discriminator (polymorphic objects)
        if "oneOf" in prop_details and "discriminator" in prop_details:
            # ... 实现多态类型解析

        # Handle $ref by resolving the reference
        if "$ref" in prop_details:
            ref = prop_details["$ref"]
            if ref.startswith("#/$defs/"):
                def_name = ref.split("/")[-1]
                if def_name in definitions:
                    resolved = definitions[def_name]
                    return _resolve_type(resolved, def_name)
            return dict

        # Map JSON Schema types to Python types
        json_type = prop_details.get("type", "string")
        match json_type:
            case "integer":
                return int
            case "number":
                return float
            case "boolean":
                return bool
            case "array":
                items_schema = prop_details.get("items")
                if items_schema and isinstance(items_schema, dict):
                    item_type = _resolve_type(items_schema, f"{parent_name}_item")
                    return list[item_type]
                return list
            case "object":
                # Handle nested objects by creating a nested Pydantic model
                nested_properties = prop_details.get("properties")
                nested_required = prop_details.get("required", [])

                if nested_properties and isinstance(nested_properties, dict):
                    nested_model_name = f"{parent_name}_nested" if parent_name else "NestedModel"
                    # Recursively build field definitions for the nested model
                    nested_field_definitions: dict[str, Any] = {}
                    for nested_prop_name, nested_prop_details in nested_properties.items():
                        # ... 递归处理嵌套属性
                    return create_model(nested_model_name, **nested_field_definitions)
                return dict
            case _:
                return str

    field_definitions: dict[str, Any] = {}
    for prop_name, prop_details in properties.items():
        prop_details = json.loads(prop_details) if isinstance(prop_details, str) else prop_details

        literal_type = _resolve_literal_type(prop_details)
        if literal_type is not None:
            python_type = literal_type
        else:
            python_type = _resolve_type(prop_details, f"{model_name}_{prop_name}")
        description = prop_details.get("description", "")

        field_kwargs: dict[str, Any] = {}
        if description:
            field_kwargs["description"] = description

        if prop_name in required:
            if field_kwargs:
                field_definitions[prop_name] = (python_type, Field(**field_kwargs))
            else:
                field_definitions[prop_name] = (python_type, ...)
        else:
            default_value = prop_details.get("default", None)
            field_kwargs["default"] = default_value
            if field_kwargs and any(k != "default" for k in field_kwargs):
                field_definitions[prop_name] = (python_type, Field(**field_kwargs))
            else:
                field_definitions[prop_name] = (python_type, default_value)

    return create_model(f"{model_name}_input", **field_definitions)

描述符协议支持

def __get__(self, obj: Any, objtype: type | None = None) -> "FunctionTool[ArgsT, ReturnT]":
    """Implement the descriptor protocol to support bound methods."""
    if obj is None:
        return self  # Accessed from the class

    # Check if the wrapped function is a method (has 'self' parameter)
    if self.func is not None:
        sig = inspect.signature(self.func)
        params = list(sig.parameters.keys())
        if params and params[0] in {"self", "cls"}:
            # Create a new FunctionTool with the bound method
            import copy
            bound_func = copy.copy(self)
            bound_func._instance = obj
            return bound_func
    return self

作用: 允许 FunctionTool 作为类方法装饰器使用,自动绑定实例。


@tool 装饰器实现

双重重载设计

@overload
def tool(
    func: Callable[..., ReturnT | Awaitable[ReturnT]],
    *,
    name: str | None = None,
    description: str | None = None,
    approval_mode: Literal["always_require", "never_require"] | None = None,
    max_invocations: int | None = None,
    max_invocation_exceptions: int | None = None,
    additional_properties: dict[str, Any] | None = None,
) -> FunctionTool[Any, ReturnT]: ...

@overload
def tool(
    func: None = None,
    *,
    name: str | None = None,
    # ... 同上
) -> Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], FunctionTool[Any, ReturnT]]: ...

双模式调用:

  1. @tool - 直接装饰函数
  2. @tool(name="custom") - 带参数装饰

核心实现

def tool(
    func: Callable[..., ReturnT | Awaitable[ReturnT]] | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    approval_mode: Literal["always_require", "never_require"] | None = None,
    max_invocations: int | None = None,
    max_invocation_exceptions: int | None = None,
    additional_properties: dict[str, Any] | None = None,
) -> FunctionTool[Any, ReturnT] | Callable[[Callable[..., ReturnT | Awaitable[ReturnT]]], FunctionTool[Any, ReturnT]]:
    
    def decorator(func: Callable[..., ReturnT | Awaitable[ReturnT]]) -> FunctionTool[Any, ReturnT]:
        @wraps(func)
        def wrapper(f: Callable[..., ReturnT | Awaitable[ReturnT]]) -> FunctionTool[Any, ReturnT]:
            tool_name: str = name or getattr(f, "__name__", "unknown_function")
            tool_desc: str = description or (f.__doc__ or "")
            return FunctionTool[Any, ReturnT](
                name=tool_name,
                description=tool_desc,
                approval_mode=approval_mode,
                max_invocations=max_invocations,
                max_invocation_exceptions=max_invocation_exceptions,
                additional_properties=additional_properties or {},
                func=f,
            )
        return wrapper(func)

    return decorator(func) if func else decorator

使用示例

from agent_framework import tool
from typing import Annotated

# 基本使用
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, "The city name"],
    unit: Annotated[str, "Temperature unit"] = "celsius",
) -> str:
    '''Get the weather for a location.'''
    return f"Weather in {location}: 22°{unit[0].upper()}"

# 异步支持
@tool(approval_mode="never_require")
async def async_get_weather(location: str) -> str:
    '''Get weather asynchronously.'''
    return f"Weather in {location}"

# 自定义名称和描述
@tool(name="custom_weather", description="Custom weather function")
def another_weather_func(location: str) -> str:
    return f"Weather in {location}"

Hosted Tools 分析

托管工具类层次

BaseTool
├── HostedCodeInterpreterTool    # 代码执行
├── HostedWebSearchTool          # 网络搜索
├── HostedImageGenerationTool    # 图像生成
├── HostedMCPTool                # MCP 服务工具
└── HostedFileSearchTool         # 文件搜索

HostedCodeInterpreterTool

class HostedCodeInterpreterTool(BaseTool):
    """Represents a hosted tool for code execution.
    
    Note: This tool does not implement code interpretation itself.
    It serves as a marker to inform a service that it is allowed 
    to execute generated code if the service is capable.
    """
    
    def __init__(
        self,
        *,
        inputs: "Content | dict[str, Any] | str | list[...] | None" = None,
        description: str | None = None,
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        if "name" in kwargs:
            raise ValueError("The 'name' argument is reserved...")
        
        self.inputs = _parse_inputs(inputs) if inputs else []
        
        super().__init__(
            name="code_interpreter",  # 固定名称
            description=description or "",
            additional_properties=additional_properties,
            **kwargs,
        )

HostedMCPTool

class HostedMCPTool(BaseTool):
    """Represents a MCP tool that is managed and executed by the service."""
    
    def __init__(
        self,
        *,
        name: str,
        description: str | None = None,
        url: AnyUrl | str,
        approval_mode: Literal["always_require", "never_require"] | HostedMCPSpecificApproval | None = None,
        allowed_tools: Collection[str] | None = None,
        headers: dict[str, str] | None = None,
        additional_properties: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> None:
        # 验证 approval_mode
        if approval_mode is not None:
            if isinstance(approval_mode, str):
                if approval_mode not in ("always_require", "never_require"):
                    raise ValueError(...)
            elif isinstance(approval_mode, dict):
                for key, value in approval_mode.items():
                    if not isinstance(value, set):
                        approval_mode[key] = set(value)
        
        self.url = url if isinstance(url, AnyUrl) else AnyUrl(url)
        self.approval_mode = approval_mode
        self.allowed_tools = set(allowed_tools) if allowed_tools else None
        self.headers = headers

特定审批模式:

class HostedMCPSpecificApproval(TypedDict, total=False):
    always_require_approval: Collection[str] | None
    never_require_approval: Collection[str] | None

MCP 集成机制

MCP 架构概览

┌─────────────────────────────────────────────────────────────────┐
│                    MCP 集成架构                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐         ┌──────────────────┐             │
│  │    ChatAgent    │────────▶│    MCPTool       │             │
│  └─────────────────┘         └────────┬─────────┘             │
│         │                             │                       │
│         │ tools                       │ get_mcp_client()      │
│         ▼                             ▼                       │
│  ┌──────────────────────────────────────────┐                │
│  │           MCP Server (External)          │                │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ │                │
│  │  │  Tools   │ │ Prompts  │ │ Resources│ │                │
│  │  └──────────┘ └──────────┘ └──────────┘ │                │
│  └──────────────────────────────────────────┘                │
│                                                                 │
│  连接类型:                                                     │
│  - MCPStdioTool        : stdio (本地进程)                      │
│  - MCPStreamableHTTPTool: HTTP/SSE                            │
│  - MCPWebsocketTool    : WebSocket                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

MCPTool 核心类

class MCPTool:
    """Main MCP class for connecting to Model Context Protocol servers."""
    
    def __init__(
        self,
        name: str,
        description: str | None = None,
        approval_mode: Literal["always_require", "never_require"] | HostedMCPSpecificApproval | None = None,
        allowed_tools: Collection[str] | None = None,
        load_tools: bool = True,
        parse_tool_results: Literal[True] | Callable[[types.CallToolResult], Any] | None = True,
        load_prompts: bool = True,
        parse_prompt_results: Literal[True] | Callable[[types.GetPromptResult], Any] | None = True,
        session: ClientSession | None = None,
        request_timeout: int | None = None,
        chat_client: "ChatClientProtocol | None" = None,
        additional_properties: dict[str, Any] | None = None,
    ) -> None:
        self._functions: list[FunctionTool[Any, Any]] = []
        self.is_connected: bool = False
        self.session = session
        self._exit_stack = AsyncExitStack()

连接管理

async def connect(self, *, reset: bool = False) -> None:
    """Connect to the MCP server."""
    if reset:
        await self._safe_close_exit_stack()
        self.session = None
        self.is_connected = False
        self._exit_stack = AsyncExitStack()
    
    if not self.session:
        try:
            transport = await self._exit_stack.enter_async_context(self.get_mcp_client())
            session = await self._exit_stack.enter_async_context(
                ClientSession(
                    read_stream=transport[0],
                    write_stream=transport[1],
                    read_timeout_seconds=timedelta(seconds=self.request_timeout) if self.request_timeout else None,
                    message_handler=self.message_handler,
                    logging_callback=self.logging_callback,
                    sampling_callback=self.sampling_callback,
                )
            )
            await session.initialize()
            self.session = session
        except ...
    
    self.is_connected = True
    if self.load_tools_flag:
        await self.load_tools()
    if self.load_prompts_flag:
        await self.load_prompts()

工具加载流程

async def load_tools(self) -> None:
    """Load tools from the MCP server."""
    existing_names = {func.name for func in self._functions}
    
    params: types.PaginatedRequestParams | None = None
    while True:
        await self._ensure_connected()
        tool_list = await self.session.list_tools(params=params)
        
        for tool in tool_list.tools:
            local_name = _normalize_mcp_name(tool.name)
            if local_name in existing_names:
                continue
            
            # 创建 Pydantic 模型
            input_model = _get_input_model_from_mcp_tool(tool)
            approval_mode = self._determine_approval_mode(local_name)
            
            # 创建 FunctionTool
            func: FunctionTool = FunctionTool(
                func=partial(self.call_tool, tool.name),
                name=local_name,
                description=tool.description or "",
                approval_mode=approval_mode,
                input_model=input_model,
            )
            self._functions.append(func)
            existing_names.add(local_name)
        
        if not tool_list or not tool_list.nextCursor:
            break
        params = types.PaginatedRequestParams(cursor=tool_list.nextCursor)

ChatAgent 中的 MCP 集成

# _agents.py: ChatAgent.run()
# Resolve final tool list (runtime provided tools + local MCP server tools)
final_tools: list[ToolProtocol | Callable[..., Any] | dict[str, Any]] = []
normalized_tools = [] if tools_ is None else tools_ if isinstance(tools_, list) else [tools_]

for tool in normalized_tools:
    if isinstance(tool, MCPTool):
        if not tool.is_connected:
            await self._async_exit_stack.enter_async_context(tool)
        final_tools.extend(tool.functions)
    else:
        final_tools.append(tool)

for mcp_server in self.mcp_tools:
    if not mcp_server.is_connected:
        await self._async_exit_stack.enter_async_context(mcp_server)
    final_tools.extend(mcp_server.functions)

异步上下文管理

# ChatAgent 的上下文管理器
async def __aenter__(self) -> "Self":
    """Enter the async context manager."""
    for context_manager in chain([self.chat_client], self.mcp_tools):
        if isinstance(context_manager, AbstractAsyncContextManager):
            await self._async_exit_stack.enter_async_context(context_manager)
    return self

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Exit the async context manager."""
    await self._async_exit_stack.aclose()

MCP 类型转换

def _parse_content_from_mcp(
    mcp_type: types.ImageContent | types.TextContent | ...
) -> list[Content]:
    """Parse an MCP type into an Agent Framework type."""
    match mcp_type:
        case types.TextContent():
            return [Content.from_text(text=mcp_type.text, raw_representation=mcp_type)]
        case types.ImageContent() | types.AudioContent():
            data_bytes = base64.b64decode(mcp_type.data) if isinstance(mcp_type.data, str) else mcp_type.data
            return [Content.from_data(data=data_bytes, media_type=mcp_type.mimeType, ...)]
        case types.ResourceLink():
            return [Content.from_uri(uri=str(mcp_type.uri), media_type=mcp_type.mimeType, ...)]
        case types.ToolUseContent():
            return [Content.from_function_call(call_id=mcp_type.id, name=mcp_type.name, arguments=mcp_type.input, ...)]
        case types.ToolResultContent():
            return [Content.from_function_result(call_id=mcp_type.toolUseId, result=..., exception=...)]

深挖价值点

1. 动态模型创建的工程价值

_build_pydantic_model_from_json_schema 是极具工程价值的技术点:

优点:

  • 零样板代码:无需手写 Pydantic 模型,直接从 JSON Schema 生成
  • 支持复杂类型:$refs、嵌套对象、泛型数组、多态对象
  • 运行时灵活:可根据外部 API 动态调整模型结构

应用场景:

  • MCP 服务器工具导入
  • OpenAPI 规范集成
  • 用户自定义工具配置

代码行数:约 180 行,但支持完整 JSON Schema 规范子集,投入产出比极高。

2. 工具系统的可扩展性

分层架构:

  1. Protocol 层:ToolProtocol 定义契约,支持鸭子类型
  2. Base 层:BaseTool 提供序列化和通用属性
  3. 实现层:FunctionTool/Hosted Tools 各司其职
  4. 集成层:MCP 桥接外部服务

添加新工具类型的成本极低:

  • 继承 BaseTool(约 20 行)
  • 实现特定逻辑(约 50 行)
  • 自动获得序列化、验证、执行能力

3. 异步上下文管理的优雅实现

# 同时管理多个上下文(chat_client + mcp_tools)
for context_manager in chain([self.chat_client], self.mcp_tools):
    if isinstance(context_manager, AbstractAsyncContextManager):
        await self._async_exit_stack.enter_async_context(context_manager)

设计亮点:

  • 使用 AsyncExitStack 统一处理资源清理
  • 自动检测上下文管理器能力(无需显式检查)
  • 支持 MCP 连接断开自动重连

4. 函数调用生命周期管理

# 双重限制保护
if self.max_invocations is not None and self.invocation_count >= self.max_invocations:
    raise ToolException("Function has reached its maximum invocation limit...")

if self.max_invocation_exceptions is not None and self.invocation_exception_count >= self.max_invocation_exceptions:
    raise ToolException("Function has reached its maximum exception limit...")

设计意图:

  • 防止无限循环调用
  • 保护系统资源不被异常工具耗尽
  • 提供明确的错误信息

结论

是否值得深挖动态模型创建?

是的,强烈建议深挖,理由如下:

  1. 技术深度适中:约 180 行代码,可完整理解实现
  2. 可重用性高:_build_pydantic_model_from_json_schema 可独立使用
  3. 教育价值大:展示了如何将 JSON Schema 转换为 Python 类型系统
  4. 实战应用广:API 集成、配置驱动开发、动态表单等场景均适用
  5. 扩展潜力大:可扩展支持更多 JSON Schema 特性(allOf、anyOf、条件 schema 等)

推荐深挖顺序:

  1. _build_pydantic_model_from_json_schema 核心逻辑
  2. _resolve_type 递归解析器
  3. _resolve_literal_type 字面量处理
  4. 嵌套对象和 $ref 解析
  5. oneOf + discriminator 多态支持