TaskStateMachine Deep Dive Analysis
Executive Summary
The TaskStateMachine is a sophisticated state management architecture designed to ensure message consistency in real-time collaborative chat scenarios. It consolidates scattered recovery logic into a unified, event-driven state machine that handles WebSocket reconnection, message synchronization, and streaming continuity.
Key Achievement: Eliminates race conditions, prevents message loss during network interruptions, and provides a single source of truth for all chat message state.
1. TaskStateMachine Architecture
1.1 Singleton Manager Pattern
┌─────────────────────────────────────────────────────────────┐
│ TaskStateManager │
│ (Singleton) │
├─────────────────────────────────────────────────────────────┤
│ - machines: Map<number, TaskStateMachine> │
│ - deps: TaskStateMachineDeps (SocketContext) │
│ - globalListeners: Set<StateListener> │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Task #1001 │ │ Task #1002 │ │ Task #1003 │ │
│ │ StateMachine │ │ StateMachine │ │ StateMachine │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Design Rationale:
- Each task has its own isolated state machine
- Global manager handles coordination (recovery across all tasks)
- Dependency injection for WebSocket operations
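A minimal sketch of this pattern, assuming the dependency shape implied by usage later in this document (`isConnected` and `joinTask` appear in section 3.2; everything else here is illustrative):

```typescript
// Sketch of the singleton manager with per-task machines and injected deps.
type StateListener<S> = (state: S) => void

interface TaskStateMachineDeps {
  isConnected(): boolean
  joinTask(taskId: number, opts?: object): Promise<unknown>
}

class TaskStateMachine {
  constructor(readonly taskId: number, private deps: TaskStateMachineDeps) {}
  leave(): void { /* reset state, see section 9 */ }
}

class TaskStateManager {
  private machines = new Map<number, TaskStateMachine>()
  constructor(private deps: TaskStateMachineDeps) {}

  // One isolated machine per task, created lazily on first access.
  getOrCreate(taskId: number): TaskStateMachine {
    let machine = this.machines.get(taskId)
    if (!machine) {
      machine = new TaskStateMachine(taskId, this.deps)
      this.machines.set(taskId, machine)
    }
    return machine
  }
}
```

Repeated calls with the same task ID return the same instance, which is what lets WebSocket handlers and components share state.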
1.2 State Structure (TaskStateData)
interface TaskStateData {
  taskId: number                        // Task identifier
  status: TaskStatus                    // Current state machine status
  messages: Map<string, UnifiedMessage> // All messages
  streamingSubtaskId: number | null     // Active streaming subtask
  streamingInfo: StreamingInfo | null   // Redis cached content
  error: string | null                  // Error state
  isStopping: boolean                   // Stop requested
}

type TaskStatus =
  | 'idle'      // Not joined WebSocket room
  | 'joining'   // Joining WebSocket room
  | 'syncing'   // Syncing messages from backend
  | 'ready'     // Ready, no streaming
  | 'streaming' // Has active streaming message
  | 'error'     // Error occurred
1.3 Observer Pattern Implementation
// In TaskStateMachine
private listeners: Set<StateListener> = new Set()

subscribe(listener: StateListener): () => void {
  this.listeners.add(listener)
  return () => { this.listeners.delete(listener) }
}

private notifyListeners(): void {
  const stateCopy = this.getState()
  this.listeners.forEach(listener => {
    try {
      listener(stateCopy)
    } catch (err) {
      console.error('[TaskStateMachine] Error in listener:', err)
    }
  })
}
Memory Safety:
- Each listener gets a fresh state copy (immutable)
- Unsubscribe function returned for cleanup
- Error isolation (one failing listener doesn't break others)
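A self-contained sketch of this contract (simplified state shape, illustrative names) shows unsubscribe and error isolation in action:

```typescript
// Minimal observer demo: fresh state copies per notification, an
// unsubscribe handle, and one failing listener not breaking the others.
type Listener = (state: { count: number }) => void

class Emitter {
  private listeners = new Set<Listener>()
  private state = { count: 0 }

  subscribe(listener: Listener): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }

  increment(): void {
    this.state = { count: this.state.count + 1 }
    const copy = { ...this.state } // listeners never see the live object
    this.listeners.forEach(l => {
      try { l(copy) } catch { /* isolate failing listeners */ }
    })
  }
}
```

Subscribing a listener that throws does not prevent later listeners from being notified, and calling the returned function stops further notifications.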
2. State Diagram (Mermaid)
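The diagram itself is missing from this document; the following Mermaid sketch is reconstructed from the `TaskStatus` values in section 1.2 and the events described in sections 3-4 (the `JOIN_ERROR` and `LEAVE` transitions are inferred, not confirmed by the source):

```mermaid
stateDiagram-v2
    [*] --> idle
    idle --> joining: RECOVER
    joining --> syncing: JOIN_SUCCESS
    joining --> error: JOIN_ERROR
    syncing --> ready: SYNC_DONE
    syncing --> streaming: SYNC_DONE_STREAMING
    syncing --> error: SYNC_ERROR
    ready --> streaming: CHAT_START
    streaming --> ready: CHAT_DONE
    streaming --> error: CHAT_ERROR
    error --> joining: RECOVER
    ready --> idle: LEAVE
```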
3. Message Recovery Mechanism
3.1 Recovery Flow Architecture
┌────────────────┐ ┌──────────────────┐ ┌────────────────┐
│ Trigger │────▶│ RECOVER │────▶│ joining │
│ (Reconnect, │ │ (Debounced) │ │ WebSocket │
│ Visibility, │ │ │ │ Room Join │
│ Manual) │ │ │ │ │
└────────────────┘ └──────────────────┘ └────────────────┘
│
┌───────────────────────────────┘
▼
┌────────────────┐
│ JOIN_SUCCESS │
│ + subtasks[] │
└────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ SYNC_DONE │ │SYNC_DONE │ │SYNC_ERROR │
│ (ready) │ │_STREAMING │ │ │
└────────────┘ │(streaming) │ └────────────┘
└────────────┘
3.2 Critical Recovery Logic (handleRecover)
private async handleRecover(event: Extract<Event, { type: 'RECOVER' }>): Promise<void> {
  // 1. Queue if already joining/syncing
  if (this.state.status === 'joining' || this.state.status === 'syncing') {
    this.pendingRecovery = true
    return
  }

  // 2. Debounce check (1s default)
  const now = Date.now()
  if (!event.force && now - this.lastRecoveryTime < this.recoveryDebounceMs) {
    return
  }
  this.lastRecoveryTime = now

  // 3. Check WebSocket connection
  if (!this.deps.isConnected()) return

  // 4. Calculate max messageId for incremental sync
  let maxMessageId: number | undefined
  for (const msg of this.state.messages.values()) {
    if (msg.messageId !== undefined &&
        (maxMessageId === undefined || msg.messageId > maxMessageId)) {
      maxMessageId = msg.messageId
    }
  }

  // 5. Join task room with incremental sync hint
  const response = await this.deps.joinTask(this.state.taskId, {
    forceRefresh: true,
    afterMessageId: maxMessageId, // Only fetch messages after this ID
  })

  // 6. Dispatch JOIN_SUCCESS with subtasks for sync
  await this.dispatch({
    type: 'JOIN_SUCCESS',
    streamingInfo: response.streaming,
    subtasks: response.subtasks as TaskDetailSubtask[] | undefined,
  })
}
3.3 Content Priority System
When syncing RUNNING AI messages, the system prioritizes content from most-recent to least-recent sources:
Priority Order (for RUNNING messages):
1. Redis cached_content (updated every 1s via streamingInfo)
2. Existing message content in state (from previous chunks)
3. Backend DB subtask.result.value (updated every 5s)
Selection Strategy: Choose the LONGEST content
// Content priority in buildMessages()
if (!isUserMessage && subtask.status === 'RUNNING') {
  let bestContent = backendContent

  // Check existing message content
  if (existingAiMessage && existingAiMessage.content.length > bestContent.length) {
    bestContent = existingAiMessage.content
  }

  // Check Redis cached content (highest priority)
  if (streamingInfo &&
      streamingInfo.subtask_id === subtask.id &&
      streamingInfo.cached_content &&
      streamingInfo.cached_content.length > bestContent.length) {
    bestContent = streamingInfo.cached_content
  }

  // aiMessage is the message object built earlier in buildMessages()
  // for this subtask (not shown in this excerpt)
  messages.set(messageId, { ...aiMessage, content: bestContent })
}
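The longest-wins selection can be factored into a pure helper for testing (a sketch; the function and type names here are illustrative, not from the source):

```typescript
// Illustrative pure helper for the content-priority rule: among backend DB
// content, existing in-memory content, and Redis cached content, pick the
// longest (the most complete snapshot of a RUNNING message).
interface StreamingInfoLike {
  subtask_id: number
  cached_content?: string
}

function pickBestContent(
  subtaskId: number,
  backendContent: string,
  existingContent: string | undefined,
  streamingInfo: StreamingInfoLike | null
): string {
  let best = backendContent
  if (existingContent !== undefined && existingContent.length > best.length) {
    best = existingContent
  }
  // Only consider the Redis cache if it belongs to this subtask
  const cached = streamingInfo?.subtask_id === subtaskId
    ? streamingInfo.cached_content
    : undefined
  if (cached !== undefined && cached.length > best.length) {
    best = cached
  }
  return best
}
```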
4. WebSocket Event Handling
4.1 Event Flow Diagram
┌──────────────────────────────────────────────────────────────────┐
│ WebSocket Events │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ chat:start │───▶│ Add AI │───▶│ status: │ │
│ │ (subtask_id)│ │ message │ │ streaming │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ chat:chunk │───▶│ Append to │ │
│ │ (content) │ │ message. │ │
│ └─────────────┘ │ content │ │
│ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ chat:done │───▶│ Mark as │───▶│ status: │ │
│ │ (result) │ │ completed │ │ ready │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ chat:error │───▶│ Mark as │───▶│ status: │ │
│ │ (error) │ │ error │ │ error │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │chat:message │───▶│ Add user │ (group chat - other users) │
│ │ (broadcast) │ │ message │ │
│ └─────────────┘ └─────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
4.2 Critical Handler: CHAT_CHUNK
private handleChatChunkEvent(event: Extract<Event, { type: 'CHAT_CHUNK' }>): void {
  const aiMessageId = generateMessageId('ai', event.subtaskId)

  // CRITICAL: Queue chunks during joining/syncing
  if (this.state.status === 'joining' || this.state.status === 'syncing') {
    this.pendingChunks.push({
      subtaskId: event.subtaskId,
      content: event.content,
      result: event.result,
      sources: event.sources,
      blockId: event.blockId,
    })
    return
  }

  const existingMessage = this.getAiMessage(event.subtaskId)
  if (!existingMessage) {
    console.warn('[TaskStateMachine] CHAT_CHUNK ignored - message not found')
    return
  }

  // CRITICAL FIX: Always append content to message.content.
  // Previously, when block_id existed, content was only added to blocks,
  // causing the UI to lose cached_content after a page refresh.
  const updatedMessage: UnifiedMessage = {
    ...existingMessage,
    content: existingMessage.content + event.content,
  }

  // Handle reasoning content (DeepSeek R1)
  if (event.result?.reasoning_chunk) {
    updatedMessage.reasoningContent =
      (existingMessage.reasoningContent || '') + event.result.reasoning_chunk
  }

  // Handle blocks (text blocks, tool calls)
  const mergedBlocks = this.mergeBlocks(existingMessage, event)
  if (event.result) {
    updatedMessage.result = {
      ...existingMessage.result,
      ...event.result,
      blocks: mergedBlocks,
    }
  }

  // Update state and notify
  const newMessages = new Map(this.state.messages)
  newMessages.set(aiMessageId, updatedMessage)
  this.state = { ...this.state, messages: newMessages }
  this.notifyListeners()
}
4.3 Pending Chunks Queue
Race condition handling: If a chat:chunk arrives while the machine is still joining or syncing, the chunk is queued and applied after sync completes.
private pendingChunks: PendingChunkEvent[] = []

// Queue chunks during sync
if (this.state.status === 'joining' || this.state.status === 'syncing') {
  this.pendingChunks.push({ subtaskId, content, result, sources, blockId })
  return
}

// Apply after sync completes
private applyPendingChunks(): void {
  for (const chunk of this.pendingChunks) {
    const aiMessageId = generateMessageId('ai', chunk.subtaskId)
    const existingMessage = this.state.messages.get(aiMessageId)
    if (!existingMessage) {
      console.warn('[TaskStateMachine] Pending chunk skipped - message not found')
      continue
    }
    // Apply chunk to message (same logic as handleChatChunkEvent)
    const updatedMessage = {
      ...existingMessage,
      content: existingMessage.content + chunk.content,
    }
    // ... handle blocks, reasoning, sources
  }
  this.pendingChunks = []
  this.notifyListeners()
}
5. Temp to Real Task ID Migration
5.1 The Problem
When a user sends the first message in a new task:
- Frontend uses a temporary negative ID (e.g., `-1738362000000`)
- Message is sent to the backend via WebSocket
- Backend creates the real task with a positive ID (e.g., `1001`)
- WebSocket events (`chat:start`, `chat:chunk`) reference the real ID
- Components using the temp ID need to continue working
5.2 Migration Architecture
Step 1: User sends message to new task
┌────────────────────────────────────────────────────────────┐
│ Client uses tempTaskId = -1738362000000 │
│ ├── Create TaskStateMachine for tempTaskId │
│ ├── Add user message (status: pending) │
│ └── Send chat:message via WebSocket │
└────────────────────────────────────────────────────────────┘
│
▼
Step 2: Backend responds
┌────────────────────────────────────────────────────────────┐
│ Backend returns realTaskId = 1001 │
│ ├── Callbacks moved from temp → real │
│ └── tempToRealTaskIdRef.set(-1738362000000, 1001) │
└────────────────────────────────────────────────────────────┘
│
▼
Step 3: State Migration
┌────────────────────────────────────────────────────────────┐
│ taskStateManager.migrateState(-1738362000000, 1001) │
│ ├── machines.set(1001, tempMachine) // Both point to same│
│ ├── DO NOT delete temp mapping! │
│ └── Now both IDs reference same state machine │
└────────────────────────────────────────────────────────────┘
5.3 Migration Implementation
migrateState(tempTaskId: number, realTaskId: number): void {
  const tempMachine = this.machines.get(tempTaskId)
  if (!tempMachine) {
    console.log('[TaskStateManager] No temp machine to migrate from', tempTaskId)
    return
  }

  // Check if a real machine already exists (edge case)
  const existingRealMachine = this.machines.get(realTaskId)
  if (existingRealMachine && existingRealMachine !== tempMachine) {
    console.warn('[TaskStateManager] Real machine already exists')
    tempMachine.leave()
    this.machines.delete(tempTaskId)
    return
  }

  // KEY: Make the real task ID point to the SAME state machine instance.
  // IMPORTANT: Keep BOTH mappings so components using either ID work.
  this.machines.set(realTaskId, tempMachine)
  // DO NOT: this.machines.delete(tempTaskId)

  console.log('[TaskStateManager] Migrated state: both', tempTaskId,
    'and', realTaskId, 'now point to same machine')
}
5.4 Why Keep Both Mappings?
- Components: `useTaskStateMachine(tempTaskId)` continues to work
- WebSocket Events: `chat:chunk` with `task_id: 1001` finds the machine
- Streaming Continuity: the active stream is not interrupted during migration
- State Preservation: all pending messages and streaming status are preserved
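The aliasing effect can be demonstrated with a plain `Map` (a minimal sketch of the idea, not the actual manager class):

```typescript
// After migration, both IDs resolve to the same machine instance.
const machines = new Map<number, { taskId: number }>()
const tempId = -1738362000000
const realId = 1001

const machine = { taskId: tempId }
machines.set(tempId, machine)

// migrateState: add the real-ID mapping, keep the temp one.
machines.set(realId, machines.get(tempId)!)

console.log(machines.get(tempId) === machines.get(realId)) // true
```

Because the value stored under both keys is the same object reference, a mutation seen through one ID is immediately visible through the other.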
6. Complex Scenarios
6.1 Page Refresh During Streaming
Scenario: User is streaming, refreshes page
T+0: User has streaming message (content: "Hello worl...")
T+1: User refreshes page
└── State machine destroyed, messages lost from memory
T+2: Page reloads, component mounts
└── useUnifiedMessages calls recover()
T+3: Recovery begins
├── joinTask(task_id) emitted
├── Backend responds with:
│ ├── streamingInfo: { subtask_id: 50, cached_content: "Hello world!" }
│ └── subtasks: [...]
└── JOIN_SUCCESS dispatched
T+4: doSync(subtasks) called
├── Build messages from backend subtasks
├── Check streamingInfo exists
├── Create AI message with cached_content
└── Dispatch SYNC_DONE_STREAMING
T+5: Streaming continues from Redis cache
├── New chat:chunk events append to cached content
└── User sees complete message history
Key Code in doSync():
// CRITICAL: If streamingInfo exists but no streaming message was created,
// create one now. This handles the race condition where:
// 1. User joins the room while streaming is in progress
// 2. Backend returns streamingInfo with subtask_id and cached_content
// 3. But the subtasks array doesn't contain this subtask yet
const streamingInfo = this.state.streamingInfo
if (streamingInfo && streamingInfo.subtask_id) {
  const aiMessageId = generateMessageId('ai', streamingInfo.subtask_id)
  const existingMessage = this.state.messages.get(aiMessageId)
  if (!existingMessage) {
    const newMessages = new Map(this.state.messages)
    newMessages.set(aiMessageId, {
      id: aiMessageId,
      type: 'ai',
      status: 'streaming',
      content: streamingInfo.cached_content || '', // Use Redis cache!
      timestamp: Date.now(),
      subtaskId: streamingInfo.subtask_id,
    })
    this.state = { ...this.state, messages: newMessages }
  }
}
6.2 Multiple Tabs with Same Task
Each tab maintains its own TaskStateMachine instance (isolated by browser context):
Tab 1 (Active) Tab 2 (Background)
│ │
├── WebSocket connected ───▶│
│ │
├── chat:send ─────────────▶│ (backend receives once)
│ │
▼ ▼
TaskStateMachine #1 TaskStateMachine #2
│ │
├── chat:start ◀────────────┤ (broadcast to both)
│ │
├── chat:chunk ◀────────────┤ (broadcast)
│ │
└── chat:done ◀─────────────┤ (broadcast)
Behavior:
- Each tab joins the task room independently
- Backend broadcasts events to all joined clients
- Each tab maintains its own message state
- If Tab 1 refreshes, it recovers independently
6.3 Network Reconnection Handling
┌─────────────────────────────────────────────────────────────────┐
│ Network Reconnection Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ T+0: User is streaming │
│ ├── status: streaming │
│ └── messages: [...] │
│ │
│ T+1: Network disconnects │
│ ├── WebSocket closed │
│ └── Socket.io auto-reconnects │
│ │
│ T+2: WebSocket reconnected │
│ ├── reconnect callback triggered │
│ └── taskStateManager.recoverAll() called │
│ └── For each active task machine.recover({force:true})│
│ │
│ T+3: Recovery per task │
│ ├── Calculate max messageId from existing messages │
│ ├── joinTask(taskId, { afterMessageId: maxId }) │
│ ├── Backend returns: │
│ │ ├── streamingInfo (if still streaming) │
│ │ └── subtasks (only messages after maxId) │
│ └── Sync applies incremental updates │
│ │
│ T+4: Resume streaming │
│ ├── If streamingInfo exists → create message with cache │
│ ├── Apply any pending chunks from queue │
│ └── Continue receiving new chat:chunk events │
│ │
└─────────────────────────────────────────────────────────────────┘
6.4 Message Ordering Guarantees
Ordering Strategy in useUnifiedMessages:
// Sort messages by messageId (primary) and timestamp (secondary).
// Copy first so the source array is not mutated in place.
const sortedMessages = [...messages].sort((a, b) => {
  // If both have messageId, use it as the primary sort key
  if (a.messageId !== undefined && b.messageId !== undefined) {
    if (a.messageId !== b.messageId) {
      return a.messageId - b.messageId
    }
    // Same messageId: use timestamp as the secondary sort key
    return a.timestamp - b.timestamp
  }
  // If only one has a messageId, it comes first
  if (a.messageId !== undefined) return -1
  if (b.messageId !== undefined) return 1
  // Neither has a messageId (both pending): sort by timestamp
  return a.timestamp - b.timestamp
})
Guarantees:
- Backend-assigned `messageId` is the primary sort key (strict ordering)
- `timestamp` is the secondary sort key (fallback for pending messages)
- Messages with a `messageId` always appear before pending messages
- Pending messages (the user's local input) are sorted by timestamp
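Extracted as a standalone comparator, the ordering rules can be exercised directly (names here are illustrative):

```typescript
// Pure comparator implementing the ordering guarantees above.
interface Sortable {
  messageId?: number
  timestamp: number
}

function compareMessages(a: Sortable, b: Sortable): number {
  if (a.messageId !== undefined && b.messageId !== undefined) {
    if (a.messageId !== b.messageId) return a.messageId - b.messageId
    return a.timestamp - b.timestamp
  }
  if (a.messageId !== undefined) return -1
  if (b.messageId !== undefined) return 1
  return a.timestamp - b.timestamp
}

const msgs: Sortable[] = [
  { timestamp: 300 },               // pending (no messageId yet)
  { messageId: 2, timestamp: 200 },
  { messageId: 1, timestamp: 100 },
]
const sorted = [...msgs].sort(compareMessages)
// → messageId 1, then messageId 2, then the pending message last
```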
7. Single Source of Truth Principle
7.1 Data Flow Architecture
┌─────────────────────────────────────────────────────────────────┐
│ SINGLE SOURCE OF TRUTH │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────┐ │
│ │ selectedTaskDetail │ │
│ │ (Backend API Data) │ │
│ │ - subtasks[] │ │
│ │ - Stale after fetch │ │
│ └──────────────────────┘ │
│ │ │
│ │ ONLY used for initial sync │
│ ▼ │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ TaskStateMachine │◀────▶│ WebSocket Events │ │
│ │ - messages Map │ │ - chat:start │ │
│ │ - streaming status │ │ - chat:chunk │ │
│ │ - REAL-TIME state │ │ - chat:done │ │
│ └──────────────────────┘ │ - chat:message │ │
│ │ └──────────────────────┘ │
│ │ │
│ │ ONLY source for display │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ useUnifiedMessages │ │
│ │ - Returns messages │ │
│ │ - Sorted & formatted│ │
│ └──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ Chat UI (render) │ │
│ └──────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
7.2 Critical Rules
Rule 1: NEVER use selectedTaskDetail.subtasks for display
// ❌ WRONG
const messages = selectedTaskDetail.subtasks // Stale data!
// ✅ CORRECT
const { messages } = useUnifiedMessages({ team, isGroupChat })
Rule 2: TaskStateMachine is updated by WebSocket events ONLY
// WebSocket events update the state machine:
handleChatStart(data) {
  const machine = taskStateManager.getOrCreate(data.task_id)
  machine.handleChatStart(data.subtask_id) // Updates internal state
}

handleChatChunk(data) {
  const machine = taskStateManager.getOrCreate(data.task_id)
  machine.handleChatChunk(data.subtask_id, data.content)
}
Rule 3: Recovery syncs backend state INTO the state machine
// Recovery fetches subtasks and builds messages
private async doSync(subtasks?: TaskDetailSubtask[]): Promise<void> {
  if (subtasks && subtasks.length > 0) {
    this.buildMessages(subtasks) // Sync into the state machine
  }
}
8. Block-Based Message Architecture
Modern AI messages support mixed content (text + tool calls) via blocks:
interface MessageBlock {
  id: string            // Unique block identifier
  type: 'text' | 'tool' | 'thinking' | 'error'
  content?: string      // Text content
  tool_use_id?: string  // Tool call ID
  tool_name?: string    // Tool name
  tool_input?: Record<string, unknown>
  tool_output?: unknown
  status?: 'pending' | 'streaming' | 'done' | 'error'
  timestamp?: number
}
Block Merging Logic
private mergeBlocks(
  existingMessage: UnifiedMessage,
  event: Extract<Event, { type: 'CHAT_CHUNK' }>
): MessageBlock[] {
  const existingBlocks = existingMessage.result?.blocks || []

  // Case 1: block_id with content - text block update
  if (event.blockId && event.content) {
    const blocksMap = new Map(existingBlocks.map(b => [b.id, b]))
    const targetBlock = blocksMap.get(event.blockId)
    if (targetBlock && targetBlock.type === 'text') {
      // Update existing text block
      const updatedBlock = {
        ...targetBlock,
        content: (targetBlock.content || '') + event.content,
      }
      blocksMap.set(event.blockId, updatedBlock)
    } else if (!targetBlock) {
      // Create new text block
      const newBlock: MessageBlock = {
        id: event.blockId,
        type: 'text',
        content: event.content,
        status: 'streaming',
        timestamp: Date.now(),
      }
      blocksMap.set(event.blockId, newBlock)
    }
    return Array.from(blocksMap.values())
  }

  // Case 2: Tool blocks - merge by block.id (replace strategy)
  const incomingBlocks = event.result?.blocks || []
  if (incomingBlocks.length > 0) {
    const blocksMap = new Map(existingBlocks.map(b => [b.id, b]))
    incomingBlocks.forEach(incomingBlock => {
      blocksMap.set(incomingBlock.id, incomingBlock) // Replace
    })
    return Array.from(blocksMap.values())
  }

  // Case 3: No changes
  return existingBlocks
}
9. Memory Management and Cleanup
9.1 Cleanup Strategies
// Per-task cleanup
leave(): void {
  this.state = {
    ...this.state,
    status: 'idle',
    messages: new Map(), // Clear messages
    streamingSubtaskId: null,
    streamingInfo: null,
    error: null,
    isStopping: false,
  }
  this.pendingRecovery = false
  this.pendingChunks = [] // Clear pending chunks
}

// Cleanup via TaskStateManager
cleanup(taskId: number): void {
  const machine = this.machines.get(taskId)
  if (machine) {
    machine.leave()              // Reset state
    this.machines.delete(taskId) // Remove reference
  }
}

// Cleanup all
cleanupAll(): void {
  for (const machine of this.machines.values()) {
    machine.leave()
  }
  this.machines.clear()
}
9.2 When Cleanup Happens
- Task Switch: User selects different task → cleanup old task
- Logout: User logs out → cleanupAll()
- Component Unmount: Chat page unmounts → cleanup via useEffect
- Explicit Reset: User clicks "Clear chat" → resetStream(taskId)
10. Key Insights and Design Decisions
10.1 Why Event-Driven Architecture?
- Predictability: all state changes flow through a single `dispatch()` method
- Traceability: easy to log and debug state transitions
- Testability: events can be simulated for unit testing
- Recovery: recovery is just another event (`RECOVER`)
10.2 Why Keep Both Temp and Real Task IDs?
- Component Stability: Components using temp ID don't need to re-mount
- Event Routing: WebSocket events with real ID find the machine
- Streaming Continuity: No interruption during migration
10.3 Why Content Priority (Redis > Existing > Backend)?
- Redis: Updated every 1s during streaming, most recent
- Existing: Already received via WebSocket chunks
- Backend DB: Updated every 5s, least recent
10.4 Why Debounce Recovery?
- Prevents cascading recoveries during rapid reconnections
- The 1s default balances responsiveness and stability
- `force: true` bypasses the debounce for manual recovery
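The debounce check from section 3.2 reduces to a small pure predicate (a sketch with illustrative names):

```typescript
// Recovery proceeds if forced, or if enough time has elapsed since the
// last recovery attempt (default window: 1000 ms).
function shouldRecover(
  now: number,
  lastRecoveryTime: number,
  debounceMs: number,
  force: boolean
): boolean {
  return force || now - lastRecoveryTime >= debounceMs
}
```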
11. Summary
The TaskStateMachine architecture ensures message consistency through:
- Centralized State Management: Single TaskStateMachine per task with global manager
- Event-Driven Design: All state changes via typed events (RECOVER, CHAT_START, CHAT_CHUNK, etc.)
- Content Priority System: Redis cache > existing > backend for RUNNING messages
- Pending Chunks Queue: Handles race conditions between sync and incoming chunks
- Temp→Real ID Migration: Both IDs point to same machine, ensuring continuity
- Incremental Sync: `afterMessageId` reduces bandwidth on reconnect
- Single Source of Truth: `useUnifiedMessages` returns ONLY state machine messages
Result: Robust message handling that survives page refreshes, network interruptions, and complex multi-tab scenarios while maintaining strict ordering guarantees.