Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Claude Code MCP Client 架构深度解析

Claude Code MCP Client 架构深度解析

本文档深入分析 Claude Code 中 Model Context Protocol (MCP) 客户端的实现,聚焦于 src/services/mcp/client.ts 这个 3348 行的核心模块。


1. MCP 集成概述

Claude Code 通过 MCP 协议与外部服务集成,使 AI 模型能够调用外部工具和访问资源。MCP 客户端架构的核心目标是:

  • 多服务器管理:同时连接多个 MCP 服务器
  • 传输层抽象:支持 Stdio、SSE、StreamableHTTP、WebSocket 等多种传输方式
  • 工具代理:将 MCP 工具无缝转换为 Claude Code 原生工具
  • 认证与安全:处理 OAuth 认证、token 刷新、会话管理

2. 客户端架构总览

2.1 核心文件结构

src/services/mcp/
├── client.ts              # 主客户端实现 (3348 行)
├── config.ts              # 配置管理 (1578 行)
├── auth.ts                # 认证模块 (~1900 行)
├── types.ts               # 类型定义
├── MCPConnectionManager.tsx  # React 集成
└── ...

2.2 连接状态类型

// src/services/mcp/types.ts:180-200
export type ConnectedMCPServer = {
  client: Client              // MCP SDK 客户端实例
  name: string                // 服务器名称
  type: 'connected'
  capabilities: ServerCapabilities
  serverInfo?: { name: string; version: string }
  instructions?: string       // 服务器提供的指令
  config: ScopedMcpServerConfig
  cleanup: () => Promise<void>
}

export type FailedMCPServer = {
  name: string
  type: 'failed'
  config: ScopedMcpServerConfig
  error?: string
}

2.3 传输类型定义

// src/services/mcp/types.ts:23-26
export const TransportSchema = lazySchema(() =>
  z.enum(['stdio', 'sse', 'sse-ide', 'http', 'ws', 'sdk', 'claudeai-proxy']),
)

3. 传输层抽象

3.1 三种主要传输模式

MCP 客户端通过 @modelcontextprotocol/sdk 提供的传输接口实现多协议支持:

传输类型用途关键类
Stdio本地进程StdioClientTransport
SSE远程 HTTP 长连接SSEClientTransport
StreamableHTTP现代 HTTPStreamableHTTPClientTransport
WebSocket双向实时通信自定义 WebSocketTransport

3.2 传输初始化流程

// src/services/mcp/client.ts:595-961
export const connectToServer = memoize(
  async (name: string, serverRef: ScopedMcpServerConfig, serverStats?): 
    Promise<MCPServerConnection> => {
    let transport

    if (serverRef.type === 'sse') {
      // SSE 传输 - 带认证
      const authProvider = new ClaudeAuthProvider(name, serverRef)
      const transportOptions: SSEClientTransportOptions = {
        authProvider,
        fetch: wrapFetchWithTimeout(wrapFetchWithStepUpDetection(...)),
        requestInit: { headers: { 'User-Agent': getMCPUserAgent(), ... } },
      }
      transport = new SSEClientTransport(new URL(serverRef.url), transportOptions)
    } 
    else if (serverRef.type === 'http') {
      // Streamable HTTP 传输
      const authProvider = new ClaudeAuthProvider(name, serverRef)
      const transportOptions: StreamableHTTPClientTransportOptions = {
        authProvider,
        fetch: wrapFetchWithTimeout(...),
        requestInit: { ...proxyOptions, headers: {...} },
      }
      transport = new StreamableHTTPClientTransport(new URL(serverRef.url), transportOptions)
    }
    else if (serverRef.type === 'ws') {
      // WebSocket 传输
      const wsClient = new globalThis.WebSocket(serverRef.url, {
        protocols: ['mcp'],
        headers: wsHeaders,
        proxy: getWebSocketProxyUrl(serverRef.url),
        tls: tlsOptions,
      })
      transport = new WebSocketTransport(wsClient)
    }
    else if (serverRef.type === 'stdio' || !serverRef.type) {
      // 标准 I/O 传输
      transport = new StdioClientTransport({
        command: finalCommand,
        args: finalArgs,
        env: { ...subprocessEnv(), ...serverRef.env },
        stderr: 'pipe',
      })
    }
    // ...
  }
)

3.3 Fetch 包装器

为解决 HTTP 请求超时问题,客户端使用多层 fetch 包装:

