Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Events (Bus)

Events (Bus)

OpenCode 使用事件总线系统进行组件间通信。

概述

Bus 实现了发布-订阅模式,允许组件解耦通信。支持全局事件总线和实例级别事件总线。

定义位置

packages/opencode/src/bus/index.ts

事件类型

核心事件

事件名负载结构说明
session.created{info: Session}会话创建
session.updated{info: Session}会话更新
session.deleted{info: Session}会话删除
session.diff{sessionID, diff: FileDiff[]}会话差异更新
session.error{sessionID?, error}会话错误
session.status{sessionID, status: Info}会话状态更新
message.updated{info: Message}消息更新
message.removed{sessionID, messageID}消息删除
message.part.updated{part, delta?}Part 更新
message.part.removed{sessionID, messageID, partID}Part 删除
project.updatedinfo: Project}项目更新

权限事件

事件名负载结构说明
permission.askedrequest: Request询问权限
permission.replied{sessionID, requestID, reply}收到权限回复

MCP 事件

事件名负载结构说明
mcp.tools.changed{server: string}MCP 工具列表更新
mcp.browser.open.failed{mcpName, url: string}浏览器打开失败

全局事件

事件名负载结构说明
global.disposed{}全局资源清理

API

发布事件

Bus.publish(Event, payload)

发布单个事件到事件总线。

示例:

Bus.publish(Session.Event.Created, {
  info: session,
})

订阅所有事件

Bus.subscribeAll(handler)

订阅所有事件。

示例:

Bus.subscribeAll(async (event) => {
  console.log(`Event: ${event.type}`)
  console.log(`Payload: ${JSON.stringify(event.properties)}`)
})

取消订阅

const unsubscribe = Bus.subscribeAll(handler)

// 取消订阅
unsubscribe()

事件处理流程

事件示例

监听会话创建

Bus.subscribeAll(async (event) => {
  if (event.type === Session.Event.Created.type) {
    const { info } = event.properties

    console.log(`New session created: ${info.id}`)
    console.log(`Title: ${info.title}`)
  }
})

监听消息更新

Bus.subscribeAll(async (event) => {
  if (event.type === MessageV2.Event.Updated.type) {
    const { info } = event.properties

    if (info.role === "assistant") {
      console.log(`Assistant message completed`)
      console.log(`Cost: $${info.cost}`)
    }
  }
})

监听 Part 更新

Bus.subscribeAll(async (event) => {
  if (event.type === MessageV2.Event.PartUpdated.type) {
    const { part, delta } = event.properties

    if (part.type === "text" && delta) {
      // 流式文本更新
      console.log(`Received: ${delta}`)
    }
  }
})

监听权限请求

Bus.subscribeAll(async (event) => {
  if (event.type === PermissionNext.Event.Asked.type) {
    const { info } = event.properties

    console.log(`Permission request: ${info.permission}`)
    console.log(`Patterns: ${info.patterns.join(", ")}`)

    // 显示确认对话框
    const approved = await showPermissionDialog(info)

    if (approved) {
      await PermissionNext.reply({
        requestID: info.id,
        reply: "once",
      })
    } else {
      await PermissionNext.reply({
        requestID: info.id,
        reply: "reject",
      })
    }
  }
})

监听 MCP 工具变更

Bus.subscribeAll(async (event) => {
  if (event.type === MCP.ToolsChanged.type) {
    const { server } = event.properties

    console.log(`MCP server ${server} tools changed`)

    // 重新加载工具
    const tools = await MCP.tools()
  }
})

事件过滤

按类型过滤

Bus.subscribeAll(async (event) => {
  // 只处理会话相关事件
  if (!event.type.startsWith("session.")) return

  console.log(`Session event: ${event.type}`)
})

按会话过滤

const targetSessionID = session.id

Bus.subscribeAll(async (event) => {
  // 检查事件是否属于目标会话
  const props = event.properties as any
  if (props.sessionID && props.sessionID !== targetSessionID) return

  console.log(`Event for session: ${event.type}`)
})

事件优先级

事件按以下顺序处理:

  1. 同步事件先处理
  2. 异步事件并行处理
  3. 错误不中断其他订阅者

错误处理

订阅者错误

Bus.subscribeAll(async (event) => {
  try {
    // 处理事件
  } catch (error) {
    // 记录错误但不中断其他订阅者
    console.error(`Event handler failed:`, error)
  }
})

全局 vs 实例事件

全局事件总线

// 全局事件,用于跨实例通信
GlobalBus.emit("event", {
  type: "project.updated",
  properties: project,
})

实例事件总线

// 实例级别事件,仅当前实例可见
Bus.publish(Session.Event.Created, {
  info: session,
})

事件序列化

所有事件都通过 BusEvent.define 定义:

export const Created = BusEvent.define(
  "session.created",
  z.object({
    info: Session.Info,
  }),
)

这确保事件结构的一致性和类型安全。

性能考虑

批量事件

// 避免发布过多小事件
const updates = []

for (const change of changes) {
  updates.push(change)

  if (updates.length >= 100) {
    Bus.publish(BatchEvent, { updates })
    updates.length = 0
  }
}

// 发布剩余的
if (updates.length > 0) {
  Bus.publish(BatchEvent, { updates })
}

异步处理

Bus.subscribeAll(async (event) => {
  // 使用 setImmediate 避免阻塞
  setImmediate(async () => {
    await heavyProcessing(event)
  })
})

调试

记录所有事件

Bus.subscribeAll(async (event) => {
  console.log(`[${new Date().toISOString()}] ${event.type}`)
  console.log(JSON.stringify(event.properties, null, 2))
})

事件图

最佳实践

  1. 类型安全: 使用 BusEvent.define 定义事件
  2. 错误处理: 总是处理订阅者中的错误
  3. 性能: 避免在事件处理中执行耗时操作
  4. 过滤: 在订阅者中过滤不关心的事件
  5. 清理: 使用取消订阅函数清理资源

变更历史

版本变更内容日期
v1初始 Bus 实现-
v1.1添加全局事件总线-

相关文档

  • Session - 会话事件
  • Message - 消息事件
  • Permission - 权限事件