Durable Agent 持久化机制分析
概述
Microsoft Agent Framework 的 Durable Agent 扩展是一个基于 Azure Durable Functions 和 Durable Task Framework 的持久化 Agent 系统。它解决了传统 Agent 在无服务器环境下面临的核心挑战:状态持久化、长期运行、故障恢复。
核心设计目标
- 状态持久化:Agent 会话历史自动持久化,支持跨调用保持上下文
- 故障恢复:执行状态在故障后可恢复,无需重新执行已完成的工作
- 零成本等待:支持长时间等待(如人工审批),不消耗计算资源
- 双模式执行:支持客户端直连和编排上下文两种使用模式
DurableAIAgent 架构
与 AgentProtocol 的关键差异
| 特性 | AgentProtocol | DurableAIAgent |
|---|---|---|
run() 返回类型 | Coroutine (需 await) | Task (需 yield) |
| 执行模式 | 异步立即执行 | 惰性执行/信号触发 |
| 状态管理 | 内存中 | 持久化存储 |
| 适用场景 | 标准应用 | 编排/工作流 |
架构组件
双模式设计
1. 客户端模式 (Client Mode)
# 外部客户端直接调用
client = TaskHubGrpcClient(host_address)
executor = ClientAgentExecutor(client)
agent = DurableAIAgent(executor, "MyAgent")
# 阻塞式调用,轮询等待响应
response = agent.run("Hello") # 返回 AgentResponse
特点:
- 使用
TaskHubGrpcClient与 Durable Task 服务通信 - 通过 信号(Signal) 触发 Agent Entity
- 轮询(Polling) 机制获取响应
- 支持 Fire-and-Forget 模式(不等响应)
2. 编排上下文模式 (Orchestration Mode)
# 在 Durable Functions 编排中使用
@app.orchestration_trigger(context_name="context")
def workflow(context: DurableOrchestrationContext):
executor = OrchestrationAgentExecutor(context)
agent = DurableAIAgent(executor, "MyAgent")
# 使用 yield 等待完成
response = yield agent.run("Hello") # 返回 DurableAgentTask
特点:
- 使用
OrchestrationContext进行确定性执行 - 通过 call_entity 调用 Agent Entity
- 返回
Task对象供yield使用 - 支持编排重放(replay)语义
状态管理
状态架构层级
会话 ID 管理
@dataclass
class AgentSessionId:
name: str
key: str
ENTITY_NAME_PREFIX: str = "dafx-"
@property
def entity_name(self) -> str:
return f"{self.ENTITY_NAME_PREFIX}{self.name}"
def __str__(self) -> str:
return f"@{self.name}@{self.key}"
格式:@AgentName@uuid-key
用途:
- 唯一标识 Durable Entity 实例
- 从线程恢复状态
- 通过 HTTP Header
x-ms-thread-id传递
状态持久化策略
关键特性:
- 每个请求/响应对都生成
correlation_id用于匹配 - 支持
try_get_agent_response(correlation_id)快速查询 - 错误状态也被持久化,支持错误恢复
执行器机制
执行流程对比
客户端执行器
编排执行器
DurableAgentTask 转换机制
class DurableAgentTask(CompositeTask[AgentResponse], CompletableTask[AgentResponse]):
"""包装实体调用任务,将原始结果转换为 AgentResponse"""
def on_child_completed(self, task: Task[Any]) -> None:
if task.is_failed:
self.fail("call_entity Task failed", task.get_exception())
return
raw_result = task.get_result()
response = load_agent_response(raw_result)
if self._response_format:
ensure_response_format(self._response_format, self._correlation_id, response)
self.complete(response)
关键职责:
- 拦截底层实体任务完成
- 将原始结果反序列化为
AgentResponse - 验证响应格式(如果指定了 Pydantic 模型)
- 处理失败情况
实体管理
AgentEntity 核心逻辑
实体执行流程
class AgentEntity:
async def run(self, request: RunRequest) -> AgentResponse:
# 1. 将请求记录到状态历史
state_request = DurableAgentStateRequest.from_run_request(run_request)
self.state.data.conversation_history.append(state_request)
# 2. 构建消息历史(排除错误响应)
chat_messages = [
m.to_chat_message()
for entry in self.state.data.conversation_history
if not self._is_error_response(entry)
for m in entry.messages
]
try:
# 3. 执行 Agent(优先使用流式)
agent_run_response = await self._invoke_agent(...)
# 4. 记录响应到状态
state_response = DurableAgentStateResponse.from_run_response(...)
self.state.data.conversation_history.append(state_response)
self.persist_state()
return agent_run_response
except Exception as exc:
# 5. 错误处理 - 记录错误状态
error_response = create_error_response(exc)
error_state_response = DurableAgentStateResponse.from_run_response(...)
error_state_response.is_error = True
self.state.data.conversation_history.append(error_state_response)
self.persist_state()
return error_response
集成点
1. Durable Task Framework 集成
durabletask-python
├── client/TaskHubGrpcClient
│ ├── signal_entity() # 客户端信号触发
│ ├── call_entity() # 编排调用(通过context)
│ └── get_entity() # 获取状态
├── task/OrchestrationContext
│ ├── signal_entity() # 编排内信号
│ ├── call_entity() # 编排内调用
│ └── new_uuid() # 确定性UUID
└── entities/DurableEntity
├── get_state() # 读取状态
└── set_state() # 写入状态
2. Azure Functions 集成(azurefunctions 包)
Azure Functions 特定功能:
agent_entity_trigger:Entity 触发器装饰器- 内置 HTTP 端点自动生成
- 与 Durable Task Scheduler 集成
深挖价值点
设计亮点
双模式抽象:通过
DurableAgentExecutor抽象层,统一了客户端和编排上下文的使用方式协程 vs Task 转换:巧妙处理了 Python
async/await和 Durable Functionsyield模式的差异状态 Schema 版本化:
SCHEMA_VERSION = "1.1.0"支持未来状态迁移Correlation ID 追踪:每个请求/响应对都有唯一ID,支持精确匹配和日志追踪
错误状态持久化:错误不仅返回,还记录到历史中,支持故障排查和重试策略
内容类型完备性:支持 Text、FunctionCall、FunctionResult、Error、Data、URI 等10+内容类型
流式回调机制:
AgentResponseCallbackProtocol允许流式响应实时通知
是否值得深挖状态持久化机制?
结论:值得深挖,理由如下:
生产关键路径:状态持久化是 Durable Agent 的核心价值,理解其机制对排查生产问题至关重要
性能瓶颈点:状态序列化/反序列化、存储I/O是主要性能瓶颈,需要优化策略
一致性保障:理解
correlation_id匹配机制和版本化 Schema,有助于设计可靠的重试和故障恢复扩展基础:如果要扩展支持自定义存储后端(如 Redis、CosmosDB),必须理解当前状态抽象
成本优化:状态大小直接影响存储成本,理解字段结构有助于设计存储优化策略
推荐阅读顺序
_durable_agent_state.py- 状态模型定义(最核心)_entities.py- Agent 实体执行逻辑_executors.py- 客户端/编排执行器_shim.py- DurableAIAgent 代理层_models.py- 请求/线程模型