// src/services/mcp/client.ts:492-550
export function wrapFetchWithTimeout(baseFetch: FetchLike): FetchLike {
  return async (url: string | URL, init?: RequestInit) => {
    const method = (init?.method ?? 'GET').toUpperCase()
    
    // GET 请求排除超时(SSE 流需要长时间连接)
    if (method === 'GET') {
      return baseFetch(url, init)
    }

    // 确保 MCP Streamable HTTP 的 Accept header
    const headers = new Headers(init?.headers)
    if (!headers.has('accept')) {
      headers.set('accept', 'application/json, text/event-stream')
    }

    // 使用 setTimeout 而非 AbortSignal.timeout() 避免内存泄漏
    const controller = new AbortController()
    const timer = setTimeout(
      () => controller.abort(new DOMException('The operation timed out.', 'TimeoutError')),
      MCP_REQUEST_TIMEOUT_MS  // 60 秒
    )
    timer.unref?.()  // 允许进程在超时时退出

    try {
      return await baseFetch(url, { ...init, headers, signal: controller.signal })
    } finally {
      clearTimeout(timer)
    }
  }
}

4. 工具与资源发现

4.1 工具获取

// src/services/mcp/client.ts:1743-1998
export const fetchToolsForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<Tool[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.tools) return []

    const result = await client.client.request(
      { method: 'tools/list' },
      ListToolsResultSchema,
    )

    return result.tools.map((tool): Tool => ({
      ...MCPTool,  // 基础 MCP 工具模板
      name: buildMcpToolName(client.name, tool.name),  // mcp__server__tool
      mcpInfo: { serverName: client.name, toolName: tool.name },
      isMcp: true,
      inputJSONSchema: tool.inputSchema,
      
      async call(args, context, _canUseTool, parentMessage, onProgress) {
        // 调用 ensureConnectedClient 确保连接有效
        const connectedClient = await ensureConnectedClient(client)
        
        // 调用 MCP 工具
        const mcpResult = await callMCPToolWithUrlElicitationRetry({
          client: connectedClient,
          tool: tool.name,
          args,
          // ...
        })

        return { data: mcpResult.content }
      }
    }))
  },
  (client) => client.name,
  MCP_FETCH_CACHE_SIZE,  // 20
)

4.2 资源获取

// src/services/mcp/client.ts:2000-2031
export const fetchResourcesForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<ServerResource[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.resources) return []

    const result = await client.client.request(
      { method: 'resources/list' },
      ListResourcesResultSchema,
    )

    return result.resources.map(resource => ({
      ...resource,
      server: client.name,
    }))
  }
)

4.3 命令(Prompts)获取

// src/services/mcp/client.ts:2033-2107
export const fetchCommandsForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<Command[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.prompts) return []

    const result = await client.client.request(
      { method: 'prompts/list' },
      ListPromptsResultSchema,
    )

    return result.prompts.map(prompt => ({
      type: 'prompt' as const,
      name: 'mcp__' + normalizeNameForMCP(client.name) + '__' + prompt.name,
      isMcp: true,
      async getPromptForCommand(args) {
        const connectedClient = await ensureConnectedClient(client)
        const result = await connectedClient.client.getPrompt({
          name: prompt.name,
          arguments: args,
        })
        // 转换结果内容...
      }
    }))
  }
)

5. 连接生命周期管理

5.1 连接建立

// src/services/mcp/client.ts:1048-1155
const connectPromise = client.connect(transport)
const timeoutPromise = new Promise<_, reject>((_, reject) => {
  setTimeout(() => {
    reject(new Error(`MCP server "${name}" connection timed out`))
  }, getConnectionTimeoutMs())  // 默认 30 秒
})

await Promise.race([connectPromise, timeoutPromise])

5.2 错误处理与重连

// src/services/mcp/client.ts:1266-1371
client.onerror = (error: Error) => {
  // 检测会话过期(HTTP 404 + JSON-RPC -32001)
  if (isMcpSessionExpiredError(error)) {
    closeTransportAndRejectPending('session expired')
    return
  }

  // 远程传输的终端错误计数
  if (isTerminalConnectionError(error.message)) {
    consecutiveConnectionErrors++
    if (consecutiveConnectionErrors >= MAX_ERRORS_BEFORE_RECONNECT) {
      closeTransportAndRejectPending('max consecutive terminal errors')
    }
  }
}

client.onclose = () => {
  // 清除缓存以触发重连
  const key = getServerCacheKey(name, serverRef)
  fetchToolsForClient.cache.delete(name)
  fetchResourcesForClient.cache.delete(name)
  fetchCommandsForClient.cache.delete(name)
  connectToServer.cache.delete(key)
}

5.3 清理机制

// src/services/mcp/client.ts:1404-1570
const cleanup = async () => {
  // 对于 stdio 传输,逐步发送信号终止进程
  if (serverRef.type === 'stdio') {
    const childPid = stdioTransport.pid
    process.kill(childPid, 'SIGINT')
    await sleep(100)
    // 如果进程仍存在,发送 SIGTERM
    process.kill(childPid, 'SIGTERM')
    await sleep(400)
    // 最后强制 kill
    process.kill(childPid, 'SIGKILL')
  }
  await client.close()
}

