Wegent Dataflow and Communication Architecture
Project: Wegent - AI-native operating system for intelligent agent teams
Analysis Date: 2026-02-01
Document Version: 1.0
Executive Summary
Wegent implements a sophisticated real-time communication architecture built on Socket.IO with Redis adapter for horizontal scaling. The system supports multiple execution paths (Chat Shell for direct LLM, Executor Manager for Docker-based execution, and Local Device execution) with unified WebSocket event streaming. Key architectural decisions include:
- Namespace-based isolation: /chat for user interactions, /local-executor for device connections
- Room-based routing: User rooms (user:{id}) and Task rooms (task:{id}) for targeted message delivery
- Dual-path execution: Direct chat streaming (Chat Shell) vs. Docker container execution (Executor)
- Redis-backed scaling: Cross-worker communication via Redis Pub/Sub
1. WebSocket/Socket.IO Architecture
1.1 Namespace Organization
| Namespace | Purpose | Connection Type | Authentication |
|---|---|---|---|
| /chat | User chat interface, task management | Browser clients | JWT token in auth payload |
| /local-executor | Local device/agent connections | Desktop/CLI clients | JWT token in auth payload |
1.2 Room-Based Routing System
Rooms provide targeted message delivery without broadcasting to all connected clients:
| Room Pattern | Purpose | Members |
|---|---|---|
| user:{user_id} | User-specific notifications | All sessions of a user |
| task:{task_id} | Task chat and updates | All users viewing the task |
| device:{user_id}:{device_id} | Device-specific commands | Specific device connection |
Room Join Flow:
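The join flow can be sketched as follows; this is a minimal runnable illustration following the routing table's naming conventions, where the ACL dict and handler body are stand-ins for Wegent's actual on_task_join and can_access_task, not the real implementation:

```python
def task_room(task_id: int) -> str:
    # Routing-table convention: task:{task_id}
    return f"task:{task_id}"

def user_room(user_id: int) -> str:
    # Routing-table convention: user:{user_id}
    return f"user:{user_id}"

def handle_task_join(user_id: int, task_id: int, acl: dict) -> dict:
    # Verify access before entering the room; a real handler would also
    # return streaming/sync data (see the TaskJoinAck schema in section 6.3)
    if task_id not in acl.get(user_id, set()):
        return {"error": "access denied"}
    return {"room": task_room(task_id)}
```

On success the socket enters the task room and subsequent chat:* events reach it without any broadcast to unrelated clients.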
1.3 Event Types
Client → Server Events
| Event | Namespace | Handler | Purpose |
|---|---|---|---|
| chat:send | /chat | on_chat_send | Send user message, trigger AI response |
| chat:cancel | /chat | on_chat_cancel | Cancel ongoing streaming |
| chat:retry | /chat | on_chat_retry | Retry failed AI message |
| chat:resume | /chat | on_chat_resume | Resume interrupted streaming |
| task:join | /chat | on_task_join | Join task room, get sync data |
| task:leave | /chat | on_task_leave | Leave task room |
| task:close-session | /chat | on_task_close_session | Close device session |
| history:sync | /chat | on_history_sync | Request message history |
| skill:response | /chat | on_skill_response | Frontend skill response |
| device:register | /local-executor | on_device_register | Register local device |
| device:heartbeat | /local-executor | on_device_heartbeat | Device health check |
| device:status | /local-executor | on_device_status | Update device status |
| task:progress | /local-executor | on_task_progress | Report execution progress |
| task:complete | /local-executor | on_task_complete | Report task completion |
Server → Client Events
| Event | Namespace | Target Room | Purpose |
|---|---|---|---|
| auth:error | /chat | Individual | Token expired/invalid |
| chat:start | /chat | task:{id} | AI response started |
| chat:chunk | /chat | task:{id} | Streaming content chunk |
| chat:done | /chat | task:{id} | AI response complete |
| chat:error | /chat | task:{id} | Error during streaming |
| chat:cancelled | /chat | task:{id} | Streaming cancelled |
| chat:message | /chat | task:{id} (skip sender) | Other user's message (group chat) |
| task:created | /chat | user:{id} | New task created |
| task:status | /chat | user:{id} | Task status update |
| task:app_update | /chat | task:{id} | App preview data updated |
| device:online | /chat | user:{id} | Device came online |
| device:offline | /chat | user:{id} | Device went offline |
| device:status | /chat | user:{id} | Device status changed |
| device:slot_update | /chat | user:{id} | Device slot usage updated |
| skill:request | /chat | task:{id} | Server requesting skill action |
| correction:start | /chat | task:{id} | Fact-checking started |
| correction:progress | /chat | task:{id} | Fact-checking progress |
| correction:chunk | /chat | task:{id} | Correction streaming |
| correction:done | /chat | task:{id} | Correction complete |
| background:execution_update | /chat | user:{id} | Background task update |
1.4 Redis Adapter for Scaling
```python
# backend/app/core/socketio.py
def create_socketio_server() -> socketio.AsyncServer:
    redis_url = settings.REDIS_URL
    mgr = socketio.AsyncRedisManager(redis_url)  # Cross-worker pub/sub
    sio = socketio.AsyncServer(
        async_mode="asgi",
        client_manager=mgr,  # Enables horizontal scaling
        # ... other config
    )
    return sio
```
Redis Manager Benefits:
- Multiple backend workers share WebSocket state via Redis
- Events emitted on one worker reach clients connected to other workers
- Room membership synchronized across instances
- Automatic failover to in-memory if Redis unavailable
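The cross-worker mechanism can be modelled roughly as follows. This is an illustrative sketch only: the channel name and message shape are stand-ins for python-socketio's internal pub/sub protocol, not Wegent code, and serve to show why an emit on one worker reaches sockets held by another:

```python
import json

def encode_cross_worker_emit(event: str, data: dict, room: str) -> tuple:
    # On emit(), the adapter serializes the event and publishes it on a
    # channel every worker subscribes to; each worker then delivers it
    # to whichever members of the room are connected locally.
    channel = "socketio"  # shared pub/sub channel (illustrative name)
    message = json.dumps({"event": event, "data": data, "room": room})
    return channel, message
```

Room membership travels the same way, which is why rooms stay consistent across instances.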
2. Task Execution Data Flow
2.1 Complete Execution Sequence
2.2 Execution Path Decision Logic
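The decision diagram for this section is not reproduced here; the sketch below captures the three-way routing implied by the executive summary (Chat Shell, Executor, Local Device). The payload field names are assumptions for illustration, and requires_container in particular is a hypothetical stand-in for however the backend flags container-bound work:

```python
def pick_execution_path(payload: dict) -> str:
    # A message bound to a registered device runs via /local-executor
    if payload.get("device_id"):
        return "local-device"
    # Container-bound tasks go through the Executor Manager (Docker);
    # requires_container is a hypothetical flag for this sketch
    if payload.get("requires_container"):
        return "executor"
    # Default: direct LLM streaming through Chat Shell
    return "chat-shell"
```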
2.3 Caching Layers
| Layer | Technology | Purpose | TTL |
|---|---|---|---|
| WebSocket Streaming | Redis | Active streaming content cache | Session-based |
| Device Status | Redis | Online/offline, heartbeat, slots | 60s (heartbeat) |
| Task Status | MySQL + JSON | Persistent task/subtask state | Persistent |
| Message History | MySQL | Subtask messages for sync | Persistent |
| Session Manager | In-Memory | Active stream tracking | Process lifetime |
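The 60s heartbeat TTL in the Device Status row works as a liveness check: each heartbeat refreshes the key, and a device is treated as offline once the entry outlives the TTL. Wegent stores this in Redis (where the key would simply expire); in this sketch a plain dict stands in so the logic is runnable without a Redis server:

```python
HEARTBEAT_TTL = 60  # seconds, per the Device Status row above

def record_heartbeat(store: dict, device_key: str, now: float) -> None:
    # Redis equivalent: SET device_key <now> EX 60 (each heartbeat resets the TTL)
    store[device_key] = now

def is_online(store: dict, device_key: str, now: float) -> bool:
    # With real Redis the key expires on its own; here we compare timestamps
    ts = store.get(device_key)
    return ts is not None and (now - ts) < HEARTBEAT_TTL
```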
3. Real-Time Chat Streaming Mechanism
3.1 Streaming Event Sequence
3.2 Message State Machine (Frontend)
3.3 Message Ordering System
Messages are sorted by message_id (primary) and timestamp (secondary):
```typescript
// frontend/src/features/tasks/hooks/useUnifiedMessages.ts
const sortedMessages = messages.sort((a, b) => {
  if (a.messageId !== undefined && b.messageId !== undefined) {
    if (a.messageId !== b.messageId) {
      return a.messageId - b.messageId
    }
    return a.timestamp - b.timestamp
  }
  // Messages with a messageId sort before provisional ones without one
  if (a.messageId !== undefined) return -1
  if (b.messageId !== undefined) return 1
  return a.timestamp - b.timestamp
})
```
3.4 Recovery Mechanisms
| Scenario | Recovery Action | Implementation |
|---|---|---|
| Page Refresh | Rejoin task room + fetch history | task:join with after_message_id |
| WebSocket Reconnect | Rejoin rooms + incremental sync | joinedTasksRef + automatic rejoin |
| Browser Tab Switch | Resume polling + WebSocket sync | visibilitychange listener |
| Streaming Interruption | Resume from offset | Redis cached content |
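The "resume from offset" row can be illustrated as follows: the server caches the full streamed text in Redis, the reconnecting client reports how many characters it already holds, and only the missing tail is replayed. The function name and clamping behavior are illustrative, not taken from Wegent's source:

```python
def resume_payload(cached_content: str, client_offset: int) -> dict:
    # Clamp in case the client reports an offset outside the cached range
    offset = min(max(client_offset, 0), len(cached_content))
    return {
        "offset": offset,
        "cached_content": cached_content[offset:],  # only the missing tail
    }
```

This matches the shape of the streaming field in the TaskJoinAck schema (section 6.3).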
4. Inter-Service Communication
4.1 Frontend ↔ Backend
| Direction | Protocol | Endpoint/Method | Purpose |
|---|---|---|---|
| F → B | WebSocket | chat:send | Send message |
| F → B | WebSocket | task:join/leave | Room management |
| F → B | WebSocket | chat:cancel/retry | Control operations |
| F → B | HTTP REST | /api/v1/tasks/* | CRUD operations |
| F → B | HTTP REST | /api/v1/kinds/* | Resource management |
| B → F | WebSocket | chat:* events | Streaming |
| B → F | WebSocket | task:* events | Status updates |
WebSocket Connection Management:
```typescript
// frontend/src/contexts/SocketContext.tsx
const newSocket = io(socketUrl + '/chat', {
  path: SOCKETIO_PATH,
  auth: { token },
  transports: ['websocket', 'polling'], // Fallback to long-polling
  reconnection: true,
  reconnectionAttempts: Infinity,
})

// Auto-rejoin on reconnect
newSocket.io.on('reconnect', () => {
  const tasksToRejoin = Array.from(joinedTasksRef.current)
  tasksToRejoin.forEach(taskId => {
    newSocket.emit('task:join', { task_id: taskId })
  })
})
```
4.2 Backend ↔ Executor Manager
| Direction | Protocol | Endpoint | Purpose |
|---|---|---|---|
| EM → B | HTTP POST | /api/v1/tasks/fetch | Poll pending tasks |
| EM → B | HTTP PUT | /api/v1/tasks/{id}/status | Update status |
| EM → B | HTTP GET | /api/v1/tasks/{id}/subtasks | Get subtasks |
Polling-Based Architecture:
```python
# executor_manager/scheduler/scheduler.py
def fetch_online_and_process_tasks(self):
    # Check executor capacity
    running = get_executor_count("task-type=online")
    available_slots = max(0, self.max_concurrent - running)
    if available_slots > 0:
        # Fetch pending tasks from the Backend
        success, tasks = self.api_client.fetch_tasks()
        if success:
            self.task_processor.process_tasks(tasks)
```
4.3 Executor Manager ↔ Executor
| Executor Type | Interface | Communication |
|---|---|---|
| Docker | Docker SDK + Kubernetes API | Container lifecycle management |
| Local | Direct function call | In-process execution |
| Device | WebSocket via /local-executor | Real-time bidirectional |
Docker Execution Flow:
4.4 Backend ↔ Chat Shell
| Direction | Protocol | Endpoint | Purpose |
|---|---|---|---|
| B → CS | HTTP POST | /v1/response | Send ChatEvent |
| CS → B | HTTP POST | Internal API | Stream chunks |
Chat Shell Integration:
```python
# Backend triggers Chat Shell
cs_payload = {
    "messages": messages,
    "model_config": model_config,
    "tools": prepared_tools,
    "stream": True,
}
async with httpx.AsyncClient() as client:
    async with client.stream("POST", CHAT_SHELL_URL, json=cs_payload) as response:
        async for chunk in response.aiter_text():
            # Parse the SSE chunk into delta content and its offset,
            # then relay it to task-room clients as a chat:chunk event
            await ws_emitter.emit_chat_chunk(task_id, subtask_id, content, offset)
```
5. Complex Synchronization Logic
5.1 Group Chat Synchronization
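The synchronization diagram for this section is not reproduced; the hedged sketch below shows one invariant it implies: when a sender's optimistic local echo and the room broadcast describe the same message_id, clients keep a single copy and re-derive ordering as in section 3.3. Field names mirror the payload schemas in section 6; the merge function itself is illustrative, not Wegent's frontend code:

```python
def merge_incoming(existing: list, incoming: dict) -> list:
    # Drop any placeholder sharing the incoming message_id, then append
    # and re-sort by (message_id, timestamp), as in section 3.3
    merged = [m for m in existing if m.get("message_id") != incoming.get("message_id")]
    merged.append(incoming)
    return sorted(merged, key=lambda m: (m.get("message_id", 0), m.get("timestamp", 0)))
```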
5.2 Streaming Recovery on Reconnect
5.3 Multi-Device Synchronization
When a user has multiple browser tabs/devices open:
- User Room Membership: All sessions join the user:{user_id} room
- Task List Updates: task:created and task:status broadcast to the user room
- Task Room Updates: Only active task viewers receive chat:* events
- Deduplication: Sender is excluded from chat:message broadcasts
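The deduplication rule can be modelled as a pure selection over room members (python-socketio exposes this as the skip_sid argument to emit); the function below is an illustrative model, not the actual emission path:

```python
def broadcast_targets(room_members: list, sender_sid: str) -> list:
    # Everyone currently in task:{id} except the sending session,
    # so the sender never receives an echo of its own chat:message
    return [sid for sid in room_members if sid != sender_sid]
```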
6. Event Payload Schemas
6.1 chat:send Payload
```typescript
interface ChatSendPayload {
  task_id?: number                  // For multi-turn (optional for new tasks)
  team_id: number                   // Target team/agent
  message: string                   // User message content
  title?: string                    // Custom task title
  attachment_ids?: number[]         // Context attachments
  contexts?: ContextItem[]          // Knowledge bases, etc.
  enable_deep_thinking: boolean     // Enable tools
  enable_web_search: boolean
  force_override_bot_model?: string // Model override
  is_group_chat: boolean
  device_id?: string                // For local execution
  // ... repository info for code tasks
}
```
6.2 chat:chunk Payload
```typescript
interface ChatChunkPayload {
  subtask_id: number
  content: string         // Delta content
  offset: number          // Position in full response
  task_id?: number        // For recovery
  result?: {
    value: string         // Full content
    thinking?: unknown[]  // Thought process
    workbench?: Record<string, unknown>
    blocks?: MessageBlock[] // Structured content
  }
}
```
6.3 task:join Response
```typescript
interface TaskJoinAck {
  streaming?: {
    subtask_id: number
    offset: number
    cached_content: string // For recovery
  }
  subtasks?: Array<{
    id: number
    message_id: number
    role: string
    prompt: string
    result: unknown
    status: string
    contexts: unknown[]
    sender?: {
      user_id: number
      user_name: string
      avatar?: string
    }
  }>
  error?: string
}
```
7. Security Considerations
7.1 Authentication Flow
- Connection: Client provides JWT in the auth payload
- Verification: Backend validates the token and extracts user_id
- Session Storage: User context stored in the Socket.IO session
- Token Expiry: Background checks emit auth:error when expired
- Room Access: All room joins verify can_access_task(user_id, task_id)
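The connect-time portion of this flow can be sketched as follows. Wegent validates the token with verify_jwt_token(); here verification is stubbed (a claims dict stands in for a decoded JWT, with None modelling a failed signature check) so the surrounding decisions are runnable:

```python
def authenticate(claims, now):
    # claims: decoded JWT payload, or None if verification failed
    if claims is None:
        return {"ok": False, "reason": "invalid token"}   # connection refused
    if claims.get("exp", 0) <= now:
        # The background expiry check would emit auth:error on this session
        return {"ok": False, "reason": "token expired"}
    # On success, user context is stored in the Socket.IO session
    return {"ok": True, "session": {"user_id": claims["sub"]}}
```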
7.2 Authorization Checks
| Action | Check | Implementation |
|---|---|---|
| Connect | Valid JWT | verify_jwt_token() |
| Join Task Room | Task access permission | can_access_task() query |
| Send Message | Team access | Team membership check |
| Cancel/Retry | Subtask ownership | Subtask.user_id match |
| Device Operations | Device ownership | executor_name validation |
8. Performance Optimizations
8.1 WebSocket Optimizations
- Incremental Sync: after_message_id parameter for partial history
- Delta Streaming: Only new content sent in chat:chunk (not full text)
- Room Scoping: Messages only sent to interested clients (task room members)
- Compression: WebSocket frames compressed when supported
8.2 Database Optimizations
- Quick In/Out: DB connections released before WebSocket emissions
- Bulk Operations: Subtask fetching uses efficient queries
- Caching: Redis for streaming content and device status
- Lazy Loading: Task detail loaded only on explicit join
8.3 Frontend Optimizations
- Virtual Scrolling: Message list virtualized for large conversations
- Debounced Updates: UI updates batched for high-frequency chunks
- Request Deduplication: joinedTasksRef prevents duplicate joins
- State Machine: Centralized message state reduces re-renders
9. Monitoring and Observability
9.1 OpenTelemetry Integration
| Component | Spans | Attributes |
|---|---|---|
| WebSocket Events | ws.event | event_type, task_id, user_id |
| Task Processing | process_task | task.id, executor.type |
| Chat Streaming | chat.stream | subtask_id, model, duration |
| DB Operations | db.query | table, operation, duration |
9.2 Key Metrics
- WebSocket: Connection count, event throughput, latency
- Tasks: Queue depth, processing time, success rate
- Executors: Container count, resource utilization
- Streaming: Time-to-first-chunk, total duration
10. Summary of Communication Patterns
10.1 Pattern Overview
| Pattern | Use Case | Implementation |
|---|---|---|
| Request-Response | User actions | WebSocket emit with ACK callback |
| Pub-Sub | Broadcast updates | Room-based emission |
| Streaming | AI responses | chat:start → chat:chunk → chat:done |
| Polling | Task distribution | Executor Manager polling Backend API |
| Heartbeat | Device health | 30s interval with Redis TTL |
| Push | Real-time updates | WebSocket events from Backend |
10.2 Areas Requiring Deep Analysis
- Stream Interruption Recovery: The after_message_id and Redis caching mechanism requires careful testing for edge cases (mid-chunk disconnect, concurrent reconnects)
- Group Chat Consistency: Message ordering across multiple clients with varying network conditions needs validation
- Cross-Worker Synchronization: Redis adapter performance under high load should be monitored
- Device Offline Handling: The transition from WebSocket to offline state and task reassignment logic needs robust error handling
- Chat Shell Backpressure: High-volume streaming from Chat Shell to Backend WebSocket needs flow control mechanisms
- Task Status Consistency: Ensuring task:status events align with actual subtask states across multiple update paths (HTTP, WebSocket, polling)
Appendix A: File Locations
| Component | Key Files |
|---|---|
| WebSocket Namespaces | backend/app/api/ws/chat_namespace.py, backend/app/api/ws/device_namespace.py |
| Event Definitions | backend/app/api/ws/events.py |
| Socket.IO Server | backend/app/core/socketio.py |
| WebSocket Emitter | backend/app/services/chat/ws_emitter.py |
| Frontend Socket Context | frontend/src/contexts/SocketContext.tsx |
| Message State | frontend/src/features/tasks/hooks/useUnifiedMessages.ts |
| Task State Machine | frontend/src/features/tasks/hooks/useTaskStateMachine.ts |
| Executor Scheduler | executor_manager/scheduler/scheduler.py |
| Task Processor | executor_manager/tasks/task_processor.py |
| API Client | executor_manager/clients/task_api_client.py |
| Executor Dispatcher | executor_manager/executors/dispatcher.py |
End of Document