
Wegent Dataflow and Communication Architecture

Project: Wegent - AI-native operating system for intelligent agent teams
Analysis Date: 2026-02-01
Document Version: 1.0


Executive Summary

Wegent implements a real-time communication architecture built on Socket.IO with a Redis adapter for horizontal scaling. The system supports multiple execution paths (Chat Shell for direct LLM streaming, Executor Manager for Docker-based execution, and Local Device for on-device execution) with unified WebSocket event streaming. Key architectural decisions include:

  • Namespace-based isolation: /chat for user interactions, /local-executor for device connections
  • Room-based routing: User rooms (user:{id}) and Task rooms (task:{id}) for targeted message delivery
  • Dual-path execution: Direct chat streaming (Chat Shell) vs. Docker container execution (Executor)
  • Redis-backed scaling: Cross-worker communication via Redis Pub/Sub

1. WebSocket/Socket.IO Architecture

1.1 Namespace Organization

| Namespace | Purpose | Connection Type | Authentication |
|---|---|---|---|
| /chat | User chat interface, task management | Browser clients | JWT token in auth payload |
| /local-executor | Local device/agent connections | Desktop/CLI clients | JWT token in auth payload |

1.2 Room-Based Routing System

Rooms provide targeted message delivery without broadcasting to all connected clients:

| Room Pattern | Purpose | Members |
|---|---|---|
| user:{user_id} | User-specific notifications | All sessions of a user |
| task:{task_id} | Task chat and updates | All users viewing the task |
| device:{user_id}:{device_id} | Device-specific commands | Specific device connection |

Room Join Flow:
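
The flow can be sketched as a simplified in-memory model (illustrative only — the real handler lives in backend/app/api/ws/chat_namespace.py; the `rooms`, `sessions`, and `can_access` arguments here are stand-ins for Socket.IO session state and the real permission query):

```python
# Simplified model of the task:join flow. Rooms are a plain dict and
# can_access is a stand-in for the real can_access_task() check.

def handle_task_join(sid, payload, sessions, rooms, can_access, history):
    """Validate access, join the task room, and return sync data as the ack."""
    user_id = sessions.get(sid)
    task_id = payload.get("task_id")
    if user_id is None or task_id is None:
        return {"error": "invalid session or payload"}
    if not can_access(user_id, task_id):
        return {"error": "access denied"}  # never join the room on failure
    rooms.setdefault(f"task:{task_id}", set()).add(sid)
    after = payload.get("after_message_id", 0)  # incremental sync cursor
    return {"subtasks": [m for m in history.get(task_id, [])
                         if m["message_id"] > after]}
```

Note how the ack doubles as the sync payload: a reconnecting client passes the last `message_id` it has, and only newer subtasks come back.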

1.3 Event Types

Client → Server Events

| Event | Namespace | Handler | Purpose |
|---|---|---|---|
| chat:send | /chat | on_chat_send | Send user message, trigger AI response |
| chat:cancel | /chat | on_chat_cancel | Cancel ongoing streaming |
| chat:retry | /chat | on_chat_retry | Retry failed AI message |
| chat:resume | /chat | on_chat_resume | Resume interrupted streaming |
| task:join | /chat | on_task_join | Join task room, get sync data |
| task:leave | /chat | on_task_leave | Leave task room |
| task:close-session | /chat | on_task_close_session | Close device session |
| history:sync | /chat | on_history_sync | Request message history |
| skill:response | /chat | on_skill_response | Frontend skill response |
| device:register | /local-executor | on_device_register | Register local device |
| device:heartbeat | /local-executor | on_device_heartbeat | Device health check |
| device:status | /local-executor | on_device_status | Update device status |
| task:progress | /local-executor | on_task_progress | Report execution progress |
| task:complete | /local-executor | on_task_complete | Report task completion |

Server → Client Events

| Event | Namespace | Target Room | Purpose |
|---|---|---|---|
| auth:error | /chat | Individual | Token expired/invalid |
| chat:start | /chat | task:{id} | AI response started |
| chat:chunk | /chat | task:{id} | Streaming content chunk |
| chat:done | /chat | task:{id} | AI response complete |
| chat:error | /chat | task:{id} | Error during streaming |
| chat:cancelled | /chat | task:{id} | Streaming cancelled |
| chat:message | /chat | task:{id} (skip sender) | Other user's message (group chat) |
| task:created | /chat | user:{id} | New task created |
| task:status | /chat | user:{id} | Task status update |
| task:app_update | /chat | task:{id} | App preview data updated |
| device:online | /chat | user:{id} | Device came online |
| device:offline | /chat | user:{id} | Device went offline |
| device:status | /chat | user:{id} | Device status changed |
| device:slot_update | /chat | user:{id} | Device slot usage updated |
| skill:request | /chat | task:{id} | Server requesting skill action |
| correction:start | /chat | task:{id} | Fact-checking started |
| correction:progress | /chat | task:{id} | Fact-checking progress |
| correction:chunk | /chat | task:{id} | Correction streaming |
| correction:done | /chat | task:{id} | Correction complete |
| background:execution_update | /chat | user:{id} | Background task update |

1.4 Redis Adapter for Scaling

# backend/app/core/socketio.py
def create_socketio_server() -> socketio.AsyncServer:
    redis_url = settings.REDIS_URL
    mgr = socketio.AsyncRedisManager(redis_url)  # Cross-worker pub/sub
    
    sio = socketio.AsyncServer(
        async_mode="asgi",
        client_manager=mgr,  # Enables horizontal scaling
        # ... other config
    )
    return sio

Redis Manager Benefits:

  • Multiple backend workers share WebSocket state via Redis
  • Events emitted on one worker reach clients connected to other workers
  • Room membership synchronized across instances
  • Automatic failover to in-memory if Redis unavailable
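
The fan-out behavior can be modeled in a few lines (a toy stand-in for the Redis pub/sub channel — the production system uses socketio.AsyncRedisManager, and no Redis client appears here):

```python
# Minimal model of cross-worker emission: each worker "subscribes" to a
# shared bus; an emit is published once and every worker delivers it to
# its own locally connected members of the target room.

class Worker:
    def __init__(self, bus):
        self.local_rooms = {}   # room name -> set of locally connected sids
        self.delivered = []     # (sid, event, data) tuples, for inspection
        bus.append(self)        # subscribe to the shared channel

    def deliver(self, room, event, data):
        for sid in self.local_rooms.get(room, ()):
            self.delivered.append((sid, event, data))

def emit(bus, room, event, data):
    """Publish once; every subscribed worker delivers to its local clients."""
    for worker in bus:
        worker.deliver(room, event, data)

bus = []
w1, w2 = Worker(bus), Worker(bus)
w1.local_rooms["task:5"] = {"a"}
w2.local_rooms["task:5"] = {"b"}   # same room, different worker
emit(bus, "task:5", "chat:chunk", {"offset": 0})
# both clients receive the event even though they sit on different workers
```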

2. Task Execution Data Flow

2.1 Complete Execution Sequence

2.2 Execution Path Decision Logic

2.3 Caching Layers

| Layer | Technology | Purpose | TTL |
|---|---|---|---|
| WebSocket Streaming | Redis | Active streaming content cache | Session-based |
| Device Status | Redis | Online/offline, heartbeat, slots | 60s (heartbeat) |
| Task Status | MySQL + JSON | Persistent task/subtask state | Persistent |
| Message History | MySQL | Subtask messages for sync | Persistent |
| Session Manager | In-Memory | Active stream tracking | Process lifetime |
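
The device-status layer above amounts to a TTL'd heartbeat key: liveness is "the key has not expired". A minimal sketch, with a dict standing in for Redis SETEX/EXISTS and an illustrative key schema (not necessarily the production one):

```python
import time

# Heartbeat writes a key that expires after HEARTBEAT_TTL seconds; a device
# is considered online only while the key is still live. The dict plays the
# role of Redis here; `now` is injectable so the logic is testable.

HEARTBEAT_TTL = 60.0

def heartbeat(store, user_id, device_id, now=None):
    now = time.monotonic() if now is None else now
    store[f"device:{user_id}:{device_id}"] = now + HEARTBEAT_TTL

def is_online(store, user_id, device_id, now=None):
    now = time.monotonic() if now is None else now
    expiry = store.get(f"device:{user_id}:{device_id}")
    return expiry is not None and now < expiry
```

With a 30s heartbeat interval (Section 10.1) and a 60s TTL, a device survives one missed heartbeat before being reported offline.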

3. Real-Time Chat Streaming Mechanism

3.1 Streaming Event Sequence

3.2 Message State Machine (Frontend)

3.3 Message Ordering System

Messages are sorted by message_id (primary) and timestamp (secondary):

// frontend/src/features/tasks/hooks/useUnifiedMessages.ts
const sortedMessages = messages.sort((a, b) => {
  if (a.messageId !== undefined && b.messageId !== undefined) {
    if (a.messageId !== b.messageId) {
      return a.messageId - b.messageId
    }
    return a.timestamp - b.timestamp
  }
  if (a.messageId !== undefined) return -1
  if (b.messageId !== undefined) return 1
  return a.timestamp - b.timestamp
})

3.4 Recovery Mechanisms

| Scenario | Recovery Action | Implementation |
|---|---|---|
| Page Refresh | Rejoin task room + fetch history | task:join with after_message_id |
| WebSocket Reconnect | Rejoin rooms + incremental sync | joinedTasksRef + automatic rejoin |
| Browser Tab Switch | Resume polling + WebSocket sync | visibilitychange listener |
| Streaming Interruption | Resume from offset | Redis cached content |
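
The resume-from-offset row can be sketched as follows (a model, not the production Redis code: the server keeps the full streamed content per subtask, and a reconnecting client sends the offset it last applied, receiving only the missing suffix):

```python
# Sketch of streaming recovery. The payload shape mirrors chat:chunk
# (content is a delta, offset is its position in the full response).

def resume_chunk(cache, subtask_id, client_offset):
    full = cache.get(subtask_id, "")
    if client_offset >= len(full):
        return None                          # client is already up to date
    return {"subtask_id": subtask_id,
            "content": full[client_offset:],  # only the missing suffix
            "offset": client_offset}
```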

4. Inter-Service Communication

4.1 Frontend ↔ Backend

| Direction | Protocol | Endpoint/Method | Purpose |
|---|---|---|---|
| F → B | WebSocket | chat:send | Send message |
| F → B | WebSocket | task:join/leave | Room management |
| F → B | WebSocket | chat:cancel/retry | Control operations |
| F → B | HTTP REST | /api/v1/tasks/* | CRUD operations |
| F → B | HTTP REST | /api/v1/kinds/* | Resource management |
| B → F | WebSocket | chat:* events | Streaming |
| B → F | WebSocket | task:* events | Status updates |

WebSocket Connection Management:

// frontend/src/contexts/SocketContext.tsx
const newSocket = io(socketUrl + '/chat', {
  path: SOCKETIO_PATH,
  auth: { token },
  transports: ['websocket', 'polling'], // Fallback
  reconnection: true,
  reconnectionAttempts: Infinity,
})

// Auto-rejoin on reconnect
newSocket.io.on('reconnect', () => {
  const tasksToRejoin = Array.from(joinedTasksRef.current)
  tasksToRejoin.forEach(taskId => {
    newSocket.emit('task:join', { task_id: taskId })
  })
})

4.2 Backend ↔ Executor Manager

| Direction | Protocol | Endpoint | Purpose |
|---|---|---|---|
| EM → B | HTTP POST | /api/v1/tasks/fetch | Poll pending tasks |
| EM → B | HTTP PUT | /api/v1/tasks/{id}/status | Update status |
| EM → B | HTTP GET | /api/v1/tasks/{id}/subtasks | Get subtasks |

Polling-Based Architecture:

# executor_manager/scheduler/scheduler.py
def fetch_online_and_process_tasks(self):
    # Check executor capacity before asking for more work
    running = get_executor_count("task-type=online")
    available_slots = max(0, self.max_concurrent - running)

    if available_slots > 0:
        # Fetch pending tasks from the Backend API
        success, result = self.api_client.fetch_tasks()
        if success:
            self.task_processor.process_tasks(result.get("tasks", []))

4.3 Executor Manager ↔ Executor

| Executor Type | Interface | Communication |
|---|---|---|
| Docker | Docker SDK + Kubernetes API | Container lifecycle management |
| Local | Direct function call | In-process execution |
| Device | WebSocket via /local-executor | Real-time bidirectional |
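
A plausible dispatch rule for the three executor types (a sketch only — the real logic lives in executor_manager/executors/dispatcher.py, and the task fields used here, `device_id` and `executor_type`, are assumptions for illustration):

```python
# Illustrative executor selection: device-bound tasks route over the
# /local-executor WebSocket, explicitly local tasks run in-process, and
# everything else defaults to a Docker container.

def choose_executor(task):
    if task.get("device_id"):
        return "device"   # routed to the device's WebSocket connection
    if task.get("executor_type") == "local":
        return "local"    # direct in-process function call
    return "docker"       # container lifecycle via Docker SDK / K8s API
```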

Docker Execution Flow:

4.4 Backend ↔ Chat Shell

| Direction | Protocol | Endpoint | Purpose |
|---|---|---|---|
| B → CS | HTTP POST | /v1/response | Send ChatEvent |
| CS → B | HTTP POST | Internal API | Stream chunks |

Chat Shell Integration:

# Backend triggers Chat Shell and relays the stream over WebSocket
import httpx

cs_payload = {
    "messages": messages,
    "model_config": model_config,
    "tools": prepared_tools,
    "stream": True,
}

async with httpx.AsyncClient() as client:
    async with client.stream("POST", CHAT_SHELL_URL, json=cs_payload) as response:
        async for chunk in response.aiter_text():
            # Parse the SSE chunk into content + offset, then
            # relay it to the task room as a chat:chunk event
            await ws_emitter.emit_chat_chunk(task_id, subtask_id, content, offset)

5. Complex Synchronization Logic

5.1 Group Chat Synchronization

5.2 Streaming Recovery on Reconnect

5.3 Multi-Device Synchronization

When a user has multiple browser tabs/devices open:

  1. User Room Membership: All sessions join user:{user_id} room
  2. Task List Updates: task:created, task:status broadcast to user room
  3. Task Room Updates: Only active task viewers receive chat:* events
  4. Deduplication: Sender is excluded from chat:message broadcasts
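
The deduplication rule (step 4) can be sketched as a skip-sender broadcast, the same shape as Socket.IO's `skip_sid` option (a model, not the emitter code itself):

```python
# chat:message broadcast to a task room that excludes the originating
# session, so the sender's own optimistic UI message is not duplicated.

def broadcast(room_members, event, data, skip_sid=None):
    return [(sid, event, data)
            for sid in sorted(room_members)   # sorted only for determinism
            if sid != skip_sid]
```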

6. Event Payload Schemas

6.1 chat:send Payload

interface ChatSendPayload {
  task_id?: number           // For multi-turn (optional for new tasks)
  team_id: number            // Target team/agent
  message: string            // User message content
  title?: string             // Custom task title
  attachment_ids?: number[]  // Context attachments
  contexts?: ContextItem[]   // Knowledge bases, etc.
  enable_deep_thinking: boolean  // Enable tools
  enable_web_search: boolean
  force_override_bot_model?: string  // Model override
  is_group_chat: boolean
  device_id?: string         // For local execution
  // ... repository info for code tasks
}

6.2 chat:chunk Payload

interface ChatChunkPayload {
  subtask_id: number
  content: string            // Delta content
  offset: number             // Position in full response
  task_id?: number           // For recovery
  result?: {
    value: string            // Full content
    thinking?: unknown[]     // Thought process
    workbench?: Record<string, unknown>
    blocks?: MessageBlock[]  // Structured content
  }
}

6.3 task:join Response

interface TaskJoinAck {
  streaming?: {
    subtask_id: number
    offset: number
    cached_content: string   // For recovery
  }
  subtasks?: Array<{
    id: number
    message_id: number
    role: string
    prompt: string
    result: unknown
    status: string
    contexts: unknown[]
    sender?: {
      user_id: number
      user_name: string
      avatar?: string
    }
  }>
  error?: string
}

7. Security Considerations

7.1 Authentication Flow

  1. Connection: Client provides JWT in auth payload
  2. Verification: Backend validates token and extracts user_id
  3. Session Storage: User context stored in Socket.IO session
  4. Token Expiry: Background checks emit auth:error when expired
  5. Room Access: All room joins verify can_access_task(user_id, task_id)
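
Step 2 can be illustrated with a minimal HS256 token check (the real backend uses its own verify_jwt_token(); this sketch only shows the shape: verify the signature, reject expired tokens, and extract user_id):

```python
import base64, hashlib, hmac, json, time

# Minimal HS256 JWT verification. Returns the user_id claim on success,
# or None on a malformed token, bad signature, or expired token (the
# caller would then emit auth:error and disconnect).

def _b64url_decode(part):
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))

def verify_token(token, secret, now=None):
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
    except ValueError:
        return None                                   # malformed token
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        return None                                   # bad signature
    claims = json.loads(_b64url_decode(payload_b64))
    now = time.time() if now is None else now
    if claims.get("exp", 0) <= now:
        return None                                   # expired -> auth:error
    return claims.get("user_id")
```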

7.2 Authorization Checks

| Action | Check | Implementation |
|---|---|---|
| Connect | Valid JWT | verify_jwt_token() |
| Join Task Room | Task access permission | can_access_task() query |
| Send Message | Team access | Team membership check |
| Cancel/Retry | Subtask ownership | Subtask.user_id match |
| Device Operations | Device ownership | executor_name validation |

8. Performance Optimizations

8.1 WebSocket Optimizations

  • Incremental Sync: after_message_id parameter for partial history
  • Delta Streaming: Only new content sent in chat:chunk (not full text)
  • Room Scoping: Messages only sent to interested clients (task room members)
  • Compression: WebSocket frames compressed when supported
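
The delta-streaming optimization can be sketched as an emitter that tracks how much of the response has already been sent, so each chat:chunk carries only the new suffix and its offset (a simplified model of the emit side, not the ws_emitter code):

```python
# Emits only deltas: push() is called with the full response so far, and
# a chunk is produced only for the unsent suffix. The offset lets clients
# splice chunks deterministically (and resume after a reconnect).

class ChunkEmitter:
    def __init__(self):
        self.sent = 0       # characters already emitted
        self.chunks = []    # recorded {content, offset} payloads

    def push(self, full_text):
        if len(full_text) > self.sent:
            self.chunks.append({"content": full_text[self.sent:],
                                "offset": self.sent})
            self.sent = len(full_text)
```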

8.2 Database Optimizations

  • Quick In/Out: DB connections released before WebSocket emissions
  • Bulk Operations: Subtask fetching uses efficient queries
  • Caching: Redis for streaming content and device status
  • Lazy Loading: Task detail loaded only on explicit join

8.3 Frontend Optimizations

  • Virtual Scrolling: Message list virtualized for large conversations
  • Debounced Updates: UI updates batched for high-frequency chunks
  • Request Deduplication: joinedTasksRef prevents duplicate joins
  • State Machine: Centralized message state reduces re-renders

9. Monitoring and Observability

9.1 OpenTelemetry Integration

| Component | Spans | Attributes |
|---|---|---|
| WebSocket Events | ws.event | event_type, task_id, user_id |
| Task Processing | process_task | task.id, executor.type |
| Chat Streaming | chat.stream | subtask_id, model, duration |
| DB Operations | db.query | table, operation, duration |

9.2 Key Metrics

  • WebSocket: Connection count, event throughput, latency
  • Tasks: Queue depth, processing time, success rate
  • Executors: Container count, resource utilization
  • Streaming: Time-to-first-chunk, total duration

10. Summary of Communication Patterns

10.1 Pattern Overview

| Pattern | Use Case | Implementation |
|---|---|---|
| Request-Response | User actions | WebSocket emit with ACK callback |
| Pub-Sub | Broadcast updates | Room-based emission |
| Streaming | AI responses | chat:start → chat:chunk → chat:done |
| Polling | Task distribution | Executor Manager polling Backend API |
| Heartbeat | Device health | 30s interval with Redis TTL |
| Push | Real-time updates | WebSocket events from Backend |

10.2 Areas Requiring Deep Analysis

  1. Stream Interruption Recovery: The after_message_id and Redis caching mechanism requires careful testing for edge cases (mid-chunk disconnect, concurrent reconnects)

  2. Group Chat Consistency: Message ordering across multiple clients with varying network conditions needs validation

  3. Cross-Worker Synchronization: Redis adapter performance under high load should be monitored

  4. Device Offline Handling: The transition from WebSocket to offline state and task reassignment logic needs robust error handling

  5. Chat Shell Backpressure: High-volume streaming from Chat Shell to Backend WebSocket needs flow control mechanisms

  6. Task Status Consistency: Ensuring task:status events align with actual subtask states across multiple update paths (HTTP, WebSocket, polling)


Appendix A: File Locations

| Component | Key Files |
|---|---|
| WebSocket Namespaces | backend/app/api/ws/chat_namespace.py, backend/app/api/ws/device_namespace.py |
| Event Definitions | backend/app/api/ws/events.py |
| Socket.IO Server | backend/app/core/socketio.py |
| WebSocket Emitter | backend/app/services/chat/ws_emitter.py |
| Frontend Socket Context | frontend/src/contexts/SocketContext.tsx |
| Message State | frontend/src/features/tasks/hooks/useUnifiedMessages.ts |
| Task State Machine | frontend/src/features/tasks/hooks/useTaskStateMachine.ts |
| Executor Scheduler | executor_manager/scheduler/scheduler.py |
| Task Processor | executor_manager/tasks/task_processor.py |
| API Client | executor_manager/clients/task_api_client.py |
| Executor Dispatcher | executor_manager/executors/dispatcher.py |

End of Document