6. 配置层级体系

6.1 配置作用域

// src/services/mcp/types.ts:10-21
export const ConfigScopeSchema = lazySchema(() =>
  z.enum(['local', 'user', 'project', 'dynamic', 'enterprise', 'claudeai', 'managed']),
)

6.2 配置优先级

enterprise (企业级) > local (本地) > project (项目) > user (用户) > plugin (插件)

6.3 配置获取

// src/services/mcp/config.ts:1071-1251
export async function getClaudeCodeMcpConfigs(
  dynamicServers = {},
  extraDedupTargets = Promise.resolve({}),
): Promise<{ servers: Record<string, ScopedMcpServerConfig>; errors: PluginError[] }> {
  // 1. 如果存在企业级配置,优先使用
  if (doesEnterpriseMcpConfigExist()) {
    return { servers: filterMcpServersByPolicy(enterpriseServers), errors: [] }
  }

  // 2. 加载各层级配置
  const { servers: userServers } = getMcpConfigsByScope('user')
  const { servers: projectServers } = getMcpConfigsByScope('project')
  const { servers: localServers } = getMcpConfigsByScope('local')

  // 3. 加载插件 MCP 服务器
  const pluginMcpServers = await loadPluginMcpServers()

  // 4. 去重处理
  const { servers: dedupedPluginServers, suppressed } = dedupPluginMcpServers(
    enabledPluginServers,
    enabledManualServers,
  )

  // 5. 合并(按优先级)
  const configs = Object.assign(
    {},
    dedupedPluginServers,  // 最低优先级
    userServers,
    projectServers,
    localServers,          // 最高优先级
  )

  // 6. 应用策略过滤
  return { servers: filterMcpServersByPolicy(configs), errors: mcpErrors }
}

6.4 策略过滤

// src/services/mcp/config.ts:417-508
function isMcpServerAllowedByPolicy(serverName: string, config?: McpServerConfig): boolean {
  // 1. 检查黑名单(绝对优先)
  if (isMcpServerDenied(serverName, config)) return false

  // 2. 检查白名单
  const settings = getMcpAllowlistSettings()
  if (!settings.allowedMcpServers) return true
  if (settings.allowedMcpServers.length === 0) return false

  // 3. 匹配规则(名称/命令/URL)
  // ...
}

7. 认证流程

7.1 ClaudeAuthProvider

// src/services/mcp/auth.ts:1376-1426
export class ClaudeAuthProvider implements OAuthClientProvider {
  private serverName: string
  private serverConfig: McpSSEServerConfig | McpHTTPServerConfig
  private redirectUri: string
  private _codeVerifier?: string
  private _authorizationUrl?: string
  private _state?: string
  private _metadata?: AuthorizationServerMetadata

  async tokens(): Promise<OAuthTokens | undefined> {
    // 从 keychain 读取存储的 token
    const storage = getSecureStorage()
    const serverKey = getServerKey(this.serverName, this.serverConfig)
    const entry = storage.read()?.mcpOAuth?.[serverKey]
    
    if (!entry?.accessToken) return undefined
    
    // 检查是否过期,必要时刷新
    if (entry.expiresAt < Date.now()) {
      return await this.refresh(entry.refreshToken)
    }
    
    return { access_token: entry.accessToken, ... }
  }
}

7.2 OAuth 流程

// src/services/mcp/auth.ts:847-1342
export async function performMCPOAuthFlow(
  serverName: string,
  serverConfig: McpSSEServerConfig | McpHTTPServerConfig,
  onAuthorizationUrl: (url: string) => void,
  abortSignal?: AbortSignal,
): Promise<void> {
  // 1. 创建本地回调服务器
  const port = await findAvailablePort()
  const redirectUri = buildRedirectUri(port)
  
  // 2. 启动 OAuth 授权流程
  const provider = new ClaudeAuthProvider(serverName, serverConfig, redirectUri)
  const result = await sdkAuth(provider, { serverUrl: serverConfig.url })
  
  // 3. 处理回调
  if (result === 'AUTHORIZED') {
    // Token 已保存到 keychain
  }
}

7.3 XAA (Cross-App Access) 认证

// src/services/mcp/auth.ts:664-845
async function performMCPXaaAuth(
  serverName: string,
  serverConfig: McpSSEServerConfig | McpHTTPServerConfig,
): Promise<void> {
  // 1. 从 IdP 获取 id_token(一次浏览器登录)
  const idToken = await acquireIdpIdToken({ idpIssuer, ... })
  
  // 2. 执行 RFC 8693 + RFC 7523 交换
  const tokens = await performCrossAppAccess(serverConfig.url, {
    clientId, clientSecret, idpIdToken: idToken, ...
  })
  
  // 3. 保存到与标准 OAuth 相同的 keychain 槽位
  storage.update({ mcpOAuth: { [serverKey]: { ...tokens } } })
}

