Wegent Backend API Architecture Analysis

Overview

The Wegent backend follows a Kubernetes-style CRD (Custom Resource Definition) architecture implemented with FastAPI. It exposes a RESTful API, Socket.IO-based WebSocket channels for real-time communication, several authentication and authorization mechanisms, and an adapter layer that serves the legacy endpoints from the CRD-style resource storage.


1. FastAPI Application Structure

Entry Point (app/main.py)

The application uses a lifespan context manager for startup/shutdown lifecycle management:

from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP: Initialize database, YAML configs, background jobs, schedulers, WebSocket, event bus
    yield
    # SHUTDOWN: Graceful cleanup with timeout, cancel active streams

Startup Sequence:

  1. Redis lock acquisition for distributed initialization
  2. Database migrations (Alembic)
  3. YAML data initialization
  4. Background jobs startup
  5. Scheduler backend initialization (Celery/APScheduler/XXL-JOB)
  6. Socket.IO WebSocket initialization
  7. Event bus and handler registration
  8. PendingRequestRegistry initialization
  9. Device heartbeat monitor startup
  10. IM Channel Manager initialization
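The startup order above can be sketched with a stdlib-only lifespan that takes a plain dict instead of a FastAPI app, so it runs standalone. The step names, the dict-based state, and the 5-second shutdown timeout are assumptions for illustration. In the real service the first step is a Redis lock, presumably so that only one worker runs migrations and YAML seeding.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Dict

# Hypothetical names for the startup steps listed above, in order.
STARTUP_STEPS = [
    "acquire_redis_init_lock",
    "run_alembic_migrations",
    "load_yaml_data",
    "start_background_jobs",
    "init_scheduler_backend",
    "init_socketio",
    "register_event_handlers",
    "init_pending_request_registry",
    "start_device_heartbeat_monitor",
    "init_im_channel_manager",
]

@asynccontextmanager
async def lifespan(state: Dict[str, Any], shutdown_timeout: float = 5.0):
    # STARTUP: each step runs once, in order; a real app would await the
    # corresponding initializer instead of just recording its name.
    for step in STARTUP_STEPS:
        state["completed"].append(step)
    yield
    # SHUTDOWN: cancel active streams, waiting at most `shutdown_timeout` seconds.
    streams = state["active_streams"]
    for task in streams:
        task.cancel()
    if streams:
        await asyncio.wait(streams, timeout=shutdown_timeout)
    state["shutdown"] = True
```

The timeout bounds how long a deploy can be blocked by a stuck stream, at the cost of possibly cutting off a response mid-flight.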

Combined ASGI app: the Socket.IO server and the FastAPI application are served as a single ASGI app:

app = create_socketio_asgi_app()  # Combines Socket.IO server with FastAPI
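Conceptually, the combined app is one ASGI callable that routes Socket.IO traffic to the Socket.IO server and everything else to FastAPI. The sketch below shows that dispatch with two dummy ASGI apps; `create_combined_app` and `make_dummy_app` are hypothetical names, and `/socket.io` is python-socketio's default path. In practice python-socketio's `ASGIApp(sio, other_asgi_app=fastapi_app)` performs this routing, and create_socketio_asgi_app likely wraps something similar.

```python
import asyncio

def create_combined_app(socketio_app, http_app, socketio_path="/socket.io"):
    """Return an ASGI app that sends Socket.IO requests to one app, the rest to another."""
    async def app(scope, receive, send):
        path = scope.get("path", "")
        if scope["type"] in ("http", "websocket") and path.startswith(socketio_path):
            await socketio_app(scope, receive, send)
        else:
            # Regular HTTP plus "lifespan" events (which have no path) go to FastAPI.
            await http_app(scope, receive, send)
    return app

def make_dummy_app(name, calls):
    # Minimal ASGI app that only records which app handled the request.
    async def dummy(scope, receive, send):
        calls.append((name, scope.get("path")))
    return dummy
```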

2. API Route Hierarchy

Router Structure

/api (API_PREFIX)
├── /health                    # Health check (no prefix)
├── /auth                      # Authentication (JWT, OAuth2, OIDC)
│   ├── /oauth2               # Swagger-style form login
│   ├── /login                # JSON login
│   └── /oidc                 # OIDC authentication
├── /users                     # User management
│   └── /me/pet               # User pet features
├── /admin                     # Admin operations
├── /groups                    # Group management
├── /projects                  # Project management
├── /api-keys                  # API key management
├── /devices                   # Local device management
├── /bots                      # Bot CRUD operations
├── /models                    # Model management
├── /shells                    # Shell environment management
├── /agents                    # Public shell operations
├── /teams                     # Team management (Agent in UI)
├── /subscriptions             # Subscription management
├── /tasks                     # Task management
├── /subtasks                  # Subtask operations
├── /task-members              # Task member management
├── /task-knowledge-bases      # Task knowledge base links
├── /chat                      # Chat operations
├── /attachments               # File attachments
├── /git                       # Git repository operations
├── /executors                 # Executor management
├── /quota                     # Quota management
├── /dify                      # Dify proxy endpoints
├── /retrievers                # Retriever management
├── /wiki                      # Wiki documentation
├── /wizard                    # AI wizard features
├── /v1/responses              # OpenAI-compatible responses API
├── /knowledge-bases           # Knowledge base management
├── /knowledge-documents       # Document management
├── /tables                    # Data table operations
├── /rag                       # RAG operations
├── /utils                     # Utility endpoints
├── /web-scraper               # Web scraping
├── /v1                        # Kubernetes-style CRD API
│   ├── /namespaces/{namespace}/{kinds}           # List resources
│   ├── /namespaces/{namespace}/{kinds}/{name}    # Get/Update/Delete
│   ├── /kinds/skills                             # Skill management
│   └── /batch                                    # Batch operations
└── /internal                  # Internal service APIs
    ├── /chat                 # Internal chat storage
    ├── /rag                  # Internal RAG operations
    ├── /skills               # Skill binaries
    ├── /tables               # Internal table operations
    ├── /bots                 # Internal bot operations
    ├── /services             # Service extensions
    └── /subscriptions        # Subscription operations

Route Registration Flow
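As a rough mental model of route registration, each endpoint module defines its own router, and routers are nested under prefixes until everything hangs off /api. The sketch mimics FastAPI's `APIRouter.include_router(router, prefix=...)` composition with a tiny stdlib class; `MiniRouter` and the route contents are hypothetical.

```python
class MiniRouter:
    """Toy stand-in for FastAPI's APIRouter: collects (method, path) pairs."""

    def __init__(self):
        self.routes = []

    def add(self, method, path):
        self.routes.append((method, path))

    def include_router(self, other, prefix=""):
        # Copy the child's routes, prepending the mount prefix.
        for method, path in other.routes:
            self.routes.append((method, prefix + path))

# Leaf routers, one per endpoint module (hypothetical contents).
kind_router = MiniRouter()
kind_router.add("GET", "/namespaces/{namespace}/{kinds}")
kind_router.add("GET", "/namespaces/{namespace}/{kinds}/{name}")

teams_router = MiniRouter()
teams_router.add("GET", "")
teams_router.add("POST", "")

# Aggregate router, then mounted under API_PREFIX.
api_router = MiniRouter()
api_router.include_router(teams_router, prefix="/teams")
api_router.include_router(kind_router, prefix="/v1")

app_routes = MiniRouter()
app_routes.include_router(api_router, prefix="/api")
```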


3. Dependency Injection System

Core Dependencies (app/api/dependencies.py)

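from typing import Generator, Optional

from fastapi import Path
from sqlalchemy.orm import Session

# SessionLocal is the project's SQLAlchemy session factory (defined elsewhere)

# Database session dependency: one session per request, always closed
def get_db() -> Generator[Session, None, None]:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# OpenTelemetry context dependencies
def with_task_telemetry(task_id: int = Path(...)) -> int:
    """Sets task context for distributed tracing"""
    ...  # attach task_id to the current trace context
    return task_id

def with_task_subtask_telemetry(
    task_id: int = Path(...),
    subtask_id: Optional[int] = None,
) -> tuple:
    """Sets both task and subtask context"""
    ...  # attach task_id and subtask_id to the current trace context
    return task_id, subtask_id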

Usage Pattern

@router.get("/{task_id}")
def get_task(
    task_id: int = Depends(with_task_telemetry),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    # Dependencies are injected automatically
    pass
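Generator dependencies such as get_db follow a setup/yield/teardown contract: the code before `yield` runs before the handler, the yielded value is injected, and the `finally` block runs afterwards even if the handler raises. The stdlib sketch reproduces that contract by wrapping the generator in `contextlib.contextmanager`; `FakeSession` and `call_with_dependency` are hypothetical stand-ins for the SQLAlchemy session and FastAPI's resolver.

```python
from contextlib import contextmanager
from typing import Callable, Generator, List

class FakeSession:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

created: List[FakeSession] = []

def get_db() -> Generator[FakeSession, None, None]:
    db = FakeSession()
    created.append(db)
    try:
        yield db
    finally:
        db.close()  # runs on success and on error

def call_with_dependency(dep: Callable[[], Generator], handler: Callable):
    """Resolve a yield-dependency, call the handler, then run the teardown."""
    # contextmanager throws any handler exception back into the generator,
    # so the dependency's `finally` block always executes.
    with contextmanager(dep)() as value:
        return handler(value)
```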

4. WebSocket Architecture

Socket.IO Namespaces

| Namespace | Purpose | Events |
|---|---|---|
| /chat | Real-time chat communication | chat:send, chat:cancel, chat:retry, task:join, task:leave |
| /local-executor | Local device connections | task:cancel, task:close-session |

Event Flow Architecture

Event Definitions (app/api/ws/events.py)

Client Events (Frontend → Backend):

  • chat:send - Send message
  • chat:cancel - Cancel streaming
  • chat:retry - Retry failed message
  • chat:resume - Resume streaming
  • task:join - Join task room
  • task:leave - Leave task room
  • history:sync - Sync message history
  • skill:response - Skill response from frontend

Server Events (Backend → Frontend):

  • chat:start - AI response started
  • chat:chunk - Content chunk (streaming)
  • chat:done - Response complete
  • chat:error - Error occurred
  • chat:cancelled - Streaming was cancelled
  • chat:message - User message broadcast
  • task:created - New task notification
  • task:deleted - Task deleted
  • task:status - Status update
  • auth:error - Authentication error
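Event names such as chat:send contain characters that are not valid in Python method names, while python-socketio's class-based namespaces look up a handler named `on_` plus the event name. The sketch shows one plausible mapping (replace `:` and `-` with `_`); `ChatNamespace`, the normalization rule, and the `task:<id>` room naming are assumptions, not taken from app/api/ws/events.py.

```python
import asyncio

def handler_name(event: str) -> str:
    # e.g. "chat:send" -> "on_chat_send", "task:close-session" -> "on_task_close_session"
    return "on_" + event.replace(":", "_").replace("-", "_")

class ChatNamespace:
    async def on_chat_send(self, sid, data):
        return {"ack": "chat:send", "sid": sid, "message": data.get("message")}

    async def on_task_join(self, sid, data):
        # Hypothetical room naming: one room per task.
        return {"ack": "task:join", "room": f"task:{data['task_id']}"}

    async def trigger_event(self, event, sid, *args):
        handler = getattr(self, handler_name(event), None)
        if handler is None:
            return {"error": f"unknown event {event}"}
        return await handler(sid, *args)
```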

WebSocket Decorators

# Automatic context management
@auto_task_context(TaskJoinPayload)
async def on_task_join(self, sid: str, data: dict) -> dict:
    # Payload is automatically validated and typed
    pass

# Telemetry tracing
@trace_websocket_event(exclude_events={"connect"}, extract_event_data=True)
async def trigger_event(self, event: str, sid: str, *args):
    # Automatic OpenTelemetry span creation
    pass
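A decorator like auto_task_context can be approximated by converting the raw dict into a typed payload before the handler runs and returning an error dict when validation fails. This sketch uses a dataclass instead of the Pydantic model the real code probably uses; `auto_payload` is a hypothetical name and the `task_id` field of TaskJoinPayload is assumed.

```python
import asyncio
import functools
from dataclasses import dataclass, fields

@dataclass
class TaskJoinPayload:
    task_id: int  # assumed field

def auto_payload(payload_cls):
    """Coerce `data` into `payload_cls` and pass the typed object to the handler."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, sid, data):
            try:
                kwargs = {f.name: f.type(data[f.name]) for f in fields(payload_cls)}
            except (KeyError, TypeError, ValueError) as exc:
                return {"error": f"invalid payload: {exc!r}"}
            return await func(self, sid, payload_cls(**kwargs))
        return wrapper
    return decorator

class Namespace:
    @auto_payload(TaskJoinPayload)
    async def on_task_join(self, sid, payload: TaskJoinPayload):
        return {"joined": payload.task_id}
```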

5. Authentication & Authorization

Authentication Flow

Authentication Methods

  1. JWT Token (Bearer) - Standard web authentication

    • Token expiration: 7 days (configurable)
    • Algorithm: HS256
    • Payload: {sub: username, user_id: id, exp: timestamp}
  2. OAuth2 Password Flow - Swagger UI compatibility

    • Form-based login at /api/auth/oauth2
  3. OIDC - Enterprise SSO

    • Discovery URL-based configuration
    • Callback handling at /api/auth/oidc/callback
  4. API Keys - Service-to-service & programmatic access

    • Personal keys: Direct user authentication
    • Service keys: Impersonate users via wegent-username header
    • Auto-user creation for service keys
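To make the JWT format concrete, here is a stdlib-only HS256 sketch that produces the payload shape listed above ({sub, user_id, exp}) with the 7-day default lifetime. `sign_jwt` and `verify_jwt` are hypothetical helpers for illustration; the real security module presumably relies on a JWT library, which production code should also do.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional

def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()

def sign_jwt(sub: str, user_id: int, secret: str,
             expire_minutes: int = 7 * 24 * 60, now: Optional[float] = None) -> str:
    """Create an HS256 token carrying the {sub, user_id, exp} payload."""
    issued = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": sub, "user_id": user_id, "exp": issued + expire_minutes * 60}
    signing_input = ".".join(_b64url(json.dumps(part).encode()) for part in (header, payload))
    return signing_input + "." + _b64url(_sign(signing_input, secret))

def verify_jwt(token: str, secret: str, now: Optional[float] = None) -> Dict[str, Any]:
    """Check the signature (constant-time) and expiry; raise ValueError on failure."""
    signing_input, _, signature = token.rpartition(".")
    if not hmac.compare_digest(_b64url_decode(signature), _sign(signing_input, secret)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    current = time.time() if now is None else now
    if payload["exp"] < current:
        raise ValueError("token expired")
    return payload
```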

Security Functions (app/core/security.py)

# Core authentication
def authenticate_user(db, username, password) -> Union[User, None]
def create_access_token(data: Dict[str, Any], expires_delta: Optional[int]) -> str
def verify_token(token: str) -> Dict[str, Any]

# Dependency injection
def get_current_user(token: str = Depends(oauth2_scheme)) -> User
def get_admin_user(current_user: User = Depends(get_current_user)) -> User
def get_auth_context(...) -> AuthContext  # Flexible API key auth
def get_current_user_flexible(...) -> User
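The API-key branch of flexible authentication can be sketched as follows: personal keys authenticate their owner directly, while service keys act on behalf of the user named in the wegent-username header, creating that user if needed. The `ApiKey` and `AuthContext` shapes, the in-memory user store, and `resolve_api_key` are hypothetical; the real get_auth_context also handles bearer JWTs and database lookups.

```python
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class ApiKey:
    key: str
    kind: str                    # "personal" or "service" (assumed values)
    owner: Optional[str] = None  # set only for personal keys

@dataclass
class AuthContext:
    username: str
    via_service_key: bool

def resolve_api_key(key: str, headers: Dict[str, str], keys: Dict[str, ApiKey],
                    users: Dict[str, dict]) -> AuthContext:
    api_key = keys.get(key)
    if api_key is None:
        raise PermissionError("unknown API key")
    if api_key.kind == "personal":
        return AuthContext(username=api_key.owner, via_service_key=False)
    # Service key: impersonate the user named in the header.
    username = headers.get("wegent-username")
    if not username:
        raise PermissionError("service key requires the wegent-username header")
    # Auto-provision the user on first use.
    users.setdefault(username, {"username": username, "auto_created": True})
    return AuthContext(username=username, via_service_key=True)
```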

Authorization Patterns

# Ownership-based access control: checked in the service layer
@router.delete("/{team_id}")
def delete_team(
    team_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Check ownership within service layer
    team_kinds_service.delete_with_user(db, team_id, current_user.id)

# Admin-only endpoints
@router.get("/admin/users")
def list_all_users(
    current_user: User = Depends(get_admin_user)  # Validates admin role
):
    pass
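The two checks above differ in where they live: ownership is enforced by the service when it loads the row, while the admin check is a plain role test in a dependency. A stdlib sketch with hypothetical names; reporting another user's team as "not found" is a common choice to avoid leaking resource IDs, and is assumed here rather than confirmed for Wegent.

```python
from dataclasses import dataclass
from typing import Dict

class NotFoundException(Exception):
    pass

@dataclass
class TeamRow:
    id: int
    user_id: int
    name: str

class TeamService:
    def __init__(self, rows: Dict[int, TeamRow]) -> None:
        self.rows = rows

    def delete_with_user(self, team_id: int, user_id: int) -> None:
        row = self.rows.get(team_id)
        # Treat "owned by someone else" the same as "missing".
        if row is None or row.user_id != user_id:
            raise NotFoundException(f"Team {team_id} not found")
        del self.rows[team_id]

def require_admin(user: dict) -> dict:
    # Hypothetical `role` field standing in for the real admin flag.
    if user.get("role") != "admin":
        raise PermissionError("admin role required")
    return user
```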

6. CRUD Patterns for Kind Resources

Kubernetes-Style API Design

The Kind API follows Kubernetes conventions:

  • apiVersion: agent.wecode.io/v1
  • kind: Resource type (Team, Bot, Task, etc.)
  • metadata: name, namespace, labels, annotations
  • spec: Resource specification
  • status: Current state
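A Team resource in this shape might look like the dict below. The spec and status contents, the resource name, and the "default" namespace are placeholders, since this section does not list the real fields; `envelope_errors` is a hypothetical check covering only the envelope fields named above.

```python
from typing import Any, Dict, List

API_VERSION = "agent.wecode.io/v1"

team_manifest: Dict[str, Any] = {
    "apiVersion": API_VERSION,
    "kind": "Team",
    "metadata": {"name": "code-review-team", "namespace": "default",
                 "labels": {}, "annotations": {}},
    "spec": {"members": []},           # placeholder, not the real Team spec
    "status": {"state": "Available"},  # placeholder status
}

def envelope_errors(resource: Dict[str, Any]) -> List[str]:
    """Return problems with the Kubernetes-style envelope (empty list if valid)."""
    errors = []
    if resource.get("apiVersion") != API_VERSION:
        errors.append("apiVersion must be " + API_VERSION)
    if not resource.get("kind"):
        errors.append("kind is required")
    metadata = resource.get("metadata") or {}
    for field in ("name", "namespace"):
        if not metadata.get(field):
            errors.append(f"metadata.{field} is required")
    if "spec" not in resource:
        errors.append("spec is required")
    return errors
```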

Resource URL Pattern

GET    /api/v1/namespaces/{namespace}/{kinds}           # List resources
POST   /api/v1/namespaces/{namespace}/{kinds}           # Create resource
GET    /api/v1/namespaces/{namespace}/{kinds}/{name}    # Get resource
PUT    /api/v1/namespaces/{namespace}/{kinds}/{name}    # Update resource
DELETE /api/v1/namespaces/{namespace}/{kinds}/{name}    # Delete resource
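The five routes reduce to a dispatch on the HTTP method plus whether {name} is present. A stdlib sketch of that mapping; the action names and `route` helper are hypothetical.

```python
import re
from typing import NamedTuple, Optional

PATH_RE = re.compile(
    r"^/api/v1/namespaces/(?P<namespace>[^/]+)/(?P<kinds>[^/]+)(?:/(?P<name>[^/]+))?$"
)

# (method, has_name) -> action
ACTIONS = {
    ("GET", False): "list",
    ("POST", False): "create",
    ("GET", True): "get",
    ("PUT", True): "update",
    ("DELETE", True): "delete",
}

class KindRequest(NamedTuple):
    action: str
    namespace: str
    kinds: str
    name: Optional[str]

def route(method: str, path: str) -> KindRequest:
    match = PATH_RE.match(path)
    if match is None:
        raise LookupError(f"not a Kind API path: {path}")
    name = match.group("name")
    action = ACTIONS.get((method.upper(), name is not None))
    if action is None:
        # e.g. POST to a named resource, or DELETE on a collection
        raise LookupError(f"{method} not allowed on {path}")
    return KindRequest(action, match.group("namespace"), match.group("kinds"), name)
```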

Kind Resource Types

| Kind | Database Table | Schema Class | Description |
|---|---|---|---|
| Ghost | kinds | Ghost | System prompts & MCP servers |
| Model | kinds | Model | LLM configuration |
| Shell | kinds | Shell | Execution environment |
| Bot | kinds | Bot | Agent building block |
| Team | kinds | Team | User-facing AI agent |
| Workspace | kinds | Workspace | Git repository |
| Task | tasks | Task | Execution unit |
| Retriever | kinds | Retriever | RAG retriever config |
| Device | kinds | Device | Local device |
| Skill | skill_binaries | Skill | Skill packages |

CRUD Implementation (app/api/endpoints/kind/)

# Common helpers (app/api/endpoints/kind/common.py)
KIND_SCHEMA_MAP = {
    "Ghost": Ghost,
    "Model": Model,
    "Shell": Shell,
    "Bot": Bot,
    "Team": Team,
    # ... etc
}

# Validation
kind = validate_resource_type(kinds)  # "teams" -> "Team"

# Formatting
return format_single_resource(kind, resource)
return format_resource_list(kind, resources)

# Validation & preparation
validated_resource = validate_and_prepare_resource(kind, resource, namespace, name)
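The plural-to-Kind lookup and list formatting can be illustrated with hypothetical reimplementations that reuse the helper names above. The plural table covers only some kinds, and the `<Kind>List` envelope follows Kubernetes convention but is an assumption about what format_resource_list actually returns.

```python
from typing import Any, Dict, List

# Partial, hypothetical plural -> Kind table.
PLURAL_TO_KIND = {
    "ghosts": "Ghost", "models": "Model", "shells": "Shell",
    "bots": "Bot", "teams": "Team", "workspaces": "Workspace", "tasks": "Task",
}

def validate_resource_type(kinds: str) -> str:
    """Map a plural URL segment ("teams") to its Kind name ("Team")."""
    try:
        return PLURAL_TO_KIND[kinds.lower()]
    except KeyError:
        raise ValueError(f"unsupported resource type: {kinds}") from None

def format_resource_list(kind: str, resources: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Kubernetes-style list envelope (assumed shape).
    return {"apiVersion": "agent.wecode.io/v1", "kind": f"{kind}List", "items": resources}
```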

Request Processing Pipeline


7. Adapter Pattern Usage

Purpose

The adapter layer (app/api/endpoints/adapter/) provides backward compatibility between:

  • Legacy API interfaces - Original endpoints that frontend uses
  • New CRD-based storage - Kind resources in database

Adapter Pattern Architecture

Adapter Implementation Example

# app/api/endpoints/adapter/teams.py
@router.get("", response_model=TeamListResponse)
def list_teams(
    page: int = Query(1, ge=1),
    limit: int = Query(10, ge=1, le=100),
    scope: str = Query("all"),
    db: Session = Depends(get_db),
    current_user: User = Depends(security.get_current_user),
):
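    # Convert page-based pagination into an offset for the service call
    skip = (page - 1) * limit
    # Uses adapter service that works with Kind model
    items = team_kinds_service.get_user_teams(
        db=db, user_id=current_user.id, skip=skip, limit=limit, scope=scope
    )
    # `total` is the user's full team count (not just this page);
    # the count query is elided in this excerpt
    return {"total": total, "items": items}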

@router.post("", response_model=TeamInDB, status_code=status.HTTP_201_CREATED)
def create_team(
    team_create: TeamCreate,
    current_user: User = Depends(security.get_current_user),
    db: Session = Depends(get_db),
):
    # Adapter service handles Kind model creation
    return team_kinds_service.create_with_user(
        db=db, obj_in=team_create, user_id=current_user.id
    )

Adapter Service Pattern

# app/services/adapters/team_kinds.py
class TeamKindsService:
    def get_user_teams(self, db, user_id, skip, limit, scope):
        # Query Kind table with kind="Team"
        # Transform to Team schema
        pass
    
    def create_with_user(self, db, obj_in, user_id, group_name=None):
        # Convert TeamCreate to Kind model
        # Set ownership, namespace
        # Save to kinds table
        pass
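The adapter's core job is a two-way translation between the legacy flat schema and a row in the shared kinds table. The sketch shows that round trip with dataclasses and an in-memory store instead of a DB session; the field names (`name`, `description`, `bots`), the `KindRow` columns, and the JSON layout are assumptions, not the actual TeamCreate schema or table definition.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class TeamCreate:          # legacy request schema (fields assumed)
    name: str
    description: str = ""
    bots: List[str] = field(default_factory=list)

@dataclass
class KindRow:             # one row of the kinds table (columns assumed)
    id: int
    user_id: int
    kind: str
    name: str
    namespace: str
    document: str          # serialized CRD resource

class TeamKindsService:
    def __init__(self) -> None:
        self.rows: Dict[int, KindRow] = {}

    def create_with_user(self, obj_in: TeamCreate, user_id: int,
                         namespace: str = "default") -> dict:
        # Legacy schema -> CRD document -> kinds row
        resource = {
            "apiVersion": "agent.wecode.io/v1", "kind": "Team",
            "metadata": {"name": obj_in.name, "namespace": namespace},
            "spec": {"description": obj_in.description, "bots": obj_in.bots},
        }
        row = KindRow(len(self.rows) + 1, user_id, "Team", obj_in.name,
                      namespace, json.dumps(resource))
        self.rows[row.id] = row
        return self._to_legacy(row)

    def get_user_teams(self, user_id: int) -> List[dict]:
        return [self._to_legacy(r) for r in self.rows.values()
                if r.kind == "Team" and r.user_id == user_id]

    @staticmethod
    def _to_legacy(row: KindRow) -> dict:
        # Flatten the CRD document back into the shape the old API returned.
        spec = json.loads(row.document)["spec"]
        return {"id": row.id, "name": row.name, "user_id": row.user_id, **spec}
```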

Benefits of Adapter Pattern

  1. Backward Compatibility - Frontend continues using familiar APIs
  2. Gradual Migration - Can migrate features incrementally
  3. Dual Interface - Both legacy and CRD APIs work simultaneously
  4. Consistency - Same underlying data model for all access patterns

8. Configuration System

Settings (app/core/config.py)

Uses Pydantic Settings with environment variable support:

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Project
    PROJECT_NAME: str = "Task Manager Backend"
    VERSION: str = "1.0.0"
    API_PREFIX: str = "/api"
    
    # Database
    DATABASE_URL: str = "mysql+asyncmy://..."
    DB_AUTO_MIGRATE: bool = True
    
    # Authentication
    SECRET_KEY: str = "secret-key"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 7 * 24 * 60
    
    # External Services
    CHAT_SHELL_MODE: str = "http"  # or "package"
    CHAT_SHELL_URL: str = "http://localhost:8100"
    
    # Scheduler
    SCHEDULER_BACKEND: str = "celery"  # or "apscheduler", "xxljob"
    
    # Feature Flags
    CHAT_MCP_ENABLED: bool = False
    MEMORY_ENABLED: bool = False
    WEB_SEARCH_ENABLED: bool = False

Environment Loading

A custom NoInterpolationDotEnvSettingsSource loads .env files without variable interpolation, so template variables such as ${{user.name}} are kept verbatim instead of being expanded.
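The effect of disabling interpolation is that every value is taken literally. A stdlib sketch of a non-interpolating .env reader; `read_env_literal` is hypothetical and its parsing rules are simplified, not the behavior of pydantic-settings' dotenv source.

```python
from typing import Dict

def read_env_literal(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines without expanding ${...} references."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Strip one pair of matching surrounding quotes; keep everything else verbatim.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        values[key.strip()] = value
    return values
```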


9. Exception Handling

Custom Exception Hierarchy

# app/core/exceptions.py
from typing import Optional

from fastapi import HTTPException

class NotFoundException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=404, detail=detail)

class ConflictException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=409, detail=detail)

class ValidationException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=400, detail=detail)

class CustomHTTPException(HTTPException):
    def __init__(self, status_code: int, detail: str, error_code: Optional[int] = None):
        super().__init__(status_code=status_code, detail=detail)
        self.error_code = error_code

Global Exception Handlers

# Registered in create_app()
app.add_exception_handler(CustomHTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, python_exception_handler)
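A handler for CustomHTTPException essentially turns the exception into a JSON body that carries error_code alongside detail, while the catch-all handler hides internals behind a generic 500. The sketch returns `(status, body)` tuples instead of Starlette responses so it runs standalone; the body keys and the generic message are assumptions.

```python
from typing import Any, Dict, Optional, Tuple

class CustomHTTPException(Exception):
    # Standalone stand-in for the FastAPI-based class above.
    def __init__(self, status_code: int, detail: str, error_code: Optional[int] = None):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail
        self.error_code = error_code

def http_exception_handler(exc: CustomHTTPException) -> Tuple[int, Dict[str, Any]]:
    body: Dict[str, Any] = {"detail": exc.detail}
    if exc.error_code is not None:
        body["error_code"] = exc.error_code
    return exc.status_code, body

def python_exception_handler(exc: Exception) -> Tuple[int, Dict[str, Any]]:
    # Catch-all: never leak the exception text to clients.
    return 500, {"detail": "Internal server error"}
```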

10. Key Design Patterns Summary

| Pattern | Location | Purpose |
|---|---|---|
| Dependency Injection | app/api/dependencies.py | Database sessions, telemetry context |
| Repository Pattern | app/services/readers/ | Data access abstraction |
| Service Layer | app/services/ | Business logic encapsulation |
| Adapter Pattern | app/api/endpoints/adapter/ | Legacy API compatibility |
| CRD/K8s Style | app/api/endpoints/kind/ | Resource management |
| WebSocket Events | app/api/ws/ | Real-time communication |
| Decorator Pattern | app/api/ws/decorators.py | Telemetry, context management |
| Event Bus | app/core/events.py | Decoupled module communication |
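The Event Bus row describes in-process publish/subscribe: publishers do not know which modules react. A minimal asyncio sketch; the `EventBus` API and the "task.created" event name are hypothetical, and app/core/events.py may differ.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]

class EventBus:
    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        self._handlers[event].append(handler)

    async def publish(self, event: str, payload: Dict[str, Any]) -> int:
        """Run every handler for `event` concurrently; return how many ran."""
        handlers = self._handlers.get(event, [])
        await asyncio.gather(*(handler(payload) for handler in handlers))
        return len(handlers)
```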

11. Complex Areas for Deeper Analysis

1. Streaming Architecture

The streaming system has two modes:

  • Legacy: Direct WebSocket emission from streaming handler
  • Bridge: Redis Pub/Sub + WebSocketBridge for multi-worker scenarios

File locations:

  • app/services/chat/streaming_core.py
  • app/services/chat/ws_emitter.py
  • app/services/chat/storage.py
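In multi-worker deployments the worker producing a stream is often not the worker holding the client's socket. Bridge mode decouples the two: producers publish chunks to a channel, and each worker's bridge forwards only the messages for rooms whose sockets it owns. The sketch replaces Redis with an in-process broker so it runs standalone; `InMemoryPubSub`, `WorkerBridge`, the "chat" channel, and the `task:<id>` room names are hypothetical.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Set

class InMemoryPubSub:
    """Stand-in for Redis Pub/Sub: every subscriber queue receives every message."""

    def __init__(self) -> None:
        self.queues: Dict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.queues[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: dict) -> None:
        for queue in self.queues[channel]:
            await queue.put(message)

class WorkerBridge:
    """Per-worker bridge: forwards channel messages to locally connected rooms."""

    def __init__(self, broker: InMemoryPubSub, channel: str, local_rooms: Set[str]) -> None:
        self.queue = broker.subscribe(channel)
        self.local_rooms = local_rooms
        self.delivered: List[dict] = []  # stands in for sio.emit(...) calls

    async def pump(self, count: int) -> None:
        for _ in range(count):
            message = await self.queue.get()
            if message["room"] in self.local_rooms:
                self.delivered.append(message)
```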

2. Task Execution Flow

Tasks can be executed through multiple paths:

  • Chat Shell: Direct LLM API (no Docker)
  • Executor: Docker-based execution (ClaudeCode, Agno)
  • Device: Local device execution

File locations:

  • app/services/chat/trigger.py
  • app/services/chat/operations.py
  • app/services/executor_manager.py
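Choosing among the three paths amounts to a dispatch on the target and shell type. A hypothetical sketch: the shell type strings, the `device_id` field, and giving a selected device priority are assumptions drawn loosely from the list above, not the logic in trigger.py.

```python
from dataclasses import dataclass
from typing import Optional

DOCKER_SHELLS = {"ClaudeCode", "Agno"}

@dataclass
class ExecutionRequest:
    task_id: int
    shell_type: str
    device_id: Optional[str] = None  # set when the user targets a local device

def choose_execution_path(req: ExecutionRequest) -> str:
    if req.device_id is not None:
        return "device"        # presumably via the /local-executor namespace
    if req.shell_type == "Chat":
        return "chat_shell"    # direct LLM API call, no container
    if req.shell_type in DOCKER_SHELLS:
        return "executor"      # Docker-based executor
    raise ValueError(f"unsupported shell type: {req.shell_type}")
```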

3. Skill System

Dynamic skill loading with frontend interactions:

  • Skill binary management
  • Tool provider integration
  • Pending request registry for async skill operations

File locations:

  • app/api/endpoints/internal/skills.py
  • chat_shell/tools/pending_request.py
  • app/services/skills/
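A pending request registry lets server-side skill code ask the frontend for input and await the answer that arrives later in a skill:response event. A common way to build this is a map from request ID to `asyncio.Future`; the class below sketches that idea, and its method names are hypothetical rather than the API of pending_request.py.

```python
import asyncio
import uuid
from typing import Any, Dict, Tuple

class PendingRequestRegistry:
    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    def register(self) -> Tuple[str, asyncio.Future]:
        request_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return request_id, future

    def resolve(self, request_id: str, result: Any) -> bool:
        """Called from the skill:response handler; False if unknown or already done."""
        future = self._pending.pop(request_id, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True

    async def wait(self, request_id: str, future: asyncio.Future, timeout: float) -> Any:
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(request_id, None)  # clean up on success or timeout
```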

4. RAG Pipeline

Multi-stage retrieval-augmented generation:

  • Knowledge base indexing
  • Chunk storage (DB vs vector-only)
  • Source attribution in responses

File locations:

  • app/services/rag/
  • app/api/endpoints/rag.py
  • app/services/chat/rag.py
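Source attribution means each retrieved chunk keeps a pointer to its document, so the answer can cite it. The sketch ranks chunks by naive word overlap, a deliberately simple stand-in for the vector similarity a real retriever would use; the `Chunk` fields and the `[n] doc#chunk-i` citation format are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class Chunk:
    doc_name: str
    index: int
    text: str

def retrieve(query: str, chunks: List[Chunk], top_k: int = 2) -> List[Tuple[Chunk, int]]:
    # Score = number of shared lowercase words (stand-in for embedding similarity).
    words = set(query.lower().split())
    scored = [(c, len(words & set(c.text.lower().split()))) for c in chunks]
    scored = [item for item in scored if item[1] > 0]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:top_k]

def build_context(hits: List[Tuple[Chunk, int]]) -> Tuple[str, List[str]]:
    """Number each chunk so the model can cite [1], [2], ... and return the source list."""
    lines, sources = [], []
    for n, (chunk, _) in enumerate(hits, start=1):
        lines.append(f"[{n}] {chunk.text}")
        sources.append(f"[{n}] {chunk.doc_name}#chunk-{chunk.index}")
    return "\n".join(lines), sources
```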

5. Scheduler System

Pluggable scheduler backends:

  • Celery (default, with embedded mode)
  • APScheduler (lightweight)
  • XXL-JOB (enterprise distributed)

File locations:

  • app/core/scheduler.py
  • app/services/scheduler/
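Pluggable backends usually sit behind a common interface chosen by a setting such as SCHEDULER_BACKEND. The sketch uses a `typing.Protocol` and a registry dict; the `schedule(job_id, cron)` method and the stub classes are hypothetical and do not call Celery, APScheduler, or XXL-JOB.

```python
from typing import Callable, Dict, List, Protocol, Tuple

class SchedulerBackend(Protocol):
    name: str

    def schedule(self, job_id: str, cron: str) -> None: ...

class _StubBackend:
    """Records jobs instead of talking to a real scheduler."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.jobs: List[Tuple[str, str]] = []

    def schedule(self, job_id: str, cron: str) -> None:
        self.jobs.append((job_id, cron))

BACKENDS: Dict[str, Callable[[], SchedulerBackend]] = {
    "celery": lambda: _StubBackend("celery"),
    "apscheduler": lambda: _StubBackend("apscheduler"),
    "xxljob": lambda: _StubBackend("xxljob"),
}

def get_scheduler(setting: str = "celery") -> SchedulerBackend:
    try:
        return BACKENDS[setting.lower()]()
    except KeyError:
        raise ValueError(f"unknown SCHEDULER_BACKEND: {setting}") from None
```

Keeping the selection in one factory means the rest of the code depends only on the interface, so switching backends is a configuration change.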

Summary

The Wegent backend API demonstrates sophisticated FastAPI patterns:

  1. Modular Router Design - Clean separation of concerns with hierarchical routing
  2. Dual API Support - Legacy REST + Kubernetes-style CRD APIs via adapter pattern
  3. Real-time Communication - Socket.IO with namespaced events and automatic context
  4. Flexible Authentication - JWT, OAuth2, OIDC, and API keys with auto-user provisioning
  5. Comprehensive Telemetry - OpenTelemetry integration throughout the request lifecycle
  6. Graceful Lifecycle - Proper startup/shutdown with distributed locking
  7. Service Layer Architecture - Business logic separated from HTTP handling

The adapter pattern is particularly noteworthy for enabling gradual system evolution while maintaining frontend compatibility.