Events (Bus)
OpenCode 使用事件总线系统进行组件间通信。
概述
Bus 实现了发布-订阅模式,允许组件解耦通信。支持全局事件总线和实例级别事件总线。
定义位置
packages/opencode/src/bus/index.ts
事件类型
核心事件
| 事件名 | 负载结构 | 说明 |
|---|---|---|
session.created | {info: Session} | 会话创建 |
session.updated | {info: Session} | 会话更新 |
session.deleted | {info: Session} | 会话删除 |
session.diff | {sessionID, diff: FileDiff[]} | 会话差异更新 |
session.error | {sessionID?, error} | 会话错误 |
session.status | {sessionID, status: Info} | 会话状态更新 |
message.updated | {info: Message} | 消息更新 |
message.removed | {sessionID, messageID} | 消息删除 |
message.part.updated | {part, delta?} | Part 更新 |
message.part.removed | {sessionID, messageID, partID} | Part 删除 |
project.updated | info: Project} | 项目更新 |
权限事件
| 事件名 | 负载结构 | 说明 |
|---|---|---|
permission.asked | request: Request | 询问权限 |
permission.replied | {sessionID, requestID, reply} | 收到权限回复 |
MCP 事件
| 事件名 | 负载结构 | 说明 |
|---|---|---|
mcp.tools.changed | {server: string} | MCP 工具列表更新 |
mcp.browser.open.failed | {mcpName, url: string} | 浏览器打开失败 |
全局事件
| 事件名 | 负载结构 | 说明 |
|---|---|---|
global.disposed | {} | 全局资源清理 |
API
发布事件
Bus.publish(Event, payload)
发布单个事件到事件总线。
示例:
Bus.publish(Session.Event.Created, {
info: session,
})
订阅所有事件
Bus.subscribeAll(handler)
订阅所有事件。
示例:
Bus.subscribeAll(async (event) => {
console.log(`Event: ${event.type}`)
console.log(`Payload: ${JSON.stringify(event.properties)}`)
})
取消订阅
const unsubscribe = Bus.subscribeAll(handler)
// 取消订阅
unsubscribe()
事件处理流程
事件示例
监听会话创建
Bus.subscribeAll(async (event) => {
if (event.type === Session.Event.Created.type) {
const { info } = event.properties
console.log(`New session created: ${info.id}`)
console.log(`Title: ${info.title}`)
}
})
监听消息更新
Bus.subscribeAll(async (event) => {
if (event.type === MessageV2.Event.Updated.type) {
const { info } = event.properties
if (info.role === "assistant") {
console.log(`Assistant message completed`)
console.log(`Cost: $${info.cost}`)
}
}
})
监听 Part 更新
Bus.subscribeAll(async (event) => {
if (event.type === MessageV2.Event.PartUpdated.type) {
const { part, delta } = event.properties
if (part.type === "text" && delta) {
// 流式文本更新
console.log(`Received: ${delta}`)
}
}
})
监听权限请求
Bus.subscribeAll(async (event) => {
if (event.type === PermissionNext.Event.Asked.type) {
const { info } = event.properties
console.log(`Permission request: ${info.permission}`)
console.log(`Patterns: ${info.patterns.join(", ")}`)
// 显示确认对话框
const approved = await showPermissionDialog(info)
if (approved) {
await PermissionNext.reply({
requestID: info.id,
reply: "once",
})
} else {
await PermissionNext.reply({
requestID: info.id,
reply: "reject",
})
}
}
})
监听 MCP 工具变更
Bus.subscribeAll(async (event) => {
if (event.type === MCP.ToolsChanged.type) {
const { server } = event.properties
console.log(`MCP server ${server} tools changed`)
// 重新加载工具
const tools = await MCP.tools()
}
})
事件过滤
按类型过滤
Bus.subscribeAll(async (event) => {
// 只处理会话相关事件
if (!event.type.startsWith("session.")) return
console.log(`Session event: ${event.type}`)
})
按会话过滤
const targetSessionID = session.id
Bus.subscribeAll(async (event) => {
// 检查事件是否属于目标会话
const props = event.properties as any
if (props.sessionID && props.sessionID !== targetSessionID) return
console.log(`Event for session: ${event.type}`)
})
事件优先级
事件按以下顺序处理:
- 同步事件先处理
- 异步事件并行处理
- 错误不中断其他订阅者
错误处理
订阅者错误
Bus.subscribeAll(async (event) => {
try {
// 处理事件
} catch (error) {
// 记录错误但不中断其他订阅者
console.error(`Event handler failed:`, error)
}
})
全局 vs 实例事件
全局事件总线
// 全局事件,用于跨实例通信
GlobalBus.emit("event", {
type: "project.updated",
properties: project,
})
实例事件总线
// 实例级别事件,仅当前实例可见
Bus.publish(Session.Event.Created, {
info: session,
})
事件序列化
所有事件都通过 BusEvent.define 定义:
export const Created = BusEvent.define(
"session.created",
z.object({
info: Session.Info,
}),
)
这确保事件结构的一致性和类型安全。
性能考虑
批量事件
// 避免发布过多小事件
const updates = []
for (const change of changes) {
updates.push(change)
if (updates.length >= 100) {
Bus.publish(BatchEvent, { updates })
updates.length = 0
}
}
// 发布剩余的
if (updates.length > 0) {
Bus.publish(BatchEvent, { updates })
}
异步处理
Bus.subscribeAll(async (event) => {
// 使用 setImmediate 避免阻塞
setImmediate(async () => {
await heavyProcessing(event)
})
})
调试
记录所有事件
Bus.subscribeAll(async (event) => {
console.log(`[${new Date().toISOString()}] ${event.type}`)
console.log(JSON.stringify(event.properties, null, 2))
})
事件图
最佳实践
- 类型安全: 使用 BusEvent.define 定义事件
- 错误处理: 总是处理订阅者中的错误
- 性能: 避免在事件处理中执行耗时操作
- 过滤: 在订阅者中过滤不关心的事件
- 清理: 使用取消订阅函数清理资源
变更历史
| 版本 | 变更内容 | 日期 |
|---|---|---|
| v1 | 初始 Bus 实现 | - |
| v1.1 | 添加全局事件总线 | - |
相关文档
- Session - 会话事件
- Message - 消息事件
- Permission - 权限事件