TaskStateMachine Deep Dive Analysis

Executive Summary

TaskStateMachine is a per-task, event-driven state machine that keeps chat messages consistent in real-time collaborative chat. It consolidates previously scattered recovery logic into one place and handles WebSocket reconnection, message synchronization, and streaming continuity.

Key Achievement: Removes the race conditions between sync and incoming chunks, prevents message loss during network interruptions, and provides a single source of truth for all chat message state.


1. TaskStateMachine Architecture

1.1 Singleton Manager Pattern

┌─────────────────────────────────────────────────────────────┐
│                    TaskStateManager                         │
│                    (Singleton)                              │
├─────────────────────────────────────────────────────────────┤
│  - machines: Map<number, TaskStateMachine>                  │
│  - deps: TaskStateMachineDeps (SocketContext)               │
│  - globalListeners: Set<StateListener>                      │
├─────────────────────────────────────────────────────────────┤
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ Task #1001   │  │ Task #1002   │  │ Task #1003   │      │
│  │ StateMachine │  │ StateMachine │  │ StateMachine │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────────────────────────────────────────┘

Design Rationale:

  • Each task has its own isolated state machine
  • Global manager handles coordination (recovery across all tasks)
  • Dependency injection for WebSocket operations
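
The three points above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the project's code: MiniDeps, MiniMachine, MiniManager, and init are hypothetical names; only getOrCreate mirrors a call that appears later in this document (section 7.2).

```typescript
// Hypothetical, simplified stand-in for TaskStateMachineDeps.
interface MiniDeps {
  isConnected: () => boolean
}

// Hypothetical, simplified stand-in for TaskStateMachine.
class MiniMachine {
  readonly taskId: number
  private readonly deps: MiniDeps

  constructor(taskId: number, deps: MiniDeps) {
    this.taskId = taskId
    this.deps = deps
  }

  // The machine never touches the socket directly; it asks its injected deps.
  canRecover(): boolean {
    return this.deps.isConnected()
  }
}

class MiniManager {
  private static instance: MiniManager | null = null
  private readonly machines = new Map<number, MiniMachine>()
  private readonly deps: MiniDeps

  private constructor(deps: MiniDeps) {
    this.deps = deps
  }

  // One shared manager; deps are injected on first use and later calls reuse it.
  static init(deps: MiniDeps): MiniManager {
    if (!MiniManager.instance) {
      MiniManager.instance = new MiniManager(deps)
    }
    return MiniManager.instance
  }

  // One isolated machine per task, created lazily.
  getOrCreate(taskId: number): MiniMachine {
    let machine = this.machines.get(taskId)
    if (!machine) {
      machine = new MiniMachine(taskId, this.deps)
      this.machines.set(taskId, machine)
    }
    return machine
  }
}

const manager = MiniManager.init({ isConnected: () => true })
const first = manager.getOrCreate(1001)
const again = manager.getOrCreate(1001)  // same instance as `first`
const other = manager.getOrCreate(1002)  // separate instance
```

Injecting the connection check keeps the machines testable without a live socket: a test can pass a stub that reports any connection state it needs.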

1.2 State Structure (TaskStateData)

interface TaskStateData {
  taskId: number                // Task identifier
  status: TaskStatus           // Current state machine status
  messages: Map<string, UnifiedMessage>  // All messages
  streamingSubtaskId: number | null      // Active streaming subtask
  streamingInfo: StreamingInfo | null    // Redis cached content
  error: string | null                   // Error state
  isStopping: boolean                    // Stop requested
}

type TaskStatus = 
  | 'idle'        // Not joined WebSocket room
  | 'joining'     // Joining WebSocket room
  | 'syncing'     // Syncing messages from backend
  | 'ready'       // Ready, no streaming
  | 'streaming'   // Has active streaming message
  | 'error'       // Error occurred

1.3 Observer Pattern Implementation

// In TaskStateMachine
private listeners: Set<StateListener> = new Set()

subscribe(listener: StateListener): () => void {
  this.listeners.add(listener)
  return () => { this.listeners.delete(listener) }
}

private notifyListeners(): void {
  const stateCopy = this.getState()
  this.listeners.forEach(listener => {
    try {
      listener(stateCopy)
    } catch (err) {
      console.error('[TaskStateMachine] Error in listener:', err)
    }
  })
}

Memory Safety:

  • Listeners receive a copy of the state (taken once per notification), never the internal state object
  • Unsubscribe function returned for cleanup
  • Error isolation (one failing listener doesn't break others)
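
A small usage sketch of the same subscribe/unsubscribe contract. MiniStore is a hypothetical stand-in, not TaskStateMachine itself; it exists only to show that a throwing listener does not block the others and that the returned function stops further notifications.

```typescript
type Listener<S> = (state: S) => void

// Hypothetical miniature of the observer logic shown above.
class MiniStore<S> {
  private listeners = new Set<Listener<S>>()
  private state: S
  errors: unknown[] = []  // collected instead of logged, so the sketch stays silent

  constructor(initial: S) {
    this.state = initial
  }

  subscribe(listener: Listener<S>): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }

  set(next: S): void {
    this.state = next
    this.listeners.forEach(listener => {
      try {
        listener(this.state)
      } catch (err) {
        this.errors.push(err)  // one failing listener must not stop the loop
      }
    })
  }
}

const store = new MiniStore<number>(0)
const seen: number[] = []

store.subscribe(() => { throw new Error('boom') })          // misbehaving listener
const unsubscribe = store.subscribe(n => { seen.push(n) })  // well-behaved listener

store.set(1)   // both listeners run; the second still receives 1
unsubscribe()  // cleanup, e.g. on component unmount
store.set(2)   // only the misbehaving listener is still subscribed
```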

2. State Diagram (Mermaid)
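
Based only on the statuses in section 1.2 and the events named in sections 3.1 and 4.1, the transitions can be approximated as a lookup table. This is a reconstruction under those assumptions, not the project's actual transition logic: the transitions table and the next helper are hypothetical, the intermediate joining → syncing step is inferred from the status comments, and the event names CHAT_DONE and CHAT_ERROR are guessed from the chat:done and chat:error socket events.

```typescript
type Status = 'idle' | 'joining' | 'syncing' | 'ready' | 'streaming' | 'error'

type MachineEvent =
  | 'RECOVER'
  | 'JOIN_SUCCESS'
  | 'SYNC_DONE'
  | 'SYNC_DONE_STREAMING'
  | 'SYNC_ERROR'
  | 'CHAT_START'
  | 'CHAT_DONE'   // assumed name
  | 'CHAT_ERROR'  // assumed name

// Assumed transition table; events not listed for a status leave it unchanged.
const transitions: Record<Status, Partial<Record<MachineEvent, Status>>> = {
  idle:      { RECOVER: 'joining' },
  joining:   { JOIN_SUCCESS: 'syncing' },
  syncing:   { SYNC_DONE: 'ready', SYNC_DONE_STREAMING: 'streaming', SYNC_ERROR: 'error' },
  ready:     { RECOVER: 'joining', CHAT_START: 'streaming' },
  streaming: { RECOVER: 'joining', CHAT_DONE: 'ready', CHAT_ERROR: 'error' },
  error:     { RECOVER: 'joining' },
}

function next(status: Status, event: MachineEvent): Status {
  return transitions[status][event] ?? status
}
```

RECOVER has no entry under joining or syncing because, per section 3.2, a recovery requested in those states is queued rather than started.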


3. Message Recovery Mechanism

3.1 Recovery Flow Architecture

┌────────────────┐     ┌──────────────────┐     ┌────────────────┐
│   Trigger      │────▶│    RECOVER       │────▶│    joining     │
│  (Reconnect,   │     │    (Debounced)   │     │   WebSocket    │
│   Visibility,  │     │                  │     │   Room Join    │
│   Manual)      │     │                  │     │                │
└────────────────┘     └──────────────────┘     └────────────────┘
                                                         │
                         ┌───────────────────────────────┘
                         ▼
                  ┌────────────────┐
                  │  JOIN_SUCCESS  │
                  │  + subtasks[]  │
                  └────────────────┘
                         │
          ┌──────────────┼──────────────┐
          ▼              ▼              ▼
   ┌────────────┐ ┌────────────┐ ┌────────────┐
   │ SYNC_DONE  │ │SYNC_DONE   │ │SYNC_ERROR  │
   │ (ready)    │ │_STREAMING  │ │            │
   └────────────┘ │(streaming) │ └────────────┘
                  └────────────┘

3.2 Critical Recovery Logic (handleRecover)

private async handleRecover(event: Extract<Event, { type: 'RECOVER' }>): Promise<void> {
  // 1. Queue if already joining/syncing
  if (this.state.status === 'joining' || this.state.status === 'syncing') {
    this.pendingRecovery = true
    return
  }

  // 2. Debounce check (1s default)
  const now = Date.now()
  if (!event.force && now - this.lastRecoveryTime < this.recoveryDebounceMs) {
    return
  }
  this.lastRecoveryTime = now

  // 3. Check WebSocket connection
  if (!this.deps.isConnected()) return

  // 4. Calculate max messageId for incremental sync
  let maxMessageId: number | undefined
  for (const msg of this.state.messages.values()) {
    if (msg.messageId !== undefined && 
        (maxMessageId === undefined || msg.messageId > maxMessageId)) {
      maxMessageId = msg.messageId
    }
  }

  // 5. Join task room with incremental sync hint
  const response = await this.deps.joinTask(this.state.taskId, {
    forceRefresh: true,
    afterMessageId: maxMessageId,  // Only get messages after this ID
  })

  // 6. Dispatch JOIN_SUCCESS with subtasks for sync
  await this.dispatch({
    type: 'JOIN_SUCCESS',
    streamingInfo: response.streaming,
    subtasks: response.subtasks as TaskDetailSubtask[] | undefined,
  })
}

3.3 Content Priority System

When syncing RUNNING AI messages, the system considers three content sources, listed here from most to least recently updated:

Priority Order (for RUNNING messages):
1. Redis cached_content (updated every 1s via streamingInfo)
2. Existing message content in state (from previous chunks)
3. Backend DB subtask.result.value (updated every 5s)

Selection Strategy: Choose the LONGEST content. Streamed content only grows by appending, so the longest candidate is also the most recent one.

// Content priority in buildMessages()
if (!isUserMessage && subtask.status === 'RUNNING') {
  let bestContent = backendContent
  
  // Check existing message content
  if (existingAiMessage && existingAiMessage.content.length > bestContent.length) {
    bestContent = existingAiMessage.content
  }
  
  // Check Redis cached content (highest priority)
  if (streamingInfo && 
      streamingInfo.subtask_id === subtask.id &&
      streamingInfo.cached_content &&
      streamingInfo.cached_content.length > bestContent.length) {
    bestContent = streamingInfo.cached_content
  }
  
  // Remaining message fields are elided in this excerpt
  messages.set(messageId, { content: bestContent /* , ...other fields */ })
}
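
The selection rule can be isolated into a pure function, which makes it easy to test. A minimal sketch: ContentSources and pickLongest are hypothetical names, and the field names stand for the three sources listed above.

```typescript
// Hypothetical container for the three candidate sources.
interface ContentSources {
  backend: string    // subtask.result.value from the DB
  existing?: string  // content already held in state
  redis?: string     // streamingInfo.cached_content
}

// Returns the longest candidate. On a tie the earlier candidate wins
// (backend, then existing), matching the strict `>` comparisons above.
function pickLongest(sources: ContentSources): string {
  let best = sources.backend
  if (sources.existing !== undefined && sources.existing.length > best.length) {
    best = sources.existing
  }
  if (sources.redis !== undefined && sources.redis.length > best.length) {
    best = sources.redis
  }
  return best
}

// Redis is ahead of both other sources mid-stream.
const midStream = pickLongest({ backend: 'Hel', existing: 'Hello', redis: 'Hello wor' })

// The DB has caught up and the cache holds less: the DB copy wins.
const dbAhead = pickLongest({ backend: 'Hello world', redis: 'Hello' })
```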

4. WebSocket Event Handling

4.1 Event Flow Diagram

┌──────────────────────────────────────────────────────────────────┐
│                     WebSocket Events                              │
├──────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ chat:start  │───▶│  Add AI     │───▶│  status:    │          │
│  │ (subtask_id)│    │  message    │    │ streaming   │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                   │
│  ┌─────────────┐    ┌─────────────┐                              │
│  │ chat:chunk  │───▶│  Append to  │                              │
│  │ (content)   │    │  message.   │                              │
│  └─────────────┘    │  content    │                              │
│                     └─────────────┘                              │
│                                                                   │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ chat:done   │───▶│  Mark as    │───▶│  status:    │          │
│  │ (result)    │    │  completed  │    │ ready       │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                   │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐          │
│  │ chat:error  │───▶│  Mark as    │───▶│  status:    │          │
│  │ (error)     │    │  error      │    │ error       │          │
│  └─────────────┘    └─────────────┘    └─────────────┘          │
│                                                                   │
│  ┌─────────────┐    ┌─────────────┐                              │
│  │chat:message │───▶│  Add user   │  (group chat - other users)   │
│  │ (broadcast) │    │  message    │                              │
│  └─────────────┘    └─────────────┘                              │
│                                                                   │
└──────────────────────────────────────────────────────────────────┘

4.2 Critical Handler: CHAT_CHUNK

private handleChatChunkEvent(event: Extract<Event, { type: 'CHAT_CHUNK' }>): void {
  const aiMessageId = generateMessageId('ai', event.subtaskId)

  // CRITICAL: Queue chunks during joining/syncing
  if (this.state.status === 'joining' || this.state.status === 'syncing') {
    this.pendingChunks.push({
      subtaskId: event.subtaskId,
      content: event.content,
      result: event.result,
      sources: event.sources,
      blockId: event.blockId,
    })
    return
  }

  const existingMessage = this.getAiMessage(event.subtaskId)
  if (!existingMessage) {
    console.warn('[TaskStateMachine] CHAT_CHUNK ignored - message not found')
    return
  }

  // CRITICAL FIX: Always append content to message.content
  // Previously, when block_id existed, content was only added to blocks
  // causing UI to lose cached_content after page refresh
  const updatedMessage: UnifiedMessage = {
    ...existingMessage,
    content: existingMessage.content + event.content,
  }

  // Handle reasoning content (DeepSeek R1)
  if (event.result?.reasoning_chunk) {
    updatedMessage.reasoningContent =
      (existingMessage.reasoningContent || '') + event.result.reasoning_chunk
  }

  // Handle blocks (text blocks, tool calls)
  const mergedBlocks = this.mergeBlocks(existingMessage, event)
  if (event.result) {
    updatedMessage.result = {
      ...existingMessage.result,
      ...event.result,
      blocks: mergedBlocks,
    }
  }

  // Update state and notify
  const newMessages = new Map(this.state.messages)
  newMessages.set(aiMessageId, updatedMessage)
  this.state = { ...this.state, messages: newMessages }
  this.notifyListeners()
}

4.3 Pending Chunks Queue

Race condition handling: If a chat:chunk arrives while the machine is still joining or syncing, the chunk is queued and applied after sync completes.

private pendingChunks: PendingChunkEvent[] = []

// Queue chunks during sync
if (this.state.status === 'joining' || this.state.status === 'syncing') {
  this.pendingChunks.push({ subtaskId, content, result, sources, blockId })
  return
}

// Apply after sync completes
private applyPendingChunks(): void {
  for (const chunk of this.pendingChunks) {
    const aiMessageId = generateMessageId('ai', chunk.subtaskId)
    const existingMessage = this.state.messages.get(aiMessageId)
    
    if (!existingMessage) {
      console.warn('[TaskStateMachine] Pending chunk skipped - message not found')
      continue
    }
    
    // Apply chunk to message (same logic as handleChatChunkEvent)
    const updatedMessage = {
      ...existingMessage,
      content: existingMessage.content + chunk.content,
    }
    // ... handle blocks, reasoning, sources
  }
  
  this.pendingChunks = []
  this.notifyListeners()
}
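
The queue-then-replay idea, reduced to a runnable sketch. ChunkBuffer and its members are hypothetical, and the sketch assumes the synced snapshot does not already contain the queued chunks; how the real implementation deals with overlap is not covered in this document.

```typescript
type Phase = 'syncing' | 'ready'

// Hypothetical miniature: a single message whose content is built from chunks.
class ChunkBuffer {
  private phase: Phase = 'syncing'
  private pending: string[] = []
  content = ''

  // Chunks that arrive during sync are queued instead of applied.
  onChunk(text: string): void {
    if (this.phase === 'syncing') {
      this.pending.push(text)
      return
    }
    this.content += text
  }

  // When sync finishes: install the snapshot, then replay the queue in arrival order.
  finishSync(snapshot: string): void {
    this.content = snapshot
    this.phase = 'ready'
    this.pending.forEach(text => { this.content += text })
    this.pending = []
  }
}

const buffer = new ChunkBuffer()
buffer.onChunk(' wor')      // queued
buffer.onChunk('ld')        // queued
buffer.finishSync('Hello')  // snapshot installed, queue replayed
buffer.onChunk('!')         // applied immediately
```

Without the queue, the two early chunks would either be dropped (no message exists yet) or be overwritten when the snapshot is installed.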

5. Temp to Real Task ID Migration

5.1 The Problem

When a user sends the first message in a new task:

  1. Frontend uses a temporary negative ID (e.g., -1738362000000)
  2. Message sent to backend via WebSocket
  3. Backend creates real task with positive ID (e.g., 1001)
  4. WebSocket events (chat:start, chat:chunk) reference the real ID
  5. Components using the temp ID need to continue working

5.2 Migration Architecture

Step 1: User sends message to new task
┌────────────────────────────────────────────────────────────┐
│  Client uses tempTaskId = -1738362000000                   │
│  ├── Create TaskStateMachine for tempTaskId                │
│  ├── Add user message (status: pending)                    │
│  └── Send chat:message via WebSocket                       │
└────────────────────────────────────────────────────────────┘
                              │
                              ▼
Step 2: Backend responds
┌────────────────────────────────────────────────────────────┐
│  Backend returns realTaskId = 1001                         │
│  ├── Callbacks moved from temp → real                      │
│  └── tempToRealTaskIdRef.set(-1738362000000, 1001)        │
└────────────────────────────────────────────────────────────┘
                              │
                              ▼
Step 3: State Migration
┌────────────────────────────────────────────────────────────┐
│  taskStateManager.migrateState(-1738362000000, 1001)       │
│  ├── machines.set(1001, tempMachine)  // Both point to same│
│  ├── DO NOT delete temp mapping!                           │
│  └── Now both IDs reference same state machine             │
└────────────────────────────────────────────────────────────┘

5.3 Migration Implementation

migrateState(tempTaskId: number, realTaskId: number): void {
  const tempMachine = this.machines.get(tempTaskId)
  if (!tempMachine) {
    console.log('[TaskStateManager] No temp machine to migrate from', tempTaskId)
    return
  }

  // Check if real machine already exists (edge case)
  const existingRealMachine = this.machines.get(realTaskId)
  if (existingRealMachine && existingRealMachine !== tempMachine) {
    console.warn('[TaskStateManager] Real machine already exists')
    tempMachine.leave()
    this.machines.delete(tempTaskId)
    return
  }

  // KEY: Make real task ID point to SAME state machine instance
  // IMPORTANT: Keep BOTH mappings so components using either ID work
  this.machines.set(realTaskId, tempMachine)
  // DO NOT: this.machines.delete(tempTaskId)  // REMOVED!

  console.log('[TaskStateManager] Migrated state: both', tempTaskId, 
              'and', realTaskId, 'now point to same machine')
}

5.4 Why Keep Both Mappings?

  • Components: useTaskStateMachine(tempTaskId) continues to work
  • WebSocket Events: chat:chunk with task_id: 1001 finds the machine
  • Streaming Continuity: Active stream not interrupted during migration
  • State Preservation: All pending messages, streaming status preserved
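
The aliasing can be demonstrated with a plain Map. A minimal sketch: Machine and migrate are hypothetical stand-ins for TaskStateMachine and migrateState, with the edge-case handling left out.

```typescript
// Hypothetical stand-in for TaskStateMachine.
class Machine {
  messages: string[] = []
}

const machines = new Map<number, Machine>()

// Make realId an alias for the machine already stored under tempId.
function migrate(tempId: number, realId: number): boolean {
  const temp = machines.get(tempId)
  if (!temp) return false
  machines.set(realId, temp)  // the temp key is deliberately kept
  return true
}

const tempId = -1738362000000
machines.set(tempId, new Machine())

// A component holding the temp ID adds the pending user message.
machines.get(tempId)!.messages.push('user: hello (pending)')

migrate(tempId, 1001)

// A socket event carrying the real ID reaches the same instance.
machines.get(1001)!.messages.push('ai: first chunk')
```

Both lookups return the same object, so the component that still holds the temp ID sees the AI chunk without re-mounting or re-subscribing.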

6. Complex Scenarios

6.1 Page Refresh During Streaming

Scenario: The user refreshes the page while an AI response is streaming

T+0:     User has streaming message (content: "Hello worl...")
T+1:     User refreshes page
         └── State machine destroyed, messages lost from memory

T+2:     Page reloads, component mounts
         └── useUnifiedMessages calls recover()

T+3:     Recovery begins
         ├── joinTask(task_id) emitted
         ├── Backend responds with:
         │   ├── streamingInfo: { subtask_id: 50, cached_content: "Hello world!" }
         │   └── subtasks: [...]
         └── JOIN_SUCCESS dispatched

T+4:     doSync(subtasks) called
         ├── Build messages from backend subtasks
         ├── Check streamingInfo exists
         ├── Create AI message with cached_content
         └── Dispatch SYNC_DONE_STREAMING

T+5:     Streaming continues from Redis cache
         ├── New chat:chunk events append to cached content
         └── User sees complete message history

Key Code in doSync():

// CRITICAL: If streamingInfo exists but no streaming message was created,
// create one now. This handles the race condition where:
// 1. User joins room while streaming is in progress
// 2. Backend returns streamingInfo with subtask_id and cached_content
// 3. But subtasks array doesn't contain this subtask yet
const streamingInfo = this.state.streamingInfo
if (streamingInfo && streamingInfo.subtask_id) {
  const aiMessageId = generateMessageId('ai', streamingInfo.subtask_id)
  const existingMessage = this.state.messages.get(aiMessageId)

  if (!existingMessage) {
    const newMessages = new Map(this.state.messages)
    newMessages.set(aiMessageId, {
      id: aiMessageId,
      type: 'ai',
      status: 'streaming',
      content: streamingInfo.cached_content || '',  // Use Redis cache!
      timestamp: Date.now(),
      subtaskId: streamingInfo.subtask_id,
    })
    this.state = { ...this.state, messages: newMessages }
  }
}

6.2 Multiple Tabs with Same Task

Each tab maintains its own TaskStateMachine instance (isolated by browser context):

Tab 1 (Active)              Tab 2 (Background)
    │                           │
    ├── WebSocket connected ───▶│
    │                           │
    ├── chat:send ─────────────▶│ (backend receives once)
    │                           │
    ▼                           ▼
TaskStateMachine #1     TaskStateMachine #2
    │                           │
    ├── chat:start ◀────────────┤ (broadcast to both)
    │                           │
    ├── chat:chunk ◀────────────┤ (broadcast)
    │                           │
    └── chat:done ◀─────────────┤ (broadcast)

Behavior:

  • Each tab joins the task room independently
  • Backend broadcasts events to all joined clients
  • Each tab maintains its own message state
  • If Tab 1 refreshes, it recovers independently

6.3 Network Reconnection Handling

┌─────────────────────────────────────────────────────────────────┐
│              Network Reconnection Flow                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  T+0: User is streaming                                         │
│       ├── status: streaming                                     │
│       └── messages: [...]                                       │
│                                                                 │
│  T+1: Network disconnects                                       │
│       ├── WebSocket closed                                      │
│       └── Socket.io auto-reconnects                             │
│                                                                 │
│  T+2: WebSocket reconnected                                     │
│       ├── reconnect callback triggered                          │
│       └── taskStateManager.recoverAll() called                  │
│           └── For each active task machine.recover({force:true})│
│                                                                 │
│  T+3: Recovery per task                                         │
│       ├── Calculate max messageId from existing messages        │
│       ├── joinTask(taskId, { afterMessageId: maxId })           │
│       ├── Backend returns:                                      │
│       │   ├── streamingInfo (if still streaming)                │
│       │   └── subtasks (only messages after maxId)              │
│       └── Sync applies incremental updates                      │
│                                                                 │
│  T+4: Resume streaming                                          │
│       ├── If streamingInfo exists → create message with cache   │
│       ├── Apply any pending chunks from queue                   │
│       └── Continue receiving new chat:chunk events              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
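
recoverAll is named in the flow above, but its body is not shown in this document. One detail worth illustrating: after a temp-to-real migration (section 5), two keys point to the same machine, so a plain loop over the map would call recover on it twice. The sketch below deduplicates by instance first. Whether the real implementation does this is not stated here; Recoverable, CountingMachine, and this recoverAll are hypothetical.

```typescript
// Hypothetical minimal interface for anything that can be recovered.
interface Recoverable {
  recover(options: { force: boolean }): void
}

// Recover every distinct machine once, even if several ids alias it.
// Returns the number of distinct machines recovered.
function recoverAll(machines: Map<number, Recoverable>): number {
  const unique = new Set<Recoverable>()
  machines.forEach(machine => { unique.add(machine) })
  unique.forEach(machine => { machine.recover({ force: true }) })
  return unique.size
}

// Test double that counts forced recoveries.
class CountingMachine implements Recoverable {
  calls = 0
  recover(options: { force: boolean }): void {
    if (options.force) this.calls += 1
  }
}

const shared = new CountingMachine()
const separate = new CountingMachine()

const registry = new Map<number, Recoverable>()
registry.set(-1738362000000, shared)  // temp id
registry.set(1001, shared)            // real id, same instance
registry.set(1002, separate)

const recovered = recoverAll(registry)
```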

6.4 Message Ordering Guarantees

Ordering Strategy in useUnifiedMessages:

// Sort a copy by messageId (primary) and timestamp (secondary);
// Array.prototype.sort works in place and would otherwise mutate the source array
const sortedMessages = [...messages].sort((a, b) => {
  // If both have messageId, use it as primary sort key
  if (a.messageId !== undefined && b.messageId !== undefined) {
    if (a.messageId !== b.messageId) {
      return a.messageId - b.messageId
    }
    // Same messageId, use timestamp as secondary sort key
    return a.timestamp - b.timestamp
  }
  // If only one has messageId, the one with messageId comes first
  if (a.messageId !== undefined) return -1
  if (b.messageId !== undefined) return 1
  // Neither has messageId (both pending), sort by timestamp
  return a.timestamp - b.timestamp
})

Guarantees:

  1. Backend-assigned messageId is primary sort key (strict ordering)
  2. timestamp is secondary sort key (fallback for pending messages)
  3. Messages with messageId always appear before pending messages
  4. Pending messages (user's local input) sorted by timestamp
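
The four guarantees can be checked with a small standalone example. The comparator below follows the same rules as the one shown above; Sortable and compareMessages are hypothetical names, and the sample data is made up for illustration.

```typescript
// Only the two fields the ordering depends on.
interface Sortable {
  messageId?: number
  timestamp: number
}

function compareMessages(a: Sortable, b: Sortable): number {
  if (a.messageId !== undefined && b.messageId !== undefined) {
    return a.messageId !== b.messageId
      ? a.messageId - b.messageId
      : a.timestamp - b.timestamp
  }
  if (a.messageId !== undefined) return -1  // confirmed before pending
  if (b.messageId !== undefined) return 1
  return a.timestamp - b.timestamp          // both pending
}

const input: Sortable[] = [
  { timestamp: 300 },                // pending
  { messageId: 2, timestamp: 200 },
  { timestamp: 250 },                // pending, sent earlier
  { messageId: 1, timestamp: 400 },  // confirmed, but with the latest timestamp
]

// slice() copies first, so `input` keeps its original order.
const ordered = input.slice().sort(compareMessages)
const summary = ordered.map(m => m.messageId ?? `pending@${m.timestamp}`)
```

Here summary comes out as 1, 2, pending@250, pending@300: messageId 1 sorts first despite having the latest timestamp, and both pending messages follow every confirmed one.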

7. Single Source of Truth Principle

7.1 Data Flow Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    SINGLE SOURCE OF TRUTH                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────┐                                       │
│  │  selectedTaskDetail  │                                       │
│  │  (Backend API Data)  │                                       │
│  │  - subtasks[]        │                                       │
│  │  - Stale after fetch │                                       │
│  └──────────────────────┘                                       │
│           │                                                     │
│           │ ONLY used for initial sync                          │
│           ▼                                                     │
│  ┌──────────────────────┐      ┌──────────────────────┐        │
│  │  TaskStateMachine    │◀────▶│  WebSocket Events    │        │
│  │  - messages Map      │      │  - chat:start        │        │
│  │  - streaming status  │      │  - chat:chunk        │        │
│  │  - REAL-TIME state   │      │  - chat:done         │        │
│  └──────────────────────┘      │  - chat:message      │        │
│           │                    └──────────────────────┘        │
│           │                                                     │
│           │ ONLY source for display                             │
│           ▼                                                     │
│  ┌──────────────────────┐                                       │
│  │  useUnifiedMessages  │                                       │
│  │  - Returns messages  │                                       │
│  │  - Sorted & formatted│                                       │
│  └──────────────────────┘                                       │
│           │                                                     │
│           ▼                                                     │
│  ┌──────────────────────┐                                       │
│  │   Chat UI (render)   │                                       │
│  └──────────────────────┘                                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

7.2 Critical Rules

Rule 1: NEVER use selectedTaskDetail.subtasks for display

// ❌ WRONG
const messages = selectedTaskDetail.subtasks  // Stale data!

// ✅ CORRECT
const { messages } = useUnifiedMessages({ team, isGroupChat })

Rule 2: Live updates reach TaskStateMachine through WebSocket events ONLY

// WebSocket events update state machine:
handleChatStart(data) {
  const machine = taskStateManager.getOrCreate(data.task_id)
  machine.handleChatStart(data.subtask_id)  // Updates internal state
}

handleChatChunk(data) {
  const machine = taskStateManager.getOrCreate(data.task_id)
  machine.handleChatChunk(data.subtask_id, data.content)
}

Rule 3: Recovery syncs backend state INTO the state machine

// Recovery fetches subtasks and builds messages
private async doSync(subtasks?: TaskDetailSubtask[]): Promise<void> {
  if (subtasks && subtasks.length > 0) {
    this.buildMessages(subtasks)  // Sync into state machine
  }
}

8. Block-Based Message Architecture

Modern AI messages support mixed content (text + tool calls) via blocks:

interface MessageBlock {
  id: string                    // Unique block identifier
  type: 'text' | 'tool' | 'thinking' | 'error'
  content?: string              // Text content
  tool_use_id?: string          // Tool call ID
  tool_name?: string            // Tool name
  tool_input?: Record<string, unknown>
  tool_output?: unknown
  status?: 'pending' | 'streaming' | 'done' | 'error'
  timestamp?: number
}

Block Merging Logic

private mergeBlocks(
  existingMessage: UnifiedMessage,
  event: Extract<Event, { type: 'CHAT_CHUNK' }>
): MessageBlock[] {
  const existingBlocks = existingMessage.result?.blocks || []

  // Case 1: block_id with content - text block update
  if (event.blockId && event.content) {
    const blocksMap = new Map(existingBlocks.map(b => [b.id, b]))
    const targetBlock = blocksMap.get(event.blockId)

    if (targetBlock && targetBlock.type === 'text') {
      // Update existing text block
      const updatedBlock = {
        ...targetBlock,
        content: (targetBlock.content || '') + event.content,
      }
      blocksMap.set(event.blockId, updatedBlock)
    } else if (!targetBlock) {
      // Create new text block
      const newBlock: MessageBlock = {
        id: event.blockId,
        type: 'text',
        content: event.content,
        status: 'streaming',
        timestamp: Date.now(),
      }
      blocksMap.set(event.blockId, newBlock)
    }

    return Array.from(blocksMap.values())
  }

  // Case 2: Tool blocks - merge by block.id (replace strategy)
  const incomingBlocks = event.result?.blocks || []
  if (incomingBlocks.length > 0) {
    const blocksMap = new Map(existingBlocks.map(b => [b.id, b]))
    incomingBlocks.forEach(incomingBlock => {
      blocksMap.set(incomingBlock.id, incomingBlock)  // Replace
    })
    return Array.from(blocksMap.values())
  }

  // Case 3: No changes
  return existingBlocks
}
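
Case 2 relies on a property of Map: setting an existing key replaces the value but keeps the key's original position. A minimal sketch of just that replace-by-id step, with a trimmed-down Block type and a hypothetical replaceById helper:

```typescript
// Trimmed-down version of MessageBlock for illustration.
interface Block {
  id: string
  type: 'text' | 'tool'
  content?: string
  status?: 'pending' | 'streaming' | 'done' | 'error'
}

// Incoming blocks replace existing ones with the same id; new ids are appended.
function replaceById(existing: Block[], incoming: Block[]): Block[] {
  const byId = new Map<string, Block>()
  existing.forEach(block => { byId.set(block.id, block) })
  incoming.forEach(block => { byId.set(block.id, block) })
  const merged: Block[] = []
  byId.forEach(block => { merged.push(block) })
  return merged
}

const before: Block[] = [
  { id: 't1', type: 'text', content: 'Let me check.' },
  { id: 'tool1', type: 'tool', status: 'pending' },
]

const update: Block[] = [
  { id: 'tool1', type: 'tool', status: 'done' },     // replaces in place
  { id: 't2', type: 'text', content: 'All done.' },  // appended
]

const after = replaceById(before, update)
```

The finished tool call stays between the two text blocks, so the rendered order of the message does not jump when a tool's status changes.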

9. Memory Management and Cleanup

9.1 Cleanup Strategies

// Per-task cleanup
leave(): void {
  this.state = {
    ...this.state,
    status: 'idle',
    messages: new Map(),           // Clear messages
    streamingSubtaskId: null,
    streamingInfo: null,
    error: null,
    isStopping: false,
  }
  this.pendingRecovery = false
  this.pendingChunks = []          // Clear pending chunks
}

// Cleanup via TaskStateManager
cleanup(taskId: number): void {
  const machine = this.machines.get(taskId)
  if (machine) {
    machine.leave()                  // Reset state
    this.machines.delete(taskId)     // Remove reference
  }
}

// Cleanup all
cleanupAll(): void {
  for (const machine of this.machines.values()) {
    machine.leave()
  }
  this.machines.clear()
}

9.2 When Cleanup Happens

  1. Task Switch: User selects different task → cleanup old task
  2. Logout: User logs out → cleanupAll()
  3. Component Unmount: Chat page unmounts → cleanup via useEffect
  4. Explicit Reset: User clicks "Clear chat" → resetStream(taskId)

10. Key Insights and Design Decisions

10.1 Why Event-Driven Architecture?

  • Predictability: All state changes flow through a single dispatch() method
  • Traceability: Easy to log and debug state transitions
  • Testability: Events can be simulated for unit testing
  • Recovery: Recovery is just another event (RECOVER)

10.2 Why Keep Both Temp and Real Task IDs?

  • Component Stability: Components using temp ID don't need to re-mount
  • Event Routing: WebSocket events with real ID find the machine
  • Streaming Continuity: No interruption during migration

10.3 Why Content Priority (Redis > Existing > Backend)?

  • Redis: Updated every 1s during streaming, most recent
  • Existing: Already received via WebSocket chunks
  • Backend DB: Updated every 5s, least recent

10.4 Why Debounce Recovery?

  • Prevents cascading recoveries during rapid reconnections
  • 1s default provides balance between responsiveness and stability
  • force: true bypasses debounce for manual recovery
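
The timing check from handleRecover can be isolated as a small gate. A sketch under stated assumptions: RecoveryGate and shouldRun are hypothetical names, and the clock is passed in as a parameter so the behavior is deterministic. The check is leading-edge: the first request runs immediately and later requests inside the window are dropped rather than delayed.

```typescript
// Hypothetical extraction of the debounce check in handleRecover.
class RecoveryGate {
  private last = Number.NEGATIVE_INFINITY
  private readonly debounceMs: number

  constructor(debounceMs = 1000) {
    this.debounceMs = debounceMs
  }

  // Returns true if a recovery may start at time `now` (milliseconds).
  shouldRun(now: number, force = false): boolean {
    if (!force && now - this.last < this.debounceMs) {
      return false  // too soon after the previous recovery
    }
    this.last = now
    return true
  }
}

const gate = new RecoveryGate()
const results = [
  gate.shouldRun(0),          // first request runs
  gate.shouldRun(500),        // inside the 1s window: dropped
  gate.shouldRun(500, true),  // forced: runs and restarts the window
  gate.shouldRun(1200),       // only 700ms after the forced run: dropped
  gate.shouldRun(1500),       // 1000ms after the forced run: runs
]
```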

11. Summary

The TaskStateMachine architecture ensures message consistency through:

  1. Centralized State Management: Single TaskStateMachine per task with global manager
  2. Event-Driven Design: All state changes via typed events (RECOVER, CHAT_START, CHAT_CHUNK, etc.)
  3. Content Priority System: Redis cache > existing > backend for RUNNING messages
  4. Pending Chunks Queue: Handles race conditions between sync and incoming chunks
  5. Temp→Real ID Migration: Both IDs point to same machine, ensuring continuity
  6. Incremental Sync: afterMessageId reduces bandwidth on reconnect
  7. Single Source of Truth: useUnifiedMessages returns ONLY state machine messages

Result: Robust message handling that survives page refreshes, network interruptions, and complex multi-tab scenarios while maintaining strict ordering guarantees.