Wegent Execution Layer Architecture Analysis
Table of Contents
- Executive Summary
- Architecture Overview
- Executor Manager (executor_manager/)
- Executor (executor/)
- Communication Flow
- Container Lifecycle
- Agent Types
- Docker Orchestration
- Complex Scheduling Logic
Executive Summary
The Wegent Execution Layer is a distributed task execution system built on Docker containerization. It consists of two primary components:
- Executor Manager: Handles task scheduling, container orchestration, and resource management
- Executor: Runs inside containers to execute AI agent tasks (ClaudeCode, Agno, Dify, etc.)
Key capabilities include:
- Multi-mode task dispatch (pull vs push)
- Session persistence for conversational AI
- Custom base image support with validation
- Distributed tracing with OpenTelemetry
- Heartbeat-based crash detection (OOM, etc.)
- GitHub App integration for MCP servers
Architecture Overview
Executor Manager
1. Task Scheduling and Orchestration
The Executor Manager supports two dispatch modes:
Pull Mode (Default)
Key Components:
- `scheduler/scheduler.py`: `TaskScheduler` class built on APScheduler
- Fetches tasks periodically (default: every 10 seconds)
- Separate scheduling for online, offline, and subtasks
- Max concurrent tasks control (default: 30 online, 10 offline)
Configuration:
```python
TASK_FETCH_INTERVAL = int(os.getenv("TASK_FETCH_INTERVAL", "10"))
MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "30"))
MAX_OFFLINE_CONCURRENT_TASKS = int(os.getenv("MAX_OFFLINE_CONCURRENT_TASKS", "10"))
```
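The slot-gated fetch cycle can be sketched as follows. The real scheduler uses APScheduler jobs, but the gating arithmetic is the same; `fetch_tasks` and `running_count` are hypothetical stand-ins for the backend fetch call and the running-task counter.

```python
import asyncio

TASK_FETCH_INTERVAL = 10   # seconds between fetch cycles
MAX_CONCURRENT_TASKS = 30  # online-task cap

def available_slots(running, max_concurrent=MAX_CONCURRENT_TASKS):
    # At most 10 tasks per cycle, never exceeding the concurrency cap
    return max(0, min(10, max_concurrent - running))

async def fetch_loop(fetch_tasks, running_count):
    while True:
        slots = available_slots(running_count())
        if slots > 0:
            for run in await fetch_tasks(limit=slots):
                asyncio.create_task(run())  # dispatch without blocking the loop
        await asyncio.sleep(TASK_FETCH_INTERVAL)
```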
Push Mode
Components:
- `services/task_queue_service.py`: Redis-based FIFO queue
- `services/task_queue_consumer.py`: Async task consumers
- Separate queues for online/offline tasks with different service pools
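A minimal sketch of the Redis-backed FIFO, assuming a redis-py-style client with `rpush`/`blpop`; the actual TaskQueueService adds service-pool routing, retries, and dead-letter handling.

```python
import json

class TaskQueue:
    """FIFO task queue over a Redis list (sketch, not the real API)."""

    def __init__(self, client, name="tasks:online"):
        self.client = client  # any redis-py-compatible client
        self.name = name

    def push(self, task):
        self.client.rpush(self.name, json.dumps(task))  # enqueue at tail

    def pop(self, timeout=5):
        item = self.client.blpop(self.name, timeout=timeout)  # blocking pop from head
        if item is None:
            return None  # queue empty within the timeout
        _, payload = item
        return json.loads(payload)
```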
2. Container Lifecycle Management
DockerExecutor (executors/docker/executor.py) manages:
| Operation | Method | Description |
|---|---|---|
| Create | submit_executor() | Creates new container with task config |
| Reuse | _execute_in_existing_container() | Sends task to running container |
| Delete | delete_executor() | Removes container by name |
| Cancel | cancel_task() | Calls executor's cancel API |
| Status | get_container_status() | Checks running/exited/OOM |
Container Labels (for tracking):
```python
labels = {
    "owner": "executor_manager",
    "task_id": task_id,
    "subtask_id": subtask_id,
    "user": user_name,
    "aigc.weibo.com/team-mode": mode,
    "aigc.weibo.com/task-type": task_type,
    "subtask_next_id": next_id,
}
```
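As a sketch, container creation reduces to assembling the `docker run` argument list with these labels. The labels mirror the dictionary above; the helper itself is illustrative, not the actual DockerExecutor API.

```python
def build_docker_args(image, task_id, subtask_id, port, env):
    # Tracking labels let the manager find and verify its own containers later
    args = [
        "docker", "run", "-d", "--init",
        "--label", "owner=executor_manager",
        "--label", f"task_id={task_id}",
        "--label", f"subtask_id={subtask_id}",
        "-p", f"{port}:{port}",
    ]
    for key, value in env.items():
        args += ["-e", f"{key}={value}"]  # configuration via environment injection
    return args + [image]
```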
3. Port Allocation and Resource Management
Port Allocation:
```python
def find_available_port() -> int:
    """Find random available port between 10000-65535"""
    # Uses Python socket to test availability
    # Returns first available port
```
Resource Controls:
- Concurrent task limits per type (online/offline)
- Slot-based task fetching (up to 10 per cycle)
- Container resource constraints via Docker flags
- Kernel compatibility checks (seccomp for < 4.0)
Seccomp Handling:
```python
import platform

def _should_enable_seccomp() -> bool:
    """Disable seccomp on kernels < 4.0 (CentOS 7 compatibility);
    fixes EPERM errors with Bun/Node.js."""
    major_version = int(platform.release().split(".")[0])
    if major_version < 4:
        return False  # caller adds --security-opt seccomp=unconfined
    return True
```
4. Callback Handling
Callback Flow:
Retry Logic:
- Max retries: 10
- Initial delay: 1 second
- Backoff multiplier: 2x
- Timeout: 10 seconds per request
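These parameters translate to a standard exponential-backoff loop (1s, 2s, 4s, ...). `send` is a hypothetical stand-in for the actual HTTP callback call.

```python
import time

def post_with_retry(send, max_retries=10, initial_delay=1.0, multiplier=2.0):
    """Retry a callback delivery with exponential backoff (sketch)."""
    delay = initial_delay
    for attempt in range(max_retries):
        try:
            return send()  # e.g. one HTTP POST with a 10-second timeout
        except Exception:
            if attempt == max_retries - 1:
                raise  # retries exhausted: surface the last error
            time.sleep(delay)
            delay *= multiplier
```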
Trace Propagation:
- W3C Trace Context headers (traceparent/tracestate)
- X-Request-ID for log correlation
- Distributed tracing across components
Executor
1. Agent Types
The Executor supports multiple AI agent implementations through a factory pattern:
ClaudeCode Agent
Type: local_engine
Characteristics:
- Uses Claude Code SDK for local code execution
- Session persistence via `_clients` dictionary (key: `task_id:bot_id`)
- Supports conversation continuation within the same task
- Pipeline mode support with `new_session` flag
- MCP server integration
Session Management:
```python
# Session key format: "task_id:bot_id"
_internal_session_key = f"{self.task_id}:{bot_id}"

# Cases:
# 1. No cache -> use internal_session_key
# 2. Has cache + new_session=True -> create new session (keyed by subtask_id)
# 3. Has cache + new_session=False -> reuse cached session
```
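The three cases reduce to a small key-resolution function. This is an illustrative sketch, not the actual ClaudeCodeAgent code.

```python
def resolve_session_key(cache, task_id, bot_id, subtask_id, new_session):
    """Return the session key for the three cases above (sketch)."""
    internal_key = f"{task_id}:{bot_id}"
    if internal_key not in cache:
        return internal_key  # case 1: no cached session yet
    if new_session:
        return subtask_id    # case 2: pipeline requested a fresh session
    return internal_key      # case 3: reuse the cached session
```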
Key Features:
- Git repository cloning with token decryption
- Custom instructions from `.cursorrules`, `.windsurfrules`
- `Claude.md` symlink from `Agents.md`
- Progress state management with thinking steps
- Coordinate mode with SubAgent configuration
Agno Agent
Type: local_engine
Characteristics:
- Multi-agent team framework
- SQLite session persistence (`/tmp/agno_data.db`)
- Team modes: `coordinate`, `collaborate`, `route`
- Streaming and non-streaming execution
- DeepSeek R1 reasoning content support
Team Modes:
- Coordinate: Manager agent coordinates multiple workers
- Collaborate: Agents work together on same task
- Route: Router directs to specialized agents
Streaming Events:
- `RunEvent.run_started/completed`: Agent lifecycle
- `RunEvent.tool_call_started/completed`: Tool usage
- `RunEvent.run_content`: Generated content
- `TeamRunEvent.team_reasoning_step`: Team reasoning
Dify Agent
Type: external_api
Characteristics:
- No local code execution (delegates to Dify cloud)
- Supports all Dify app modes: chat, chatflow, workflow, agent-chat, completion
- Conversation ID tracking for multi-turn
- Task ID storage for cancellation
- Streaming response processing
App Mode Routing:
```python
if app_mode == "workflow":
    return self._call_workflow_api(query)
else:
    return self._call_chat_api(query)  # chat, chatflow, agent-chat
```
ImageValidator Agent
Type: validator
Characteristics:
- Validates custom base images for shell compatibility
- Checks required dependencies (Node.js, Python, etc.)
- Version validation against minimum requirements
- Short-lived validation tasks
Validation Checks:
```python
VALIDATION_CHECKS = {
    "ClaudeCode": [
        {"name": "node", "min_version": "20.0.0"},
        {"name": "claude-code"},
        {"name": "python", "min_version": "3.12.0"},
    ],
    "Agno": [
        {"name": "python", "min_version": "3.12.0"},
        {"name": "sqlite", "min_version": "3.50.0"},
    ],
}
```
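A minimal sketch of how these checks might be evaluated, assuming the detected versions have already been collected; the real validator also locates binaries inside the container and parses tool-specific version output.

```python
def _parse(version):
    return [int(x) for x in version.split(".")]

def version_at_least(found, minimum):
    # Numeric comparison of dotted version strings
    return _parse(found) >= _parse(minimum)

def run_checks(checks, found_versions):
    """found_versions maps tool name -> detected version string (absent if missing)."""
    failures = []
    for check in checks:
        version = found_versions.get(check["name"])
        if version is None:
            failures.append(f"{check['name']}: not found")
        elif "min_version" in check and not version_at_least(version, check["min_version"]):
            failures.append(f"{check['name']}: {version} < {check['min_version']}")
    return failures
```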
2. Task Execution Flow
3. Container Isolation Mechanisms
Docker Security:
```shell
docker run \
  --init \
  --label owner=executor_manager \
  --label task_id={id} \
  -e TASK_INFO={json} \
  -e CALLBACK_URL={url} \
  -e HEARTBEAT_ID={id} \
  -e HEARTBEAT_TYPE=task \
  -e OTEL_ENABLED=true \
  -e TRACEPARENT={trace_id} \
  -p {port}:{port} \
  -v /var/run/docker.sock:/var/run/docker.sock \
  -v {workspace}:/workspace \
  {image}
```
Isolation Features:
- Container labels for ownership verification
- Environment variable injection for configuration
- Port isolation (random assignment)
- Workspace volume mounting
- Docker socket access (for container management)
Custom Base Image Support:
4. Result Reporting
Callback Chain:
- Task Started: `send_task_started_callback()`
- Progress Updates: `report_progress()` during execution
- Task Completed: `send_task_completed_callback()`
- Task Failed: `send_task_failed_callback()`
ExecutionResult Structure:
```python
ExecutionResult(
    value=str,                    # Final result content
    thinking=List[ThinkingStep],  # Reasoning steps
    reasoning_content=str,        # DeepSeek R1 reasoning
    silent_exit=bool,             # Early termination flag
)
```
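Expressed as Python dataclasses, the structure might look like this; field types and the `ThinkingStep` shape are assumptions, not the codebase's actual definitions.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class ThinkingStep:
    title: str       # short label for the reasoning step (assumed shape)
    detail: str = ""

@dataclass
class ExecutionResult:
    value: str                                                  # Final result content
    thinking: List[ThinkingStep] = field(default_factory=list)  # Reasoning steps
    reasoning_content: str = ""                                 # DeepSeek R1 reasoning
    silent_exit: bool = False                                   # Early termination flag
```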
Communication Flow
1. Component Communication
2. Data Flow
Task Dispatch (Pull Mode):
Backend API → TaskScheduler → ExecutorDispatcher → DockerExecutor → Docker Daemon → Container
Task Dispatch (Push Mode):
Backend API → TaskQueueService (Redis) → TaskQueueConsumer → ExecutorDispatcher → Container
Status Updates:
Container → Executor → CallbackClient → Executor Manager → Backend API
Heartbeat Flow:
Container → HeartbeatService → Redis → TaskHeartbeatManager → Backend API (on timeout)
3. API Endpoints
Executor Manager → Backend:
- `GET /api/tasks/fetch` - Fetch tasks
- `POST /api/tasks/status` - Update task status
- `POST /api/shells/validation-status/{id}` - Update validation status
Executor → Executor Manager:
- `POST /api/tasks/execute` - Execute task
- `POST /api/tasks/cancel` - Cancel task
- `DELETE /api/tasks/session` - Delete session
- `GET /api/tasks/sessions` - List sessions
Executor (Inside Container):
- `GET /` - Health check
- `POST /api/tasks/execute` - Main execution endpoint
- `POST /api/tasks/cancel` - Cancel running task
Container Lifecycle
Lifecycle Stages:
Creation
- Docker command preparation with all environment variables
- Port allocation and volume mounting
- Label assignment for tracking
- Container start with health check
Execution
- FastAPI server startup
- TASK_INFO parsing
- Agent initialization and configuration
- Pre-execution (code download, env setup)
- Async task execution
Monitoring
- Heartbeat service registration
- Progress callbacks
- Cancellation checks
- Resource monitoring
Termination
- Final callback with result
- Session cleanup (for persistent agents)
- Container removal (or keep for reuse)
Docker Orchestration
1. Container Management
ExecutorDispatcher Pattern:
```python
class ExecutorDispatcher:
    """Dynamically loads executor implementations"""

    _executors = {
        "docker": DockerExecutor(),
        # Extensible for k8s, nomad, etc.
    }

    @classmethod
    def get_executor(cls, task_type: str) -> Executor:
        return cls._executors.get(task_type, cls._executors["docker"])
```
DockerExecutor Key Methods:
| Method | Purpose |
|---|---|
| submit_executor() | Main entry - creates or reuses container |
| _create_new_container() | Full container lifecycle creation |
| _execute_in_existing_container() | Send task to running container |
| get_container_status() | Check OOM, exit codes |
| cancel_task() | Graceful cancellation |
| delete_executor() | Force cleanup |
2. Resource Management
Port Allocation Strategy:
```python
import random
import socket

def find_available_port(start=10000, end=65535) -> int:
    while True:
        port = random.randint(start, end)
        with socket.socket() as s:
            try:
                s.bind(("", port))  # bind succeeds only if the port is free
                return port
            except OSError:
                continue  # port in use: try another
```
Concurrent Task Control:
```python
# Scheduler-level control
available_slots = min(10, max_concurrent - running)
if available_slots <= 0:
    skip_fetch()

# Global limits
MAX_CONCURRENT_TASKS = 30          # Online tasks
MAX_OFFLINE_CONCURRENT_TASKS = 10  # Offline tasks
```
3. Health Checking
Container Health Monitoring:
```python
def _check_container_health(task, executor_name, is_validation_task):
    time.sleep(2)  # Wait for startup
    status = get_container_status(executor_name)
    if status == "exited":
        logs = get_container_logs(executor_name)
        exit_code = get_exit_code(executor_name)
        error_msg = analyze_failure(logs, exit_code)
        raise RuntimeError(error_msg)
```
Failure Analysis:
- Exit code 127: Command not found
- Exit code 126: Permission denied
- "no such file or directory": Binary incompatibility (Alpine vs glibc)
- "libc" or "ld-linux" missing: Missing C library
4. Cleanup and Garbage Collection
SandboxManager GC Task:
```python
async def start_gc_task():
    """Periodic cleanup of orphaned containers"""
    # Scans for containers not in active_tasks
    # Removes containers idle > timeout
    # Updates task status for cleaned containers
```
Cleanup Triggers:
- Task completion callback received
- Heartbeat timeout detected
- Manual task cancellation
- Container health check failure
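The GC pass can be sketched as a pure sweep wrapped in a periodic loop. `list_owned_containers`, `active_task_ids`, and `remove` are hypothetical stand-ins for the SandboxManager's internals.

```python
import asyncio

def gc_sweep(list_owned_containers, active_task_ids, remove):
    """One pass: remove containers whose task is no longer active."""
    removed = []
    for name, task_id in list_owned_containers():
        if task_id not in active_task_ids:
            remove(name)  # orphaned: its task finished, failed, or was lost
            removed.append(name)
    return removed

async def gc_loop(list_owned_containers, active_task_ids_fn, remove, interval=60):
    while True:
        gc_sweep(list_owned_containers, active_task_ids_fn(), remove)
        await asyncio.sleep(interval)
```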
Complex Scheduling Logic
1. Multi-Mode Dispatch
Pull Mode Complexity:
- APScheduler with multiple job types
- Cron triggers for offline tasks (21:00-08:00)
- Slot-based concurrency control
- Task type filtering (online/offline/pipeline)
Push Mode Complexity:
- Redis List for FIFO queuing
- Service pool isolation (default/canary)
- Retry mechanism with backoff
- Dead letter queue for failed tasks
2. Session Persistence Logic
ClaudeCode Session Management:
```python
# Session reuse strategy
if session_id in _clients:
    client = _clients[session_id]
    if client.process.poll() is None:  # subprocess still alive
        reuse_client()
    else:                              # process died: drop stale entry
        remove_from_cache()
        create_new_client()
else:
    create_new_client()
```
Pipeline Mode Complexity:
- Each bot gets a unique session (`task_id:bot_id`)
- `new_session=True` creates a subtask-based session
- Previous sessions are kept for potential jump-back
- Session ID mapping maintained in `_session_id_map`
3. Heartbeat-Based Crash Detection
Architecture:
Implementation:
- Redis Sorted Set: `heartbeats:{task_id}` with timestamp score
- Timeout threshold: 60 seconds default
- OOM detection via container status check
- Automatic task failure notification
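The timeout logic can be sketched in memory; the production version stores the timestamps in a Redis sorted set (`heartbeats:{task_id}`) so any manager instance can scan for stale tasks. This in-memory class is an illustrative stand-in.

```python
import time

class HeartbeatTracker:
    """Track last-beat timestamps and flag tasks silent past the timeout (sketch)."""

    def __init__(self, timeout=60.0):
        self.timeout = timeout
        self.beats = {}  # task_id -> last heartbeat timestamp

    def beat(self, task_id, now=None):
        # In production: a ZADD with the current time as score
        self.beats[task_id] = time.time() if now is None else now

    def timed_out(self, now=None):
        # In production: a score-range scan over the sorted set
        now = time.time() if now is None else now
        return [t for t, ts in self.beats.items() if now - ts > self.timeout]
```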
4. Task Cancellation
Cancellation Flow:
Complexity:
- Different cancellation per agent type
- Dify: API call to stop task
- ClaudeCode: Client disconnection
- Agno: Run cancellation via cancel_run()
- State machine to prevent race conditions
5. Custom Base Image Validation
Validation Flow:
Stages:
- `pulling_image` - Docker pull
- `starting_container` - Container creation
- `running_checks` - Dependency validation
- `completed` - Success or failure
Summary
The Wegent Execution Layer provides a robust, scalable task execution system with:
- Flexible Deployment: Pull vs push modes, service pools, canary support
- Session Management: Persistent AI conversations across multiple turns
- Reliability: Heartbeat monitoring, crash detection, automatic retries
- Observability: OpenTelemetry tracing, detailed logging, progress tracking
- Security: Container isolation, token encryption, GitHub App integration
- Extensibility: Pluggable executor types, agent factory pattern, custom images
Key areas requiring careful attention:
- Session persistence logic in ClaudeCodeAgent (complex caching strategy)
- Heartbeat coordination between Executor and Executor Manager
- Pipeline mode with multiple bots and session management
- Custom base image binary compatibility (glibc vs musl)