Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Durable Agent 持久化机制分析

Durable Agent 持久化机制分析

概述

Microsoft Agent Framework 的 Durable Agent 扩展是一个基于 Azure Durable Functions 和 Durable Task Framework 的持久化 Agent 系统。它解决了传统 Agent 在无服务器环境下面临的核心挑战:状态持久化、长期运行、故障恢复。

核心设计目标

  1. 状态持久化:Agent 会话历史自动持久化,支持跨调用保持上下文
  2. 故障恢复:执行状态在故障后可恢复,无需重新执行已完成的工作
  3. 零成本等待:支持长时间等待(如人工审批),不消耗计算资源
  4. 双模式执行:支持客户端直连和编排上下文两种使用模式

DurableAIAgent 架构

与 AgentProtocol 的关键差异

特性AgentProtocolDurableAIAgent
run() 返回类型Coroutine (需 await)Task (需 yield)
执行模式异步立即执行惰性执行/信号触发
状态管理内存中持久化存储
适用场景标准应用编排/工作流

架构组件

双模式设计

1. 客户端模式 (Client Mode)

# 外部客户端直接调用
client = TaskHubGrpcClient(host_address)
executor = ClientAgentExecutor(client)
agent = DurableAIAgent(executor, "MyAgent")

# 阻塞式调用,轮询等待响应
response = agent.run("Hello")  # 返回 AgentResponse

特点:

  • 使用 TaskHubGrpcClient 与 Durable Task 服务通信
  • 通过 信号(Signal) 触发 Agent Entity
  • 轮询(Polling) 机制获取响应
  • 支持 Fire-and-Forget 模式(不等响应)

2. 编排上下文模式 (Orchestration Mode)

# 在 Durable Functions 编排中使用
@app.orchestration_trigger(context_name="context")
def workflow(context: DurableOrchestrationContext):
    executor = OrchestrationAgentExecutor(context)
    agent = DurableAIAgent(executor, "MyAgent")
    
    # 使用 yield 等待完成
    response = yield agent.run("Hello")  # 返回 DurableAgentTask

特点:

  • 使用 OrchestrationContext 进行确定性执行
  • 通过 call_entity 调用 Agent Entity
  • 返回 Task 对象供 yield 使用
  • 支持编排重放(replay)语义

状态管理

状态架构层级

会话 ID 管理

@dataclass
class AgentSessionId:
    name: str
    key: str
    ENTITY_NAME_PREFIX: str = "dafx-"
    
    @property
    def entity_name(self) -> str:
        return f"{self.ENTITY_NAME_PREFIX}{self.name}"
    
    def __str__(self) -> str:
        return f"@{self.name}@{self.key}"

格式:@AgentName@uuid-key

用途:

  1. 唯一标识 Durable Entity 实例
  2. 从线程恢复状态
  3. 通过 HTTP Header x-ms-thread-id 传递

状态持久化策略

关键特性:

  • 每个请求/响应对都生成 correlation_id 用于匹配
  • 支持 try_get_agent_response(correlation_id) 快速查询
  • 错误状态也被持久化,支持错误恢复

执行器机制

执行流程对比

客户端执行器

编排执行器

DurableAgentTask 转换机制

class DurableAgentTask(CompositeTask[AgentResponse], CompletableTask[AgentResponse]):
    """包装实体调用任务,将原始结果转换为 AgentResponse"""
    
    def on_child_completed(self, task: Task[Any]) -> None:
        if task.is_failed:
            self.fail("call_entity Task failed", task.get_exception())
            return
        
        raw_result = task.get_result()
        response = load_agent_response(raw_result)
        
        if self._response_format:
            ensure_response_format(self._response_format, self._correlation_id, response)
        
        self.complete(response)

关键职责:

  1. 拦截底层实体任务完成
  2. 将原始结果反序列化为 AgentResponse
  3. 验证响应格式(如果指定了 Pydantic 模型)
  4. 处理失败情况

实体管理

AgentEntity 核心逻辑

实体执行流程

class AgentEntity:
    async def run(self, request: RunRequest) -> AgentResponse:
        # 1. 将请求记录到状态历史
        state_request = DurableAgentStateRequest.from_run_request(run_request)
        self.state.data.conversation_history.append(state_request)
        
        # 2. 构建消息历史(排除错误响应)
        chat_messages = [
            m.to_chat_message() 
            for entry in self.state.data.conversation_history
            if not self._is_error_response(entry)
            for m in entry.messages
        ]
        
        try:
            # 3. 执行 Agent(优先使用流式)
            agent_run_response = await self._invoke_agent(...)
            
            # 4. 记录响应到状态
            state_response = DurableAgentStateResponse.from_run_response(...)
            self.state.data.conversation_history.append(state_response)
            self.persist_state()
            
            return agent_run_response
            
        except Exception as exc:
            # 5. 错误处理 - 记录错误状态
            error_response = create_error_response(exc)
            error_state_response = DurableAgentStateResponse.from_run_response(...)
            error_state_response.is_error = True
            self.state.data.conversation_history.append(error_state_response)
            self.persist_state()
            return error_response

集成点

1. Durable Task Framework 集成

durabletask-python
├── client/TaskHubGrpcClient
│   ├── signal_entity()      # 客户端信号触发
│   ├── call_entity()        # 编排调用(通过context)
│   └── get_entity()         # 获取状态
├── task/OrchestrationContext
│   ├── signal_entity()      # 编排内信号
│   ├── call_entity()        # 编排内调用
│   └── new_uuid()           # 确定性UUID
└── entities/DurableEntity
    ├── get_state()          # 读取状态
    └── set_state()          # 写入状态

2. Azure Functions 集成(azurefunctions 包)

Azure Functions 特定功能:

  • agent_entity_trigger:Entity 触发器装饰器
  • 内置 HTTP 端点自动生成
  • 与 Durable Task Scheduler 集成

深挖价值点

设计亮点

  1. 双模式抽象:通过 DurableAgentExecutor 抽象层,统一了客户端和编排上下文的使用方式

  2. 协程 vs Task 转换:巧妙处理了 Python async/await 和 Durable Functions yield 模式的差异

  3. 状态 Schema 版本化:SCHEMA_VERSION = "1.1.0" 支持未来状态迁移

  4. Correlation ID 追踪:每个请求/响应对都有唯一ID,支持精确匹配和日志追踪

  5. 错误状态持久化:错误不仅返回,还记录到历史中,支持故障排查和重试策略

  6. 内容类型完备性:支持 Text、FunctionCall、FunctionResult、Error、Data、URI 等10+内容类型

  7. 流式回调机制:AgentResponseCallbackProtocol 允许流式响应实时通知

是否值得深挖状态持久化机制?

结论:值得深挖,理由如下:

  1. 生产关键路径:状态持久化是 Durable Agent 的核心价值,理解其机制对排查生产问题至关重要

  2. 性能瓶颈点:状态序列化/反序列化、存储I/O是主要性能瓶颈,需要优化策略

  3. 一致性保障:理解 correlation_id 匹配机制和版本化 Schema,有助于设计可靠的重试和故障恢复

  4. 扩展基础:如果要扩展支持自定义存储后端(如 Redis、CosmosDB),必须理解当前状态抽象

  5. 成本优化:状态大小直接影响存储成本,理解字段结构有助于设计存储优化策略

推荐阅读顺序

  1. _durable_agent_state.py - 状态模型定义(最核心)
  2. _entities.py - Agent 实体执行逻辑
  3. _executors.py - 客户端/编排执行器
  4. _shim.py - DurableAIAgent 代理层
  5. _models.py - 请求/线程模型

附录:核心类关系总图