Deep Dive: Pipeline Collaboration Mode - Stage State Machine

Executive Summary

The Pipeline collaboration mode is a human-in-the-loop workflow system that runs AI agents in multiple stages with optional user confirmation checkpoints. It turns linear agent execution into a state-driven pipeline in which each stage (bot) can pause for human review before the next stage starts.

This document provides a comprehensive analysis of the pipeline stage state machine, including architecture, state transitions, WebSocket event flows, and integration with executors.


1. Pipeline Stage Architecture

1.1 Core Concepts

| Component | Description | Storage Location |
| --- | --- | --- |
| Stage | A single step in the pipeline, mapped 1:1 with a Team member (bot) | Team.spec.members[] |
| requireConfirmation | Boolean flag that enables the checkpoint mechanism | TeamMember.requireConfirmation |
| Stage State | Current status of a stage execution | Subtask.status field |
| Stage Info | Aggregated view of all stages for UI display | Computed via PipelineStageService |

1.2 Data Model

# backend/app/schemas/kind.py:333
class TeamMember(BaseModel):
    botRef: ResourceRef              # Reference to the Bot CRD
    role: Optional[str] = "member"   # "leader" or "member"
    botPrompt: Optional[str] = None  # Stage-specific prompt override
    requireConfirmation: Optional[bool] = False  # Enable checkpoint

Key insight: The requireConfirmation flag is defined at the Team configuration level, not in the Bot itself. This allows the same Bot to participate in different pipelines with different checkpoint requirements.
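
A minimal sketch of this separation, using plain dataclasses as stand-ins for the Pydantic models above. The helper `checkpoint_stages` and the two teams are hypothetical and exist only for illustration:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ResourceRef:
    # Stand-in for the real ResourceRef schema; only the fields used here.
    name: str
    namespace: str = "default"


@dataclass
class TeamMember:
    botRef: ResourceRef
    role: Optional[str] = "member"
    botPrompt: Optional[str] = None
    requireConfirmation: Optional[bool] = False


def checkpoint_stages(members: List[TeamMember]) -> List[str]:
    """Return the bot names whose stage pauses for human review."""
    return [m.botRef.name for m in members if m.requireConfirmation]


reviewer = ResourceRef(name="code-reviewer")

# The same bot is a checkpoint in one (hypothetical) team but not in the other.
strict_team = [TeamMember(botRef=reviewer, requireConfirmation=True)]
fast_team = [TeamMember(botRef=reviewer)]
```

Because the flag lives on the member entry, neither team needs a modified copy of the Bot definition.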

1.3 Stage Storage and Mapping

Stages are not stored as separate database entities. Instead, they are dynamically computed from:

  1. Team members array (team_crd.spec.members) - defines the pipeline structure
  2. Subtask records - track execution state per stage
  3. Bot-to-stage mapping - determined by matching subtask.bot_ids to team members

# backend/app/services/adapters/pipeline_stage.py:216-219
# Build a mapping from bot name to stage index
bot_name_to_stage: Dict[str, int] = {}
for i, member in enumerate(members):
    bot_name_to_stage[member.botRef.name] = i
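
The excerpt above only builds the name-to-index lookup. The sketch below shows one way subtasks could then be placed onto stages. It assumes each subtask has already been resolved to a bot name (the `bot_name` field and `SubtaskRow` type are hypothetical) and that the most recent subtask for a stage takes precedence, which the source does not confirm:

```python
from typing import Dict, List, NamedTuple


class SubtaskRow(NamedTuple):
    # Hypothetical, simplified view of a subtask record.
    id: int
    bot_name: str
    status: str


def map_subtasks_to_stages(
    member_bot_names: List[str], subtasks: List[SubtaskRow]
) -> Dict[int, SubtaskRow]:
    """Place each subtask at the stage index of the bot that ran it."""
    bot_name_to_stage = {name: i for i, name in enumerate(member_bot_names)}
    stage_subtask_map: Dict[int, SubtaskRow] = {}
    for subtask in sorted(subtasks, key=lambda s: s.id):
        stage = bot_name_to_stage.get(subtask.bot_name)
        if stage is not None:
            # Assumption: the most recent subtask for a stage wins.
            stage_subtask_map[stage] = subtask
    return stage_subtask_map


members = ["architect", "code-reviewer", "test-writer"]
rows = [
    SubtaskRow(1, "architect", "COMPLETED"),
    SubtaskRow(2, "code-reviewer", "FAILED"),
    SubtaskRow(3, "code-reviewer", "RUNNING"),
]
stage_map = map_subtasks_to_stages(members, rows)
```

One consequence of keying the lookup by bot name: if the same bot is listed twice in members, the name resolves to its last index.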

2. State Machine Implementation

2.1 State Diagram

2.2 State Definitions

| State | Database Value | UI Display | Description |
| --- | --- | --- | --- |
| Pending | SubtaskStatus.PENDING | "Waiting" | Subtask created but not yet executing |
| Running | SubtaskStatus.RUNNING | "In Progress" | Bot actively processing |
| Completed | SubtaskStatus.COMPLETED | "Done" | Stage finished successfully |
| Pending Confirmation | SubtaskStatus.PENDING_CONFIRMATION | "Awaiting Review" | Paused for human review |
| Failed | SubtaskStatus.FAILED | "Failed" | Stage execution error |

2.3 State Transition Logic

The core state transition logic resides in PipelineStageService.get_stage_info():

# backend/app/services/adapters/pipeline_stage.py:262-293
for i in range(total_stages):
    if i in stage_subtask_map:
        subtask = stage_subtask_map[i]
        if subtask.status == SubtaskStatus.PENDING_CONFIRMATION:
            current_stage = i
            is_pending_confirmation = True
            break
        elif subtask.status in [SubtaskStatus.RUNNING, SubtaskStatus.PENDING]:
            current_stage = i
            break
        elif subtask.status == SubtaskStatus.COMPLETED:
            # Check if this completed stage has requireConfirmation
            if i < len(members) and members[i].requireConfirmation:
                next_stage_idx = i + 1
                if next_stage_idx not in stage_subtask_map:
                    current_stage = i
                    is_pending_confirmation = True
                    break
            current_stage = i + 1

Key behaviors:

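  1. If a stage is PENDING_CONFIRMATION, the pipeline stays at that stage and is flagged as pending confirmation
  2. If a stage is RUNNING or PENDING, it is the current stage
  3. If a completed stage has requireConfirmation=true and the next stage has no subtask yet, the pipeline stays at that stage and is flagged as pending confirmation
  4. Completed stages otherwise advance the current stage to the next index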
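
The scan can be restated as a small standalone function. This is a sketch rather than the service code: statuses are plain strings, the FAILED case follows the branch shown in section 5.1, and the initial value of 0 and the absence of clamping after the last stage are assumptions:

```python
from typing import Dict, List, Tuple

PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
PENDING_CONFIRMATION = "PENDING_CONFIRMATION"
FAILED = "FAILED"


def resolve_current_stage(
    require_confirmation: List[bool], stage_status: Dict[int, str]
) -> Tuple[int, bool]:
    """Return (current_stage, is_pending_confirmation) for a pipeline.

    require_confirmation[i] is the flag of stage i; stage_status maps a stage
    index to the status of its subtask, if one exists.
    """
    current_stage = 0  # assumption: pipelines with no subtasks start at stage 0
    for i in range(len(require_confirmation)):
        status = stage_status.get(i)
        if status is None:
            continue  # no subtask for this stage yet
        if status == PENDING_CONFIRMATION:
            return i, True
        if status in (RUNNING, PENDING, FAILED):
            return i, False
        if status == COMPLETED:
            if require_confirmation[i] and (i + 1) not in stage_status:
                return i, True  # finished, but the checkpoint is still open
            current_stage = i + 1
    return current_stage, False
```

For a three-stage pipeline whose second stage requires confirmation, `resolve_current_stage([False, True, False], {0: COMPLETED, 1: COMPLETED})` returns `(1, True)`: the second stage is done, but the third has not been created, so the checkpoint is still open.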

3. WebSocket Event Flow for Confirmation

3.1 Event Architecture

The confirmation flow uses a hybrid approach:

  • WebSocket: Real-time streaming of bot responses
  • REST API: Confirmation action (continue/retry)

3.2 Confirmation Flow Sequence

3.3 Frontend Confirmation Components

| Component | File | Purpose |
| --- | --- | --- |
| PipelineStageIndicator | PipelineStageIndicator.tsx | Visual progress bar showing all stages |
| FinalPromptMessage | FinalPromptMessage.tsx | Displays stage output with Confirm/Edit actions |
| ChatArea | ChatArea.tsx | Orchestrates stage state between components |

3.4 API Endpoints

# backend/app/api/endpoints/adapter/tasks.py

# Get current stage information (polling-based)
GET /tasks/{task_id}/pipeline-stage-info
Response: {
    "current_stage": 1,
    "total_stages": 3,
    "current_stage_name": "CodeReviewer",
    "is_pending_confirmation": true,
    "stages": [
        {"index": 0, "name": "Architect", "status": "completed", "require_confirmation": false},
        {"index": 1, "name": "CodeReviewer", "status": "pending_confirmation", "require_confirmation": true},
        {"index": 2, "name": "TestWriter", "status": "pending", "require_confirmation": false}
    ]
}

# Confirm stage and proceed
POST /tasks/{task_id}/confirm-pipeline-stage
Body: {
    "confirmed_prompt": "string",  # Can be edited by user
    "action": "continue" | "retry"
}
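
As an illustration of the confirm call, the sketch below builds (but does not send) the POST request with the standard library. The base URL is a placeholder, authentication is omitted, and `build_confirm_request` is a hypothetical helper, not part of the codebase:

```python
import json
import urllib.request


def build_confirm_request(
    base_url: str, task_id: int, confirmed_prompt: str, action: str = "continue"
) -> urllib.request.Request:
    """Build the POST request for confirming a pipeline stage."""
    if action not in ("continue", "retry"):
        raise ValueError(f"unsupported action: {action!r}")
    url = f"{base_url.rstrip('/')}/tasks/{task_id}/confirm-pipeline-stage"
    body = json.dumps(
        {"confirmed_prompt": confirmed_prompt, "action": action}
    ).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder base URL; a real deployment will differ.
request = build_confirm_request(
    "http://localhost:8000/api", 42, "Approved design", "continue"
)
```

Passing the result to `urllib.request.urlopen(request)` would send it; a real client would also attach whatever credentials the deployment requires.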

4. Integration with Executors

4.1 Subtask Creation Strategy

The pipeline uses a sequential subtask creation pattern:

# backend/app/services/adapters/task_kinds/helpers.py:157-224
def _create_pipeline_subtask(...):
    # Determine which stage to create subtask for
    should_stay, current_stage_index = pipeline_stage_service.should_stay_at_current_stage(...)
    
    if should_stay and current_stage_index is not None:
        target_stage_index = current_stage_index  # Stay for confirmation
    elif existing_subtasks and current_stage_index is not None:
        target_stage_index = current_stage_index  # Follow-up in same stage
    else:
        target_stage_index = 0  # Start from beginning
    
    # Get the target bot for the determined stage
    target_member = team_crd.spec.members[target_stage_index]
    # Create single subtask for this bot only

Key insight: Unlike other modes that create all subtasks upfront, pipeline mode creates subtasks one at a time as stages complete. This enables:

  • Dynamic stage progression
  • Stage-specific context passing
  • Human intervention between stages
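
The branch structure of the excerpt can be isolated into a pure function. The name `pick_target_stage` and its flattened parameters are hypothetical and chosen only to make the decision testable on its own:

```python
from typing import Optional


def pick_target_stage(
    should_stay: bool,
    current_stage_index: Optional[int],
    has_existing_subtasks: bool,
) -> int:
    """Choose the stage index that the next subtask is created for."""
    if should_stay and current_stage_index is not None:
        return current_stage_index  # waiting at a confirmation checkpoint
    if has_existing_subtasks and current_stage_index is not None:
        return current_stage_index  # follow-up message within the same stage
    return 0  # fresh task: start from the first stage
```

In this excerpt both non-default branches select `current_stage_index`; they differ only in the reason the pipeline remains at that stage.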

4.2 Agno Executor Integration

Agno's team builder maps each team collaboration mode to a configuration. The modes it handles are shown below:

# executor/agents/agno/team_builder.py:200-230
def _get_mode_config(self, mode: str) -> Dict[str, Any]:
    if mode == "coordinate":
        # Coordination: Leader splits tasks → selective assignment → summary
        return {"reasoning": True}
    elif mode == "collaborate":
        # Collaboration: All members work in parallel, leader summarizes
        return {"delegate_task_to_all_members": True, "reasoning": True}
    elif mode == "route":
        # Routing: Select only the most suitable member
        return {"respond_directly": True}

Note: In the current implementation, Agno executes pipeline stages as separate invocations rather than using Agno's built-in team modes. Each stage creates a new subtask with its own bot configuration.

4.3 Claude Code Executor Integration (SubAgent Pattern)

Claude Code uses a SubAgent pattern for multi-bot execution. Note that the setup method shown here runs only when the mode is "coordinate" and more than one bot is present:

# executor/agents/claude_code/claude_code_agent.py:2011-2069
def _setup_coordinate_mode(self) -> None:
    """Setup SubAgent configuration files for coordinate mode."""
    bots = self.task_data.get("bot", [])
    mode = self.task_data.get("mode")
    
    # Only setup for coordinate mode with multiple bots
    if mode != "coordinate" or len(bots) <= 1:
        return
    
    # Leader is bot[0], members are bot[1:]
    member_bots = bots[1:]
    
    # Create .claude/agents directory
    agents_dir = os.path.join(target_path, ".claude", "agents")
    os.makedirs(agents_dir, exist_ok=True)
    
    # Generate SubAgent config file for each member
    for bot in member_bots:
        self._generate_subagent_file(agents_dir, bot)

SubAgent Configuration File (generated for each member bot):

---
name: code-reviewer-agent
model: inherit
description: "Handle tasks related to Code Reviewer"
---

<system_prompt_content>

Key capabilities:

  • Each bot runs in the same Claude Code session for context continuity
  • SubAgent files enable Claude Code to invoke subsequent stages as tools
  • The model: inherit setting ensures consistent model usage across stages
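
The body of `_generate_subagent_file` is not shown in the source. The sketch below renders a file in the format above under stated assumptions: the function name `render_subagent_file` is hypothetical, and the rule that derives `code-reviewer-agent` from the bot name is a guess based on the single example:

```python
import re


def render_subagent_file(bot_name: str, system_prompt: str) -> str:
    """Render SubAgent file content: front matter followed by the prompt."""
    # Assumption: the agent name is the lowercased bot name, with runs of
    # non-alphanumeric characters replaced by hyphens, plus an "-agent" suffix.
    slug = re.sub(r"[^a-z0-9]+", "-", bot_name.lower()).strip("-")
    return (
        "---\n"
        f"name: {slug}-agent\n"
        "model: inherit\n"
        f'description: "Handle tasks related to {bot_name}"\n'
        "---\n\n"
        f"{system_prompt}\n"
    )


content = render_subagent_file("Code Reviewer", "You review code changes.")
```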

4.4 Stage Context Passing

When the user confirms a stage that requires confirmation, the confirmed prompt is passed forward via the next subtask's result field:

# backend/app/services/adapters/pipeline_stage.py:422-428
# Store the confirmed prompt in the next subtask's result field
next_subtask.result = {
    "confirmed_prompt": confirmed_prompt,
    "from_stage_confirmation": True,
}

This context is then retrieved by the executor when processing the next stage:

  • Agno: Receives context through task data
  • Claude Code: Maintains context through shared session and SubAgent invocation
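
How each executor reads this field is not shown in the source. As a sketch of the consuming side, a hypothetical helper could prefer the confirmed prompt when the result carries the `from_stage_confirmation` marker and otherwise fall back to the stage's default prompt:

```python
from typing import Any, Dict, Optional


def resolve_stage_prompt(
    result: Optional[Dict[str, Any]], fallback_prompt: str
) -> str:
    """Pick the prompt for a stage, preferring a user-confirmed one."""
    if result and result.get("from_stage_confirmation"):
        confirmed = result.get("confirmed_prompt")
        if confirmed:
            return confirmed
    # No confirmation context (or an empty prompt): use the default.
    return fallback_prompt
```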

5. Complex Scenarios

5.1 Stage Failure Handling

When a stage fails, the pipeline stops at the failed stage:

# backend/app/services/adapters/pipeline_stage.py:287-289
elif subtask.status == SubtaskStatus.FAILED:
    current_stage = i
    break  # Stop at failed stage

Recovery options:

  1. Retry: User can retry the failed stage (resets subtask to PENDING)
  2. Edit + Retry: User can modify the prompt before retrying
  3. Abort: Task remains in FAILED state

5.2 Mid-Pipeline User Intervention

Users can intervene at any requireConfirmation checkpoint:

| Action | Behavior |
| --- | --- |
| Continue | Proceeds to next stage with original output |
| Edit + Continue | User modifies output, then proceeds |
| Retry | Re-runs current stage with original input |
| Edit + Retry | Re-runs current stage with modified input |

Implementation: The action parameter in the confirm API determines behavior:

# backend/app/services/adapters/pipeline_stage.py:367-383
if action == "retry":
    # Stay at current stage - update status to COMPLETED so user can send new messages
    task_crd.status.status = "COMPLETED"
else:
    # action == "continue": Proceed to next stage
    # Create next stage subtask with confirmed prompt
    ...  # remainder elided in this excerpt

5.3 Parallel Stage Execution

Current implementation: Pipeline stages execute sequentially, not in parallel. Each stage must complete (and be confirmed if required) before the next stage begins.

Rationale:

  • Pipeline mode is designed for dependent stages where each stage refines the previous output
  • Parallel execution is handled by other collaboration modes (coordinate, collaborate)

6. Frontend Stage Display

6.1 PipelineStageIndicator Component

The visual progress indicator shows:

  • Start node: Entry point of the pipeline
  • Stage nodes: Each bot in the team
  • Connector lines: Progress flow between stages
  • Status icons: Visual state representation

// frontend/src/features/tasks/components/chat/PipelineStageIndicator.tsx:102-122
const getStageIcon = (stage: DisplayStage, isCurrentStage: boolean) => {
  switch (stage.status) {
    case 'completed':
      return <CheckCircle2 className="h-4 w-4 text-green-500" />
    case 'running':
      return <Loader2 className="h-4 w-4 text-primary animate-spin" />
    case 'pending_confirmation':
      return <Clock className="h-4 w-4 text-amber-500" />
    case 'failed':
      return <XCircle className="h-4 w-4 text-red-500" />
    default:
      return <Circle className={cn('h-4 w-4', isCurrentStage ? 'text-primary' : 'text-text-muted')} />
  }
}

6.2 FinalPromptMessage Component

When a stage with requireConfirmation=true completes, the FinalPromptMessage component displays:

  • The stage output (final prompt)
  • Edit button: Allows modifying the output
  • Confirm button: Proceeds to next stage
  • Copy and Create Task actions (when not in pipeline confirmation)

// frontend/src/features/tasks/components/message/FinalPromptMessage.tsx:239-258
{showPipelineActions ? (
  <Button
    variant="default"
    onClick={handleContinueToNextStage}
    disabled={isConfirming || hasConfirmed}
  >
    {hasConfirmed ? t('pipeline.stage_confirming') : t('pipeline.confirm_stage')}
  </Button>
) : (
  <Button variant="secondary" onClick={handleCreateTask}>
    {t('clarification.create_task')}
  </Button>
)}

7. Code Review Workflow Enablement

The pipeline stage system supports code review workflows through:

7.1 Checkpoint-Based Review

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Architect Bot  │ --> │ Code Reviewer   │ --> │  Test Writer    │
│  (Design phase) │     │ (Review phase)  │     │ (Testing phase) │
│                 │     │                 │     │                 │
│  requireConfirm │     │  requireConfirm │     │  requireConfirm │
│     = false     │     │     = true      │     │     = false     │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        |                       |                       |
        |                       v                       |
        |               ┌───────────────┐               |
        |               │  Human Review │               |
        |               │  - Check code │               |
        |               │  - Edit output│               |
        |               │  - Approve    │               |
        |               └───────────────┘               |
        |                       |                       |
        v                       v                       v

7.2 Workflow Benefits

| Benefit | Implementation |
| --- | --- |
| Quality Gates | Each requireConfirmation acts as a quality checkpoint |
| Context Preservation | Previous stage output is preserved and editable |
| Audit Trail | All stages and decisions are logged in subtasks |
| Flexible Intervention | Users can edit prompts at any checkpoint |
| Rollback Support | Retry action allows re-running stages |

7.3 Real-World Example: Code Review Pipeline

# Team configuration for code review workflow
spec:
  collaborationModel: pipeline
  members:
    - botRef: {name: architect, namespace: default}
      role: leader
      requireConfirmation: false  # Auto-generate design
    - botRef: {name: code-reviewer, namespace: default}
      role: member
      requireConfirmation: true   # Pause for human review
    - botRef: {name: test-writer, namespace: default}
      role: member
      requireConfirmation: false  # Auto-generate tests

Execution flow:

  1. Architect generates code design (auto-completes)
  2. Code Reviewer reviews and suggests improvements (pauses for human approval)
  3. Human reviews the output, can edit, then confirms to proceed
  4. Test Writer generates tests based on approved code (auto-completes)
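
The flow above can be simulated with a short loop. This is a toy model only: stages are `(name, requireConfirmation)` pairs, no bot is actually invoked, and `run_until_checkpoint` is a hypothetical name:

```python
from typing import List, Optional, Tuple


def run_until_checkpoint(
    members: List[Tuple[str, bool]], start: int = 0
) -> Tuple[List[str], Optional[int]]:
    """Run stages from `start` and stop after the first checkpoint stage.

    Returns the names of the stages that ran and the index of the stage the
    pipeline is paused at, or None if it ran to the end.
    """
    executed: List[str] = []
    for i in range(start, len(members)):
        name, require_confirmation = members[i]
        executed.append(name)  # stand-in for running the stage's bot
        if require_confirmation:
            return executed, i  # pause here for human review
    return executed, None


pipeline = [("architect", False), ("code-reviewer", True), ("test-writer", False)]

# First pass stops at the reviewer; after confirmation, resume at the next stage.
first_run, paused_at = run_until_checkpoint(pipeline)
second_run, done = run_until_checkpoint(pipeline, start=paused_at + 1)
```

The first call runs the architect and code-reviewer stages and pauses at index 1; the second call, representing the user's confirmation, runs the test-writer stage and finishes.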

8. Key Implementation Files

| File | Purpose | Lines |
| --- | --- | --- |
| backend/app/services/adapters/pipeline_stage.py | Core stage state machine | 632 |
| backend/app/services/adapters/executor_kinds.py | Executor integration, confirmation check | 1990+ |
| backend/app/services/adapters/task_kinds/helpers.py | Subtask creation for pipeline | 250+ |
| backend/app/api/endpoints/adapter/tasks.py | REST API for confirmation | 380+ |
| frontend/src/features/tasks/components/chat/PipelineStageIndicator.tsx | Visual progress indicator | 282 |
| frontend/src/features/tasks/components/message/FinalPromptMessage.tsx | Confirmation UI | 270 |
| executor/agents/claude_code/claude_code_agent.py | SubAgent pattern implementation | 2129 |

Summary

The Pipeline collaboration mode's stage state machine is a human-in-the-loop workflow system that:

  1. Enables structured multi-stage execution with clear progression through team members
  2. Provides optional confirmation checkpoints via the requireConfirmation flag
  3. Supports human intervention at any checkpoint with edit and retry capabilities
  4. Maintains context continuity between stages through careful subtask design
  5. Integrates with both the Agno and Claude Code executors
  6. Enables code review workflows through checkpoint-based quality gates

The approach combines:

  • State-driven architecture using subtask status transitions
  • REST-based confirmation actions separate from WebSocket streaming
  • Sequential stage execution with optional human checkpoints
  • Flexible context passing between stages

This system transforms AI agent collaboration from a "black box" execution into a transparent, reviewable, and controllable workflow suitable for production software engineering tasks.