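Claude Code MCP Client Architecture: A Deep Dive
This document analyzes the implementation of the Model Context Protocol (MCP) client in Claude Code, focusing on src/services/mcp/client.ts, a 3348-line core module.
1. MCP Integration Overview
Claude Code integrates with external services through MCP, which lets the AI model call external tools and access external resources. The MCP client architecture has four core goals:
- Multi-server management: connect to multiple MCP servers at the same time
- Transport abstraction: support several transports, including Stdio, SSE, StreamableHTTP, and WebSocket
- Tool proxying: convert MCP tools into native Claude Code tools
- Authentication and security: handle OAuth authentication, token refresh, and session management
2. Client Architecture Overview
2.1 Core File Structure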
src/services/mcp/
├── client.ts # Main client implementation (3348 lines)
├── config.ts # Configuration management (1578 lines)
├── auth.ts # Authentication module (~1900 lines)
├── types.ts # Type definitions
├── MCPConnectionManager.tsx # React integration
└── ...
2.2 Connection State Types
// src/services/mcp/types.ts:180-200
export type ConnectedMCPServer = {
  client: Client // MCP SDK client instance
  name: string // Server name
  type: 'connected'
  capabilities: ServerCapabilities
  serverInfo?: { name: string; version: string }
  instructions?: string // Instructions provided by the server
  config: ScopedMcpServerConfig
  cleanup: () => Promise<void>
}
export type FailedMCPServer = {
  name: string
  type: 'failed'
  config: ScopedMcpServerConfig
  error?: string
}
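Because both types carry a literal `type` field, callers can narrow a connection with an ordinary switch. The following is a minimal sketch with simplified stand-in types; `describeConnection` and the trimmed-down fields are illustrative assumptions, not part of the source.

```typescript
// Simplified stand-ins for the real types; only the discriminant matters here.
type ConnectedServer = { type: 'connected'; name: string; cleanup: () => Promise<void> }
type FailedServer = { type: 'failed'; name: string; error?: string }
type ServerConnection = ConnectedServer | FailedServer

// Hypothetical helper: summarize a connection for a status line.
function describeConnection(conn: ServerConnection): string {
  switch (conn.type) {
    case 'connected':
      // Narrowed to ConnectedServer: `cleanup` is available in this branch.
      return `${conn.name}: connected`
    case 'failed':
      // Narrowed to FailedServer: `error` is available in this branch.
      return `${conn.name}: failed (${conn.error ?? 'unknown error'})`
  }
}
```

The same narrowing is what the early `client.type !== 'connected'` checks in the fetch functions rely on.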
2.3 Transport Type Definition
// src/services/mcp/types.ts:23-26
export const TransportSchema = lazySchema(() =>
  z.enum(['stdio', 'sse', 'sse-ide', 'http', 'ws', 'sdk', 'claudeai-proxy']),
)
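To show what this enum constrains without pulling in zod, here is a dependency-free sketch of an equivalent check. The `TRANSPORTS` tuple and the `isTransport` guard are hypothetical names; the real code validates through the zod schema shown above.

```typescript
// Same literals as TransportSchema, kept as a readonly tuple.
const TRANSPORTS = ['stdio', 'sse', 'sse-ide', 'http', 'ws', 'sdk', 'claudeai-proxy'] as const
type Transport = (typeof TRANSPORTS)[number]

// Hypothetical type guard standing in for schema validation.
function isTransport(value: unknown): value is Transport {
  return typeof value === 'string' && (TRANSPORTS as readonly string[]).includes(value)
}
```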
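3. Transport Layer Abstraction
3.1 Main Transport Modes
The MCP client supports multiple protocols through the transport interface provided by @modelcontextprotocol/sdk:
| Transport | Use case | Key class |
|---|---|---|
| Stdio | Local process | StdioClientTransport |
| SSE | Long-lived remote HTTP connection | SSEClientTransport |
| StreamableHTTP | Modern HTTP | StreamableHTTPClientTransport |
| WebSocket | Bidirectional real-time communication | Custom WebSocketTransport |
3.2 Transport Initialization Flow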
// src/services/mcp/client.ts:595-961
export const connectToServer = memoize(
  async (name: string, serverRef: ScopedMcpServerConfig, serverStats?):
    Promise<MCPServerConnection> => {
    let transport
    if (serverRef.type === 'sse') {
      // SSE transport, with authentication
      const authProvider = new ClaudeAuthProvider(name, serverRef)
      const transportOptions: SSEClientTransportOptions = {
        authProvider,
        fetch: wrapFetchWithTimeout(wrapFetchWithStepUpDetection(...)),
        requestInit: { headers: { 'User-Agent': getMCPUserAgent(), ... } },
      }
      transport = new SSEClientTransport(new URL(serverRef.url), transportOptions)
    } else if (serverRef.type === 'http') {
      // Streamable HTTP transport
      const authProvider = new ClaudeAuthProvider(name, serverRef)
      const transportOptions: StreamableHTTPClientTransportOptions = {
        authProvider,
        fetch: wrapFetchWithTimeout(...),
        requestInit: { ...proxyOptions, headers: {...} },
      }
      transport = new StreamableHTTPClientTransport(new URL(serverRef.url), transportOptions)
    } else if (serverRef.type === 'ws') {
      // WebSocket transport
      const wsClient = new globalThis.WebSocket(serverRef.url, {
        protocols: ['mcp'],
        headers: wsHeaders,
        proxy: getWebSocketProxyUrl(serverRef.url),
        tls: tlsOptions,
      })
      transport = new WebSocketTransport(wsClient)
    } else if (serverRef.type === 'stdio' || !serverRef.type) {
      // Standard I/O transport (also the default when no type is set)
      transport = new StdioClientTransport({
        command: finalCommand,
        args: finalArgs,
        env: { ...subprocessEnv(), ...serverRef.env },
        stderr: 'pipe',
      })
    }
    // ...
  }
)
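The branching above boils down to a dispatch on the config's `type` field, with stdio as the fallback when `type` is absent. A reduced sketch of that dispatch follows; `ServerConfig`, `TransportPlan`, and `planTransport` are hypothetical names, and the real function constructs SDK transport objects rather than plain records.

```typescript
// Simplified config shapes; the real ScopedMcpServerConfig carries more fields.
type ServerConfig =
  | { type?: 'stdio'; command: string; args?: string[] }
  | { type: 'sse'; url: string }
  | { type: 'http'; url: string }
  | { type: 'ws'; url: string }

type TransportPlan =
  | { kind: 'stdio'; command: string; args: string[] }
  | { kind: 'sse' | 'http' | 'ws'; url: string }

// Hypothetical helper: decide which transport a config calls for.
function planTransport(config: ServerConfig): TransportPlan {
  switch (config.type) {
    case 'sse':
    case 'http':
    case 'ws':
      // Remote transports only need the endpoint URL at this stage.
      return { kind: config.type, url: config.url }
    default:
      // 'stdio' or a missing type: spawn a local subprocess.
      return { kind: 'stdio', command: config.command, args: config.args ?? [] }
  }
}
```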
3.3 Fetch Wrappers
To bound how long an HTTP request may run, the client wraps fetch in several layers:
// src/services/mcp/client.ts:492-550
export function wrapFetchWithTimeout(baseFetch: FetchLike): FetchLike {
  return async (url: string | URL, init?: RequestInit) => {
    const method = (init?.method ?? 'GET').toUpperCase()
    // GET requests are exempt from the timeout (SSE streams stay open for a long time)
    if (method === 'GET') {
      return baseFetch(url, init)
    }
    // Ensure the Accept header required by MCP Streamable HTTP is present
    const headers = new Headers(init?.headers)
    if (!headers.has('accept')) {
      headers.set('accept', 'application/json, text/event-stream')
    }
    // Use setTimeout rather than AbortSignal.timeout() to avoid a memory leak
    const controller = new AbortController()
    const timer = setTimeout(
      () => controller.abort(new DOMException('The operation timed out.', 'TimeoutError')),
      MCP_REQUEST_TIMEOUT_MS // 60 seconds
    )
    timer.unref?.() // Do not let the pending timer keep the process alive
    try {
      return await baseFetch(url, { ...init, headers, signal: controller.signal })
    } finally {
      clearTimeout(timer)
    }
  }
}
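The timeout half of this wrapper can be isolated from fetch entirely. The sketch below applies the same AbortController-plus-setTimeout pattern to any abortable async operation; `AbortableOp`, `runWithTimeout`, and `neverFinishes` are illustrative names that do not appear in the source.

```typescript
// An async operation that honors an AbortSignal (a stand-in for fetch).
type AbortableOp<T> = (signal: AbortSignal) => Promise<T>

// Hypothetical helper: abort `op` if it runs longer than `timeoutMs`.
async function runWithTimeout<T>(op: AbortableOp<T>, timeoutMs: number): Promise<T> {
  const controller = new AbortController()
  const timer = setTimeout(
    () => controller.abort(new Error('The operation timed out.')),
    timeoutMs,
  )
  try {
    return await op(controller.signal)
  } finally {
    clearTimeout(timer) // Always release the timer, on success or failure.
  }
}

// A fake slow operation that rejects as soon as its signal aborts.
const neverFinishes: AbortableOp<string> = signal =>
  new Promise((_, reject) => {
    signal.addEventListener('abort', () => reject(signal.reason))
  })
```

Clearing the timer in `finally` matters as much as setting it: without it, every completed request would leave a live timer behind until the deadline passed.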
4. Tool and Resource Discovery
4.1 Fetching Tools
// src/services/mcp/client.ts:1743-1998
export const fetchToolsForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<Tool[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.tools) return []
    const result = await client.client.request(
      { method: 'tools/list' },
      ListToolsResultSchema,
    )
    return result.tools.map((tool): Tool => ({
      ...MCPTool, // Base MCP tool template
      name: buildMcpToolName(client.name, tool.name), // mcp__server__tool
      mcpInfo: { serverName: client.name, toolName: tool.name },
      isMcp: true,
      inputJSONSchema: tool.inputSchema,
      async call(args, context, _canUseTool, parentMessage, onProgress) {
        // Make sure the connection is still valid before calling
        const connectedClient = await ensureConnectedClient(client)
        // Invoke the MCP tool
        const mcpResult = await callMCPToolWithUrlElicitationRetry({
          client: connectedClient,
          tool: tool.name,
          args,
          // ...
        })
        return { data: mcpResult.content }
      }
    }))
  },
  (client) => client.name, // Cache key
  MCP_FETCH_CACHE_SIZE, // 20
)
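The excerpt passes three things to `memoizeWithLRU`: the function, a key derivation, and a cache size, and section 5.2 later calls `.cache.delete(name)` on the result. A minimal sketch of a memoizer with that shape is shown here; `lruMemoize` is a hypothetical re-implementation for illustration, and the real helper's internals are not shown in the source.

```typescript
// Hypothetical LRU memoizer: caches results by a derived key and evicts the
// least recently used entry once `maxSize` is exceeded.
function lruMemoize<A, R>(fn: (arg: A) => R, keyOf: (arg: A) => string, maxSize: number) {
  const cache = new Map<string, R>() // Map iterates in insertion order.
  const memoized = (arg: A): R => {
    const key = keyOf(arg)
    if (cache.has(key)) {
      const hit = cache.get(key) as R
      cache.delete(key) // Re-insert to mark the entry as most recently used.
      cache.set(key, hit)
      return hit
    }
    const value = fn(arg)
    cache.set(key, value)
    if (cache.size > maxSize) {
      const oldest = cache.keys().next().value as string
      cache.delete(oldest) // Evict the least recently used entry.
    }
    return value
  }
  // Expose the cache so callers can invalidate entries explicitly.
  return Object.assign(memoized, { cache })
}
```

When `fn` is async, the cached value is the promise itself, so concurrent callers for the same key share one in-flight request.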
4.2 Fetching Resources
// src/services/mcp/client.ts:2000-2031
export const fetchResourcesForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<ServerResource[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.resources) return []
    const result = await client.client.request(
      { method: 'resources/list' },
      ListResourcesResultSchema,
    )
    return result.resources.map(resource => ({
      ...resource,
      server: client.name,
    }))
  }
)
4.3 Fetching Commands (Prompts)
// src/services/mcp/client.ts:2033-2107
export const fetchCommandsForClient = memoizeWithLRU(
  async (client: MCPServerConnection): Promise<Command[]> => {
    if (client.type !== 'connected') return []
    if (!client.capabilities?.prompts) return []
    const result = await client.client.request(
      { method: 'prompts/list' },
      ListPromptsResultSchema,
    )
    return result.prompts.map(prompt => ({
      type: 'prompt' as const,
      name: 'mcp__' + normalizeNameForMCP(client.name) + '__' + prompt.name,
      isMcp: true,
      async getPromptForCommand(args) {
        const connectedClient = await ensureConnectedClient(client)
        const result = await connectedClient.client.getPrompt({
          name: prompt.name,
          arguments: args,
        })
        // Convert the result content...
      }
    }))
  }
)
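Tools and prompts both end up with names of the form mcp__<server>__<name>. The sketch below illustrates building and parsing such names. The normalization rule (replace anything outside letters, digits, underscore, and hyphen with an underscore) is an assumption, as are the helper names; the source does not show how `normalizeNameForMCP` or `buildMcpToolName` are implemented.

```typescript
// Assumed normalization: replace anything outside [a-zA-Z0-9_-] with "_".
function normalizeName(name: string): string {
  return name.replace(/[^a-zA-Z0-9_-]/g, '_')
}

// Hypothetical builder for the mcp__<server>__<tool> naming pattern.
function buildToolName(server: string, tool: string): string {
  return `mcp__${normalizeName(server)}__${normalizeName(tool)}`
}

// Hypothetical parser: returns null for names that are not MCP-prefixed.
function parseToolName(fullName: string): { server: string; tool: string } | null {
  const parts = fullName.split('__')
  if (parts.length < 3 || parts[0] !== 'mcp') return null
  // Anything after the server segment belongs to the tool name.
  return { server: parts[1], tool: parts.slice(2).join('__') }
}
```

One consequence of this scheme is that a server name containing a double underscore would make parsing ambiguous, which is one reason a normalization step is useful.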
5. Connection Lifecycle Management
5.1 Establishing a Connection
// src/services/mcp/client.ts:1048-1155
const connectPromise = client.connect(transport)
const timeoutPromise = new Promise<never>((_, reject) => {
  setTimeout(() => {
    reject(new Error(`MCP server "${name}" connection timed out`))
  }, getConnectionTimeoutMs()) // 30 seconds by default
})
await Promise.race([connectPromise, timeoutPromise])
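The excerpt omits what happens to the timer once the connection succeeds. A self-contained sketch of the same race that also clears the timer is given below; `connectWithTimeout` is a hypothetical wrapper, and whether the real code clears its timer this way is not visible in the excerpt.

```typescript
// Hypothetical wrapper: race a pending connection against a deadline.
async function connectWithTimeout<T>(
  connect: Promise<T>,
  name: string,
  timeoutMs: number,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`MCP server "${name}" connection timed out`)),
      timeoutMs,
    )
  })
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([connect, timeout])
  } finally {
    clearTimeout(timer) // Avoid a stray timer after a successful connect.
  }
}
```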
5.2 Error Handling and Reconnection
// src/services/mcp/client.ts:1266-1371
client.onerror = (error: Error) => {
  // Detect session expiry (HTTP 404 + JSON-RPC -32001)
  if (isMcpSessionExpiredError(error)) {
    closeTransportAndRejectPending('session expired')
    return
  }
  // Count terminal errors on remote transports
  if (isTerminalConnectionError(error.message)) {
    consecutiveConnectionErrors++
    if (consecutiveConnectionErrors >= MAX_ERRORS_BEFORE_RECONNECT) {
      closeTransportAndRejectPending('max consecutive terminal errors')
    }
  }
}
client.onclose = () => {
  // Clear the caches so that the next access triggers a reconnect
  const key = getServerCacheKey(name, serverRef)
  fetchToolsForClient.cache.delete(name)
  fetchResourcesForClient.cache.delete(name)
  fetchCommandsForClient.cache.delete(name)
  connectToServer.cache.delete(key)
}
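The consecutive-error threshold in `onerror` can be modeled as a small counter. The sketch below is one possible shape; `createErrorTracker` is hypothetical, and both the reset on success and the choice to leave the count unchanged on non-terminal errors are assumptions, since the excerpt only shows the increment.

```typescript
// Hypothetical tracker for consecutive terminal connection errors.
function createErrorTracker(maxErrors: number) {
  let consecutive = 0
  return {
    // Returns true when the caller should close the transport and reconnect.
    recordError(isTerminal: boolean): boolean {
      if (!isTerminal) return false // Assumption: non-terminal errors are ignored.
      consecutive++
      return consecutive >= maxErrors
    },
    // Assumption: a successful exchange resets the streak.
    recordSuccess(): void {
      consecutive = 0
    },
  }
}
```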
5.3 Cleanup
// src/services/mcp/client.ts:1404-1570
const cleanup = async () => {
  // For stdio transports, escalate signals step by step to terminate the process
  if (serverRef.type === 'stdio') {
    const childPid = stdioTransport.pid
    process.kill(childPid, 'SIGINT')
    await sleep(100)
    // If the process still exists, send SIGTERM (liveness check elided in this excerpt)
    process.kill(childPid, 'SIGTERM')
    await sleep(400)
    // Finally, force-kill
    process.kill(childPid, 'SIGKILL')
  }
  await client.close()
}
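The escalation above only makes sense if each step first checks whether the process has already exited. A testable sketch of that loop follows, with the signal sender, liveness probe, and sleep injected as parameters; `terminateGracefully` and its parameter names are hypothetical, while the signal order and grace periods mirror the excerpt.

```typescript
type Signal = 'SIGINT' | 'SIGTERM' | 'SIGKILL'

// Hypothetical helper: escalate through signals until the process is gone.
// Returns the list of signals that were actually sent.
async function terminateGracefully(
  sendSignal: (signal: Signal) => void,
  isAlive: () => boolean,
  wait: (ms: number) => Promise<void>,
): Promise<Signal[]> {
  const sent: Signal[] = []
  // Each step: the signal to send and how long to wait afterwards.
  const steps: Array<[Signal, number]> = [
    ['SIGINT', 100],
    ['SIGTERM', 400],
    ['SIGKILL', 0],
  ]
  for (const [signal, graceMs] of steps) {
    if (!isAlive()) break // Already exited: stop escalating.
    sendSignal(signal)
    sent.push(signal)
    if (graceMs > 0) await wait(graceMs)
  }
  return sent
}
```

In Node.js, a liveness probe is commonly written as `process.kill(pid, 0)` inside a try/catch, since signal 0 only tests whether the process exists.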
6. Configuration Hierarchy
6.1 Configuration Scopes
// src/services/mcp/types.ts:10-21
export const ConfigScopeSchema = lazySchema(() =>
  z.enum(['local', 'user', 'project', 'dynamic', 'enterprise', 'claudeai', 'managed']),
)
6.2 Configuration Precedence
enterprise > local > project > user > plugin
6.3 Loading the Configuration
// src/services/mcp/config.ts:1071-1251
export async function getClaudeCodeMcpConfigs(
  dynamicServers = {},
  extraDedupTargets = Promise.resolve({}),
): Promise<{ servers: Record<string, ScopedMcpServerConfig>; errors: PluginError[] }> {
  // 1. If an enterprise config exists, use it exclusively
  if (doesEnterpriseMcpConfigExist()) {
    return { servers: filterMcpServersByPolicy(enterpriseServers), errors: [] }
  }
  // 2. Load the config for each scope
  const { servers: userServers } = getMcpConfigsByScope('user')
  const { servers: projectServers } = getMcpConfigsByScope('project')
  const { servers: localServers } = getMcpConfigsByScope('local')
  // 3. Load plugin MCP servers
  const pluginMcpServers = await loadPluginMcpServers()
  // 4. Deduplicate
  const { servers: dedupedPluginServers, suppressed } = dedupPluginMcpServers(
    enabledPluginServers,
    enabledManualServers,
  )
  // 5. Merge (later sources override earlier ones)
  const configs = Object.assign(
    {},
    dedupedPluginServers, // Lowest precedence
    userServers,
    projectServers,
    localServers, // Highest precedence
  )
  // 6. Apply policy filtering
  return { servers: filterMcpServersByPolicy(configs), errors: mcpErrors }
}
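The merge in step 5 relies on `Object.assign` copying sources left to right, so a later layer overwrites an earlier one for the same server name. A small sketch with made-up server entries shows the effect; `ScopedConfig`, `mergeServerConfigs`, and the sample data are illustrative only.

```typescript
type ScopedConfig = { scope: 'plugin' | 'user' | 'project' | 'local'; command: string }
type ServerMap = Record<string, ScopedConfig>

// Layers are listed from lowest to highest precedence.
function mergeServerConfigs(...layers: ServerMap[]): ServerMap {
  // Later sources overwrite earlier ones key by key (whole entries, not deep merges).
  return Object.assign({}, ...layers)
}

const merged = mergeServerConfigs(
  { db: { scope: 'plugin', command: 'plugin-db' }, lint: { scope: 'plugin', command: 'plugin-lint' } },
  { db: { scope: 'user', command: 'user-db' } },
  { db: { scope: 'project', command: 'project-db' } },
  { db: { scope: 'local', command: 'local-db' } },
)
// merged.db comes from the local layer; merged.lint survives from the plugin layer.
```

Note that the override is per server name and replaces the whole entry, so a local definition does not inherit fields from a user-level definition of the same server.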
6.4 Policy Filtering
// src/services/mcp/config.ts:417-508
function isMcpServerAllowedByPolicy(serverName: string, config?: McpServerConfig): boolean {
  // 1. Check the denylist (always takes precedence)
  if (isMcpServerDenied(serverName, config)) return false
  // 2. Check the allowlist
  const settings = getMcpAllowlistSettings()
  if (!settings.allowedMcpServers) return true
  if (settings.allowedMcpServers.length === 0) return false
  // 3. Apply matching rules (name / command / URL)
  // ...
}
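The three outcomes above (denied, no allowlist, empty allowlist) are easy to confuse, so here is a reduced sketch that matches on server name only. `Policy` and `isServerAllowed` are hypothetical; the real check also matches on command and URL, which this sketch leaves out.

```typescript
// Simplified policy: the real settings also support command and URL rules.
type Policy = { denied: string[]; allowed?: string[] }

function isServerAllowed(name: string, policy: Policy): boolean {
  if (policy.denied.includes(name)) return false // The denylist always wins.
  if (policy.allowed === undefined) return true // No allowlist: allow the rest.
  return policy.allowed.includes(name) // An empty allowlist allows nothing.
}
```

The distinction between a missing allowlist and an empty one is deliberate: the first means "no restriction", the second means "block everything".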
7. Authentication Flow
7.1 ClaudeAuthProvider
// src/services/mcp/auth.ts:1376-1426
export class ClaudeAuthProvider implements OAuthClientProvider {
  private serverName: string
  private serverConfig: McpSSEServerConfig | McpHTTPServerConfig
  private redirectUri: string
  private _codeVerifier?: string
  private _authorizationUrl?: string
  private _state?: string
  private _metadata?: AuthorizationServerMetadata
  async tokens(): Promise<OAuthTokens | undefined> {
    // Read the stored token from the keychain
    const storage = getSecureStorage()
    const serverKey = getServerKey(this.serverName, this.serverConfig)
    const entry = storage.read()?.mcpOAuth?.[serverKey]
    if (!entry?.accessToken) return undefined
    // Check for expiry and refresh if necessary
    if (entry.expiresAt < Date.now()) {
      return await this.refresh(entry.refreshToken)
    }
    return { access_token: entry.accessToken, ... }
  }
}
7.2 OAuth Flow
// src/services/mcp/auth.ts:847-1342
export async function performMCPOAuthFlow(
  serverName: string,
  serverConfig: McpSSEServerConfig | McpHTTPServerConfig,
  onAuthorizationUrl: (url: string) => void,
  abortSignal?: AbortSignal,
): Promise<void> {
  // 1. Create a local callback server
  const port = await findAvailablePort()
  const redirectUri = buildRedirectUri(port)
  // 2. Start the OAuth authorization flow
  const provider = new ClaudeAuthProvider(serverName, serverConfig, redirectUri)
  const result = await sdkAuth(provider, { serverUrl: serverConfig.url })
  // 3. Handle the callback
  if (result === 'AUTHORIZED') {
    // The token has been saved to the keychain
  }
}
7.3 XAA (Cross-App Access) Authentication
// src/services/mcp/auth.ts:664-845
async function performMCPXaaAuth(
  serverName: string,
  serverConfig: McpSSEServerConfig | McpHTTPServerConfig,
): Promise<void> {
  // 1. Obtain an id_token from the IdP (a single browser login)
  const idToken = await acquireIdpIdToken({ idpIssuer, ... })
  // 2. Perform the RFC 8693 + RFC 7523 exchange
  const tokens = await performCrossAppAccess(serverConfig.url, {
    clientId, clientSecret, idpIdToken: idToken, ...
  })
  // 3. Save to the same keychain slot used by standard OAuth
  storage.update({ mcpOAuth: { [serverKey]: { ...tokens } } })
}
8. Error Handling and Resilience
8.1 Authentication Cache
// src/services/mcp/client.ts:280-316
const MCP_AUTH_CACHE_TTL_MS = 15 * 60 * 1000 // 15 minutes
async function isMcpAuthCached(serverId: string): Promise<boolean> {
  const cache = await getMcpAuthCache()
  const entry = cache[serverId]
  if (!entry) return false
  return Date.now() - entry.timestamp < MCP_AUTH_CACHE_TTL_MS
}
function setMcpAuthCacheEntry(serverId: string): void {
  // Serialize writes to prevent race conditions
  writeChain = writeChain.then(async () => {
    const cache = await getMcpAuthCache()
    cache[serverId] = { timestamp: Date.now() }
    await writeFile(getMcpAuthCachePath(), jsonStringify(cache))
    authCachePromise = null // Invalidate the in-memory cache
  })
}
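The `writeChain` pattern serializes read-modify-write cycles by chaining each task onto the previous one. A standalone sketch of such a queue follows; `createWriteQueue` is a hypothetical name, and the `catch` that keeps the chain alive after a failed write is an addition of this sketch that the excerpt does not show.

```typescript
// Hypothetical serializer: each task starts only after the previous one settles.
function createWriteQueue() {
  let chain: Promise<void> = Promise.resolve()
  return function enqueue(task: () => Promise<void>): Promise<void> {
    const next = chain.then(task)
    // Swallow the error on the shared chain so one failed write
    // does not block every later write; the caller still sees it via `next`.
    chain = next.catch(() => {})
    return next
  }
}
```

Without serialization, two concurrent writers could both read the old cache file and the second write would silently drop the first writer's entry.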
8.2 Session Expiry Handling
// src/services/mcp/client.ts:193-206
export function isMcpSessionExpiredError(error: Error): boolean {
  const httpStatus = 'code' in error ? (error as Error & { code?: number }).code : undefined
  if (httpStatus !== 404) return false
  // The MCP server returns: {"error":{"code":-32001,"message":"Session not found"}}
  return error.message.includes('"code":-32001')
}
8.3 Tool Call Retry
// src/services/mcp/client.ts:1859-1922
const MAX_SESSION_RETRIES = 1
for (let attempt = 0; ; attempt++) {
  try {
    const mcpResult = await callMCPToolWithUrlElicitationRetry({...})
    return { data: mcpResult.content }
  } catch (error) {
    // Retry once when the session has expired
    if (error instanceof McpSessionExpiredError && attempt < MAX_SESSION_RETRIES) {
      logMCPDebug(client.name, `Retrying tool after session recovery`)
      continue
    }
    throw error
  }
}
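The loop above is a bounded retry that only applies to one class of error. A generic sketch of that shape is given below; `callWithRetry` and its `shouldRetry` predicate are hypothetical, standing in for the `instanceof McpSessionExpiredError` check in the excerpt.

```typescript
// Hypothetical helper: retry `call` up to `maxRetries` times, but only for
// errors that `shouldRetry` accepts. All other errors propagate immediately.
async function callWithRetry<T>(
  call: () => Promise<T>,
  shouldRetry: (error: unknown) => boolean,
  maxRetries = 1,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await call()
    } catch (error) {
      if (shouldRetry(error) && attempt < maxRetries) continue
      throw error // Out of retries, or not a retryable error.
    }
  }
}
```

With `maxRetries = 1`, a call is attempted at most twice, which matches the single retry after session recovery described above.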
9. React Integration
9.1 MCPConnectionManager
// src/services/mcp/MCPConnectionManager.tsx:1-73
export function MCPConnectionManager({
  children,
  dynamicMcpConfig,
  isStrictMcpConfig
}) {
  const { reconnectMcpServer, toggleMcpServer } = useManageMCPConnections(
    dynamicMcpConfig,
    isStrictMcpConfig
  )
  return (
    <MCPConnectionContext.Provider value={{ reconnectMcpServer, toggleMcpServer }}>
      {children}
    </MCPConnectionContext.Provider>
  )
}
9.2 Using the Context
// src/services/mcp/MCPConnectionManager.tsx:17-30
export function useMcpReconnect() {
  const context = useContext(MCPConnectionContext)
  if (!context) {
    throw new Error("useMcpReconnect must be used within MCPConnectionManager")
  }
  return context.reconnectMcpServer
}
10. Key Code Paths
10.1 MCP Tool Execution Flow
User request → Tool.call()
→ fetchToolsForClient() fetches the tool definitions
→ ensureConnectedClient() ensures the connection is valid
→ callMCPToolWithUrlElicitationRetry()
→ callMCPTool()
→ client.callTool() [MCP SDK]
→ processMCPResult() [process the result]
→ handle URL elicitation retries
→ return the result
10.2 Server Connection Flow
getMcpToolsCommandsAndResources()
→ processBatched(localServers, 3) // Stdio: low concurrency
→ processBatched(remoteServers, 20) // Remote: high concurrency
→ connectToServer()
→ create the transport
→ create the MCP Client
→ client.connect(transport)
→ fetchToolsForClient()
→ fetchResourcesForClient()
→ fetchCommandsForClient()
→ onConnectionAttempt() callback
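The two `processBatched` calls cap concurrency at 3 for local stdio servers and 20 for remote ones. The source does not show how `processBatched` is implemented; one plausible reading, sketched below under that assumption, is to process consecutive fixed-size batches and wait for each batch to finish before starting the next. `processInBatches` is a hypothetical name.

```typescript
// Hypothetical batching helper: run `worker` over `items`, at most
// `batchSize` at a time, preserving the input order in the results.
async function processInBatches<T, R>(
  items: T[],
  batchSize: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = []
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize)
    // All items in a batch run concurrently; the next batch waits for them.
    const batchResults = await Promise.all(batch.map(item => worker(item)))
    results.push(...batchResults)
  }
  return results
}
```

A lower limit for stdio servers is a reasonable design choice because each one spawns a subprocess, whereas remote connections mostly wait on the network.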
11. MCP Tool Execution Sequence Diagram
[Figure: sequence diagram of the MCP tool execution flow]
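12. Summary
The MCP client in Claude Code implements a full-featured protocol integration layer:
- Transport abstraction: supports stdio, SSE, StreamableHTTP, WebSocket, and other connection types through the MCP SDK transport interface
- Tool proxying: converts MCP tools into the native tool format for seamless integration
- Configuration management: supports project-, user-, and enterprise-level configuration, with policy filtering
- Authentication: a complete OAuth 2.0 flow, covering both standard authorization and XAA cross-app access
- Resilience: keeps connections reliable through caching, error retries, and session management
- React integration: exposes connection management through a Context
The core code lives in src/services/mcp/client.ts (3348 lines), which works together with config.ts, auth.ts, and other modules to provide the complete MCP integration.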