Wegent Execution Layer Architecture Analysis

Table of Contents

  1. Executive Summary
  2. Architecture Overview
  3. Executor Manager (executor_manager/)
  4. Executor (executor/)
  5. Communication Flow
  6. Container Lifecycle
  7. Agent Types
  8. Docker Orchestration
  9. Complex Scheduling Logic

Executive Summary

The Wegent Execution Layer is a distributed task execution system built on Docker containerization. It consists of two primary components:

  • Executor Manager: Handles task scheduling, container orchestration, and resource management
  • Executor: Runs inside containers to execute AI agent tasks (ClaudeCode, Agno, Dify, etc.)

Key capabilities include:

  • Multi-mode task dispatch (pull vs push)
  • Session persistence for conversational AI
  • Custom base image support with validation
  • Distributed tracing with OpenTelemetry
  • Heartbeat-based crash detection (OOM, etc.)
  • GitHub App integration for MCP servers

Architecture Overview


Executor Manager

1. Task Scheduling and Orchestration

The Executor Manager supports two dispatch modes:

Pull Mode (Default)

Key Components:

  • scheduler/scheduler.py: TaskScheduler class with APScheduler
  • Fetches tasks periodically (default: every 10 seconds)
  • Separate scheduling for online, offline, and subtasks
  • Max concurrent tasks control (default: 30 online, 10 offline)

Configuration:

TASK_FETCH_INTERVAL = int(os.getenv("TASK_FETCH_INTERVAL", "10"))
MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "30"))
MAX_OFFLINE_CONCURRENT_TASKS = int(os.getenv("MAX_OFFLINE_CONCURRENT_TASKS", "10"))

Push Mode

Components:

  • services/task_queue_service.py: Redis-based FIFO queue
  • services/task_queue_consumer.py: Async task consumers
  • Separate queues for online/offline tasks with different service pools
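
The FIFO semantics can be sketched with an in-memory stand-in for the Redis list (the class and method names below are illustrative, not the actual task_queue_service.py API):

```python
from collections import deque

class TaskQueueSketch:
    """In-memory stand-in for the Redis-backed FIFO queue: one queue
    per task type, mirroring the online/offline split described above."""

    def __init__(self):
        self._queues = {"online": deque(), "offline": deque()}

    def enqueue(self, task_type: str, task: dict) -> None:
        # Plays the role of a push onto the Redis list for this type
        self._queues[task_type].append(task)

    def dequeue(self, task_type: str):
        # Plays the role of a non-blocking pop; None when the queue is empty
        q = self._queues[task_type]
        return q.popleft() if q else None
```

A real deployment would back this with Redis so the queue survives process restarts and can be shared across consumer pools.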

2. Container Lifecycle Management

DockerExecutor (executors/docker/executor.py) manages:

Operation   Method                             Description
Create      submit_executor()                  Creates new container with task config
Reuse       _execute_in_existing_container()   Sends task to running container
Delete      delete_executor()                  Removes container by name
Cancel      cancel_task()                      Calls executor's cancel API
Status      get_container_status()             Checks running/exited/OOM

Container Labels (for tracking):

labels = {
    "owner": "executor_manager",
    "task_id": task_id,
    "subtask_id": subtask_id,
    "user": user_name,
    "aigc.weibo.com/team-mode": mode,
    "aigc.weibo.com/task-type": task_type,
    "subtask_next_id": next_id,
}

3. Port Allocation and Resource Management

Port Allocation:

def find_available_port() -> int:
    """Find random available port between 10000-65535"""
    # Uses Python socket to test availability
    # Returns first available port

Resource Controls:

  • Concurrent task limits per type (online/offline)
  • Slot-based task fetching (up to 10 per cycle)
  • Container resource constraints via Docker flags
  • Kernel compatibility checks (seccomp for < 4.0)

Seccomp Handling:

def _should_enable_seccomp() -> bool:
    # Disable seccomp on kernels < 4.0 (CentOS 7 compatibility);
    # fixes EPERM errors from Bun/Node.js system calls.
    if major_version < 4:
        return False  # caller then adds --security-opt seccomp=unconfined
    return True

4. Callback Handling

Callback Flow:

Retry Logic:

  • Max retries: 10
  • Initial delay: 1 second
  • Backoff multiplier: 2x
  • Timeout: 10 seconds per request
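
The retry policy above reduces to a small backoff loop (post_with_retry is a hypothetical name; the real client also enforces the 10-second per-request timeout, omitted here):

```python
import time

def post_with_retry(send, max_retries=10, initial_delay=1.0, multiplier=2.0):
    """Call `send` until it succeeds, sleeping 1s, 2s, 4s, ... between
    attempts, and re-raise the last error once retries are exhausted."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return send()
        except Exception:
            if attempt == max_retries - 1:
                raise  # retries exhausted; surface the failure
            time.sleep(delay)
            delay *= multiplier
```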

Trace Propagation:

  • W3C Trace Context headers (traceparent/tracestate)
  • X-Request-ID for log correlation
  • Distributed tracing across components
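
A traceparent header follows the W3C Trace Context layout version-traceid-spanid-flags; a minimal builder (a hypothetical helper, not Wegent code) looks like:

```python
def make_traceparent(trace_id: str, span_id: str, sampled: bool = True) -> str:
    """Build a W3C Trace Context traceparent header: 2-hex-digit version,
    32-hex trace id, 16-hex span id, 2-hex-digit flags."""
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"
```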

Executor

1. Agent Types

The Executor supports multiple AI agent implementations through a factory pattern:

ClaudeCode Agent

Type: local_engine

Characteristics:

  • Uses Claude Code SDK for local code execution
  • Session persistence via _clients dictionary (key: task_id:bot_id)
  • Supports conversation continuation within same task
  • Pipeline mode support with new_session flag
  • MCP server integration

Session Management:

# Session key format: "task_id:bot_id"
_internal_session_key = f"{self.task_id}:{bot_id}"

# Cases:
# 1. No cache -> use internal_session_key
# 2. Has cache + new_session=True -> create new session (subtask_id)
# 3. Has cache + new_session=False -> reuse cached session
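
The three cases above can be expressed as a small resolver (resolve_session_key is an illustrative helper; the real logic lives inside ClaudeCodeAgent):

```python
def resolve_session_key(task_id, bot_id, subtask_id, cache, new_session=False):
    """Return the session key for the three cases described above."""
    internal_key = f"{task_id}:{bot_id}"
    if internal_key not in cache:
        return internal_key      # case 1: no cached session yet
    if new_session:
        return subtask_id        # case 2: pipeline forces a fresh session
    return internal_key          # case 3: reuse the cached session
```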

Key Features:

  • Git repository cloning with token decryption
  • Custom instructions from .cursorrules, .windsurfrules
  • Claude.md symlink from Agents.md
  • Progress state management with thinking steps
  • Coordinate mode with SubAgent configuration

Agno Agent

Type: local_engine

Characteristics:

  • Multi-agent team framework
  • SQLite session persistence (/tmp/agno_data.db)
  • Team modes: coordinate, collaborate, route
  • Streaming and non-streaming execution
  • DeepSeek R1 reasoning content support

Team Modes:

  • Coordinate: Manager agent coordinates multiple workers
  • Collaborate: Agents work together on same task
  • Route: Router directs to specialized agents
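
Route mode, for example, reduces to picking one specialized agent per query. A toy keyword router (purely illustrative; Agno's router is an agent making LLM-driven decisions, not keyword matching):

```python
def route_query(query: str, routes: dict, fallback):
    """Pick the first agent whose keyword appears in the query;
    fall back to a generalist when nothing matches."""
    q = query.lower()
    for keyword, agent in routes.items():
        if keyword in q:
            return agent
    return fallback
```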

Streaming Events:

  • RunEvent.run_started/completed: Agent lifecycle
  • RunEvent.tool_call_started/completed: Tool usage
  • RunEvent.run_content: Generated content
  • TeamRunEvent.team_reasoning_step: Team reasoning

Dify Agent

Type: external_api

Characteristics:

  • No local code execution (delegates to Dify cloud)
  • Supports all Dify app modes: chat, chatflow, workflow, agent-chat, completion
  • Conversation ID tracking for multi-turn
  • Task ID storage for cancellation
  • Streaming response processing

App Mode Routing:

if app_mode == "workflow":
    return self._call_workflow_api(query)
else:
    return self._call_chat_api(query)  # chat, chatflow, agent-chat

ImageValidator Agent

Type: validator

Characteristics:

  • Validates custom base images for shell compatibility
  • Checks required dependencies (Node.js, Python, etc.)
  • Version validation against minimum requirements
  • Short-lived validation tasks

Validation Checks:

VALIDATION_CHECKS = {
    "ClaudeCode": [
        {"name": "node", "min_version": "20.0.0"},
        {"name": "claude-code"},
        {"name": "python", "min_version": "3.12.0"},
    ],
    "Agno": [
        {"name": "python", "min_version": "3.12.0"},
        {"name": "sqlite", "min_version": "3.50.0"},
    ],
}
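
The min_version comparisons must be numeric per dotted component, since a lexical string compare would rank "3.9" above "3.12". A sketch of such a check (meets_min_version is a hypothetical name):

```python
def meets_min_version(installed: str, required: str) -> bool:
    """Compare dotted version strings component-by-component,
    padding with zeros so "3.12" equals "3.12.0"."""
    def parts(v):
        return [int(p) for p in v.split(".")]
    a, b = parts(installed), parts(required)
    width = max(len(a), len(b))
    a += [0] * (width - len(a))
    b += [0] * (width - len(b))
    return a >= b
```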

2. Task Execution Flow

3. Container Isolation Mechanisms

Docker Security:

docker run \
  --init \
  --label owner=executor_manager \
  --label task_id={id} \
  -e TASK_INFO={json} \
  -e CALLBACK_URL={url} \
  -e HEARTBEAT_ID={id} \
  -e HEARTBEAT_TYPE=task \
  -e OTEL_ENABLED=true \
  -e TRACEPARENT={trace_id} \
  -p {port}:{port} \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v {workspace}:/workspace \
  {image}

Isolation Features:

  • Container labels for ownership verification
  • Environment variable injection for configuration
  • Port isolation (random assignment)
  • Workspace volume mounting
  • Docker socket access (for container management)

Custom Base Image Support:

4. Result Reporting

Callback Chain:

  1. Task Started: send_task_started_callback()
  2. Progress Updates: report_progress() during execution
  3. Task Completed: send_task_completed_callback()
  4. Task Failed: send_task_failed_callback()

ExecutionResult Structure:

ExecutionResult(
    value=str,              # Final result content
    thinking=List[ThinkingStep],  # Reasoning steps
    reasoning_content=str,  # DeepSeek R1 reasoning
    silent_exit=bool,       # Early termination flag
)

Communication Flow

1. Component Communication

2. Data Flow

Task Dispatch (Pull Mode):

Backend API → TaskScheduler → ExecutorDispatcher → DockerExecutor → Docker Daemon → Container

Task Dispatch (Push Mode):

Backend API → TaskQueueService (Redis) → TaskQueueConsumer → ExecutorDispatcher → Container

Status Updates:

Container → Executor → CallbackClient → Executor Manager → Backend API

Heartbeat Flow:

Container → HeartbeatService → Redis → TaskHeartbeatManager → Backend API (on timeout)

3. API Endpoints

Executor Manager → Backend:

  • GET /api/tasks/fetch - Fetch tasks
  • POST /api/tasks/status - Update task status
  • POST /api/shells/validation-status/{id} - Update validation status

Executor → Executor Manager:

  • POST /api/tasks/execute - Execute task
  • POST /api/tasks/cancel - Cancel task
  • DELETE /api/tasks/session - Delete session
  • GET /api/tasks/sessions - List sessions

Executor (Inside Container):

  • GET / - Health check
  • POST /api/tasks/execute - Main execution endpoint
  • POST /api/tasks/cancel - Cancel running task

Container Lifecycle

Lifecycle Stages:

  1. Creation

    • Docker command preparation with all environment variables
    • Port allocation and volume mounting
    • Label assignment for tracking
    • Container start with health check
  2. Execution

    • FastAPI server startup
    • TASK_INFO parsing
    • Agent initialization and configuration
    • Pre-execution (code download, env setup)
    • Async task execution
  3. Monitoring

    • Heartbeat service registration
    • Progress callbacks
    • Cancellation checks
    • Resource monitoring
  4. Termination

    • Final callback with result
    • Session cleanup (for persistent agents)
    • Container removal (or keep for reuse)

Docker Orchestration

1. Container Management

ExecutorDispatcher Pattern:

class ExecutorDispatcher:
    """Dynamically loads executor implementations"""
    
    _executors = {
        "docker": DockerExecutor(),
        # Extensible for k8s, nomad, etc.
    }
    
    @classmethod
    def get_executor(cls, task_type: str) -> Executor:
        return cls._executors.get(task_type, cls._executors["docker"])

DockerExecutor Key Methods:

Method                             Purpose
submit_executor()                  Main entry - creates or reuses container
_create_new_container()            Full container lifecycle creation
_execute_in_existing_container()   Sends task to running container
get_container_status()             Checks OOM, exit codes
cancel_task()                      Graceful cancellation
delete_executor()                  Force cleanup

2. Resource Management

Port Allocation Strategy:

import random, socket

def find_available_port(start=10000, end=65535) -> int:
    while True:
        port = random.randint(start, end)
        with socket.socket() as s:
            if s.connect_ex(("127.0.0.1", port)) != 0:
                return port  # nothing listening -> treat the port as free

Concurrent Task Control:

# Scheduler-level control
available_slots = min(10, max_concurrent - running)
if available_slots <= 0:
    skip_fetch()

# Global limits
MAX_CONCURRENT_TASKS = 30        # Online tasks
MAX_OFFLINE_CONCURRENT_TASKS = 10 # Offline tasks

3. Health Checking

Container Health Monitoring:

def _check_container_health(task, executor_name, is_validation_task):
    time.sleep(2)  # Wait for startup
    status = get_container_status(executor_name)
    
    if status == "exited":
        logs = get_container_logs(executor_name)
        exit_code = get_exit_code(executor_name)
        error_msg = analyze_failure(logs, exit_code)
        raise RuntimeError(error_msg)

Failure Analysis:

  • Exit code 127: Command not found
  • Exit code 126: Permission denied
  • "no such file or directory": Binary incompatibility (musl/Alpine image vs glibc binary)
  • "libc" or "ld-linux" missing: Missing C library
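
These heuristics map naturally onto a small classifier; a sketch of what analyze_failure might look like (wording and ordering here are illustrative):

```python
def analyze_failure(logs: str, exit_code: int) -> str:
    """Map exit codes and log patterns to the failure causes listed above."""
    if exit_code == 127:
        return "Command not found (exit 127)"
    if exit_code == 126:
        return "Permission denied (exit 126)"
    low = logs.lower()
    if "no such file or directory" in low or "ld-linux" in low or "libc" in low:
        return "Binary incompatibility: musl/Alpine image running a glibc binary?"
    return f"Container exited with code {exit_code}"
```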

4. Cleanup and Garbage Collection

SandboxManager GC Task:

async def start_gc_task():
    """Periodic cleanup of orphaned containers"""
    # Scans for containers not in active_tasks
    # Removes containers idle > timeout
    # Updates task status for cleaned containers
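
The GC scan can be sketched as pure filtering over container metadata (find_orphans and its inputs are illustrative, not the SandboxManager API):

```python
def find_orphans(containers, active_tasks, idle_timeout, now):
    """Return task ids of manager-owned containers that are no longer
    active and have been idle past the timeout. `containers` is a list
    of (labels, last_active_timestamp) pairs."""
    orphans = []
    for labels, last_active in containers:
        if labels.get("owner") != "executor_manager":
            continue  # never touch containers we don't own
        if labels.get("task_id") in active_tasks:
            continue  # task is still live
        if now - last_active > idle_timeout:
            orphans.append(labels.get("task_id"))
    return orphans
```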

Cleanup Triggers:

  • Task completion callback received
  • Heartbeat timeout detected
  • Manual task cancellation
  • Container health check failure

Complex Scheduling Logic

1. Multi-Mode Dispatch

Pull Mode Complexity:

  • APScheduler with multiple job types
  • Cron triggers for offline tasks (21:00-08:00)
  • Slot-based concurrency control
  • Task type filtering (online/offline/pipeline)

Push Mode Complexity:

  • Redis List for FIFO queuing
  • Service pool isolation (default/canary)
  • Retry mechanism with backoff
  • Dead letter queue for failed tasks
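
The retry-then-dead-letter behavior can be sketched with two queues (in-memory deques standing in for the Redis lists; names are illustrative):

```python
from collections import deque

def process_with_dlq(queue, dead_letter, handle, max_attempts=3):
    """Drain `queue`, retrying failed tasks up to max_attempts before
    moving them to `dead_letter` for later inspection."""
    while queue:
        task = queue.popleft()
        task["attempts"] = task.get("attempts", 0) + 1
        try:
            handle(task)
        except Exception:
            if task["attempts"] >= max_attempts:
                dead_letter.append(task)  # give up: park in the DLQ
            else:
                queue.append(task)        # requeue for another attempt
```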

2. Session Persistence Logic

ClaudeCode Session Management:

# Session reuse strategy
if session_id in _clients:
    client = _clients[session_id]
    if client.process.poll() is None:
        reuse_client()
    else:
        remove_from_cache()
        create_new_client()

Pipeline Mode Complexity:

  • Each bot gets unique session (task_id:bot_id)
  • new_session=True creates subtask-based session
  • Previous sessions kept for potential jump-back
  • Session ID mapping maintained in _session_id_map

3. Heartbeat-Based Crash Detection

Architecture:

Implementation:

  • Redis Sorted Set: heartbeats:{task_id} with timestamp score
  • Timeout threshold: 60 seconds default
  • OOM detection via container status check
  • Automatic task failure notification
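
The timeout check itself is a scan over last-seen timestamps; a sketch with a plain dict standing in for the Redis sorted set (find_stale_tasks is a hypothetical name):

```python
import time

def find_stale_tasks(heartbeats, timeout=60.0, now=None):
    """Return task ids whose last heartbeat is older than `timeout`
    seconds. `heartbeats` maps task_id -> last beat timestamp, mirroring
    the member/score pairs of the Redis sorted set."""
    now = time.time() if now is None else now
    return [tid for tid, ts in heartbeats.items() if now - ts > timeout]
```

Tasks flagged here would then be cross-checked against container status (e.g. OOM-killed) before the failure notification is sent.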

4. Task Cancellation

Cancellation Flow:

Complexity:

  • Different cancellation per agent type
  • Dify: API call to stop task
  • ClaudeCode: Client disconnection
  • Agno: Run cancellation via cancel_run()
  • State machine to prevent race conditions

5. Custom Base Image Validation

Validation Flow:

Stages:

  1. pulling_image - Docker pull
  2. starting_container - Container creation
  3. running_checks - Dependency validation
  4. completed - Success or failure
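
Since the stages are strictly linear, progression is just an index walk (a sketch; the real validator also records per-stage failure details):

```python
STAGES = ("pulling_image", "starting_container", "running_checks", "completed")

def next_stage(stage: str) -> str:
    """Advance one step through the linear validation pipeline;
    `completed` is terminal."""
    i = STAGES.index(stage)
    return STAGES[min(i + 1, len(STAGES) - 1)]
```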

Summary

The Wegent Execution Layer provides a robust, scalable task execution system with:

  1. Flexible Deployment: Pull vs push modes, service pools, canary support
  2. Session Management: Persistent AI conversations across multiple turns
  3. Reliability: Heartbeat monitoring, crash detection, automatic retries
  4. Observability: OpenTelemetry tracing, detailed logging, progress tracking
  5. Security: Container isolation, token encryption, GitHub App integration
  6. Extensibility: Pluggable executor types, agent factory pattern, custom images

Key areas requiring careful attention:

  • Session persistence logic in ClaudeCodeAgent (complex caching strategy)
  • Heartbeat coordination between Executor and Executor Manager
  • Pipeline mode with multiple bots and session management
  • Custom base image binary compatibility (glibc vs musl)