8. 错误处理与弹性

8.1 认证缓存

// src/services/mcp/client.ts:280-316
const MCP_AUTH_CACHE_TTL_MS = 15 * 60 * 1000  // 15 分钟

async function isMcpAuthCached(serverId: string): Promise<boolean> {
  const cache = await getMcpAuthCache()
  const entry = cache[serverId]
  if (!entry) return false
  return Date.now() - entry.timestamp < MCP_AUTH_CACHE_TTL_MS
}

function setMcpAuthCacheEntry(serverId: string): void {
  // 序列化写入以防止竞态条件
  writeChain = writeChain.then(async () => {
    const cache = await getMcpAuthCache()
    cache[serverId] = { timestamp: Date.now() }
    await writeFile(getMcpAuthCachePath(), jsonStringify(cache))
    authCachePromise = null  // 使缓存失效
  })
}

8.2 会话过期处理

// src/services/mcp/client.ts:193-206
export function isMcpSessionExpiredError(error: Error): boolean {
  const httpStatus = 'code' in error ? (error as Error & { code?: number }).code : undefined
  if (httpStatus !== 404) return false
  
  // MCP 服务器返回: {"error":{"code":-32001,"message":"Session not found"}}
  return error.message.includes('"code":-32001')
}

8.3 工具调用重试

// src/services/mcp/client.ts:1859-1922
const MAX_SESSION_RETRIES = 1
for (let attempt = 0; ; attempt++) {
  try {
    const mcpResult = await callMCPToolWithUrlElicitationRetry({...})
    return { data: mcpResult.content }
  } catch (error) {
    // 会话过期时重试一次
    if (error instanceof McpSessionExpiredError && attempt < MAX_SESSION_RETRIES) {
      logMCPDebug(client.name, `Retrying tool after session recovery`)
      continue
    }
    throw error
  }
}

9. React 集成

9.1 MCPConnectionManager

// src/services/mcp/MCPConnectionManager.tsx:1-73
export function MCPConnectionManager({ 
  children, 
  dynamicMcpConfig, 
  isStrictMcpConfig 
}) {
  const { reconnectMcpServer, toggleMcpServer } = useManageMCPConnections(
    dynamicMcpConfig, 
    isStrictMcpConfig
  )

  return (
    <MCPConnectionContext.Provider value={{ reconnectMcpServer, toggleMcpServer }}>
      {children}
    </MCPConnectionContext.Provider>
  )
}

9.2 上下文使用

// src/services/mcp/MCPConnectionManager.tsx:17-30
export function useMcpReconnect() {
  const context = useContext(MCPConnectionContext)
  if (!context) {
    throw new Error("useMcpReconnect must be used within MCPConnectionManager")
  }
  return context.reconnectMcpServer
}

10. 关键代码路径

10.1 MCP 工具执行流程

用户请求 → Tool.call() 
  → fetchToolsForClient() 获取工具定义
  → ensureConnectedClient() 确保连接有效
  → callMCPToolWithUrlElicitationRetry()
    → callMCPTool()
      → client.callTool() [MCP SDK]
      → processMCPResult() [处理结果]
    → 处理 URL elicitation 重试
  → 返回结果

10.2 服务器连接流程

getMcpToolsCommandsAndResources()
  → processBatched(localServers, 3)  // Stdio 低并发
  → processBatched(remoteServers, 20) // 远程高并发
    → connectToServer()
      → 创建传输层
      → 创建 MCP Client
      → client.connect(transport)
      → fetchToolsForClient()
      → fetchResourcesForClient()
      → fetchCommandsForClient()
    → onConnectionAttempt() 回调

11. MCP 工具执行流程序列图


12. 总结

Claude Code 的 MCP 客户端实现了一个功能完备的协议集成层:

  1. 传输抽象:通过 MCP SDK 的传输接口,支持stdio、SSE、StreamableHTTP、WebSocket等多种连接方式
  2. 工具代理:将 MCP 工具转换为原生工具格式,实现无缝集成
  3. 配置管理:支持项目级、用户级、企业级配置,并提供策略过滤
  4. 认证体系:完整的 OAuth 2.0 流程,支持标准授权和 XAA 跨应用访问
  5. 弹性设计:通过缓存、错误重试、会话管理保证连接可靠性
  6. React 集成:通过 Context 提供连接管理能力

核心代码位于 src/services/mcp/client.ts(3348行),配合 config.ts、auth.ts 等模块共同实现完整的 MCP 集成功能。