Wegent Backend API Architecture Analysis
Overview
The Wegent backend follows a Kubernetes-style CRD (Custom Resource Definition) architecture implemented with FastAPI. It exposes a RESTful API, Socket.IO-based WebSocket support for real-time communication, authentication and authorization, and an adapter layer that keeps legacy endpoints working on top of CRD-style resource storage.
1. FastAPI Application Structure
Entry Point (app/main.py)
The application uses a lifespan context manager for startup/shutdown lifecycle management:
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # STARTUP: Initialize database, YAML configs, background jobs, schedulers, WebSocket, event bus
    yield
    # SHUTDOWN: Graceful cleanup with timeout, cancel active streams
Startup Sequence:
- Redis lock acquisition for distributed initialization
- Database migrations (Alembic)
- YAML data initialization
- Background jobs startup
- Scheduler backend initialization (Celery/APScheduler/XXL-JOB)
- Socket.IO WebSocket initialization
- Event bus and handler registration
- PendingRequestRegistry initialization
- Device heartbeat monitor startup
- IM Channel Manager initialization
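The ordering above can be sketched with the standard library alone. This is a minimal illustration, not the project's code: the step names are hypothetical labels for the list items, the extra `log` parameter exists only so the order can be observed (a real FastAPI lifespan takes just `app`), and tearing down in reverse order is an assumption about what "graceful cleanup" means here.

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical labels mirroring part of the startup list; the real
# initializer functions are not shown in this document.
STARTUP_STEPS = [
    "acquire_redis_lock",
    "run_migrations",
    "load_yaml_data",
    "start_background_jobs",
    "init_scheduler",
    "init_socketio",
    "register_event_handlers",
]


@asynccontextmanager
async def lifespan(app, log):
    """Record startup steps in order, then undo them in reverse on shutdown."""
    started = []
    try:
        for step in STARTUP_STEPS:
            log.append(f"start:{step}")
            started.append(step)
        yield
    finally:
        # Assumed policy: stop later components before the ones they depend on.
        for step in reversed(started):
            log.append(f"stop:{step}")


async def demo():
    log = []
    async with lifespan(app=None, log=log):
        log.append("serving")
    return log


events = asyncio.run(demo())
```

Putting the teardown in `finally` means it also runs if the application body raises, which is the usual reason for preferring a lifespan context manager over separate startup and shutdown hooks.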
Combined ASGI App: Socket.IO + FastAPI are mounted together:
app = create_socketio_asgi_app() # Combines Socket.IO server with FastAPI
2. API Route Hierarchy
Router Structure
/api (API_PREFIX)
├── /health # Health check (no prefix)
├── /auth # Authentication (JWT, OAuth2, OIDC)
│ ├── /oauth2 # Swagger-style form login
│ ├── /login # JSON login
│ └── /oidc # OIDC authentication
├── /users # User management
│ └── /me/pet # User pet features
├── /admin # Admin operations
├── /groups # Group management
├── /projects # Project management
├── /api-keys # API key management
├── /devices # Local device management
├── /bots # Bot CRUD operations
├── /models # Model management
├── /shells # Shell environment management
├── /agents # Public shell operations
├── /teams # Team management (Agent in UI)
├── /subscriptions # Subscription management
├── /tasks # Task management
├── /subtasks # Subtask operations
├── /task-members # Task member management
├── /task-knowledge-bases # Task knowledge base links
├── /chat # Chat operations
├── /attachments # File attachments
├── /git # Git repository operations
├── /executors # Executor management
├── /quota # Quota management
├── /dify # Dify proxy endpoints
├── /retrievers # Retriever management
├── /wiki # Wiki documentation
├── /wizard # AI wizard features
├── /v1/responses # OpenAI-compatible responses API
├── /knowledge-bases # Knowledge base management
├── /knowledge-documents # Document management
├── /tables # Data table operations
├── /rag # RAG operations
├── /utils # Utility endpoints
├── /web-scraper # Web scraping
├── /v1 # Kubernetes-style CRD API
│ ├── /namespaces/{namespace}/{kinds} # List resources
│ ├── /namespaces/{namespace}/{kinds}/{name} # Get/Update/Delete
│ ├── /kinds/skills # Skill management
│ └── /batch # Batch operations
└── /internal # Internal service APIs
    ├── /chat # Internal chat storage
    ├── /rag # Internal RAG operations
    ├── /skills # Skill binaries
    ├── /tables # Internal table operations
    ├── /bots # Internal bot operations
    ├── /services # Service extensions
    └── /subscriptions # Subscription operations
Route Registration Flow
3. Dependency Injection System
Core Dependencies (app/api/dependencies.py)
# Database session dependency
def get_db() -> Generator[Session, None, None]:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# OpenTelemetry context dependencies
def with_task_telemetry(task_id: int = Path(...)) -> int:
    """Sets task context for distributed tracing"""

def with_task_subtask_telemetry(
    task_id: int = Path(...),
    subtask_id: Optional[int] = None
) -> tuple:
    """Sets both task and subtask context"""
Usage Pattern
@router.get("/{task_id}")
def get_task(
    task_id: int = Depends(with_task_telemetry),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    # Dependencies are injected automatically
    pass
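To make the generator-style dependency concrete, the sketch below imitates what a framework does with `get_db`: run the generator up to `yield`, hand the value to the handler, then resume it so the `finally` block closes the session. `FakeSession`, `call_with_db`, and the `sessions` list are stand-ins invented for this example; FastAPI's actual resolution logic is more involved.

```python
from contextlib import contextmanager
from typing import Generator, List


class FakeSession:
    """Stand-in for a SQLAlchemy Session; it only tracks whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


sessions: List[FakeSession] = []  # kept only so the example can inspect sessions


def get_db() -> Generator[FakeSession, None, None]:
    db = FakeSession()
    sessions.append(db)
    try:
        yield db
    finally:
        db.close()


def call_with_db(handler):
    """Hypothetical helper: resolve the dependency, call the handler, clean up."""
    with contextmanager(get_db)() as db:
        return handler(db)


def failing_handler(db):
    raise ValueError("handler failed")


result = call_with_db(lambda db: "ok")
try:
    call_with_db(failing_handler)
except ValueError:
    pass  # the session is still closed by the finally block in get_db
```

Both sessions end up closed, including the one whose handler raised, which is the property the `try`/`finally` in `get_db` is there to guarantee.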
4. WebSocket Architecture
Socket.IO Namespaces
| Namespace | Purpose | Events |
|---|---|---|
| /chat | Real-time chat communication | chat:send, chat:cancel, chat:retry, task:join, task:leave |
| /local-executor | Local device connections | task:cancel, task:close-session |
Event Flow Architecture
Event Definitions (app/api/ws/events.py)
Client Events (Frontend → Backend):
- chat:send - Send message
- chat:cancel - Cancel streaming
- chat:retry - Retry failed message
- chat:resume - Resume streaming
- task:join - Join task room
- task:leave - Leave task room
- history:sync - Sync message history
- skill:response - Skill response from frontend
Server Events (Backend → Frontend):
- chat:start - AI response started
- chat:chunk - Content chunk (streaming)
- chat:done - Response complete
- chat:error - Error occurred
- chat:cancelled - Cancelled notification
- chat:message - User message broadcast
- task:created - New task notification
- task:deleted - Task deleted
- task:status - Status update
- auth:error - Authentication error
WebSocket Decorators
# Automatic context management
@auto_task_context(TaskJoinPayload)
async def on_task_join(self, sid: str, data: dict) -> dict:
    # Payload is automatically validated and typed
    pass

# Telemetry tracing
@trace_websocket_event(exclude_events={"connect"}, extract_event_data=True)
async def trigger_event(self, event: str, sid: str, *args):
    # Automatic OpenTelemetry span creation
    pass
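A decorator of this kind can be approximated as follows. This is a sketch under assumptions, not the project's `auto_task_context`: the decorator is named `auto_payload` to make that clear, the `task_id` field on `TaskJoinPayload` is assumed, and a dataclass is used where the real code may well use a Pydantic model (so only missing fields are rejected here, not wrong types).

```python
import asyncio
import functools
from dataclasses import dataclass, fields


@dataclass
class TaskJoinPayload:
    task_id: int  # assumed field; the real payload schema is not shown


def auto_payload(payload_cls):
    """Hypothetical decorator: build payload_cls from the raw dict before the handler runs."""
    allowed = {f.name for f in fields(payload_cls)}

    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(self, sid: str, data: dict) -> dict:
            try:
                # Unknown keys are dropped; missing required keys raise TypeError.
                payload = payload_cls(**{k: v for k, v in data.items() if k in allowed})
            except TypeError as exc:
                return {"error": f"invalid payload: {exc}"}
            return await handler(self, sid, payload)

        return wrapper

    return decorator


class ChatNamespace:
    @auto_payload(TaskJoinPayload)
    async def on_task_join(self, sid: str, payload: TaskJoinPayload) -> dict:
        return {"joined": payload.task_id, "sid": sid}


ns = ChatNamespace()
ok = asyncio.run(ns.on_task_join("sid-1", {"task_id": 42, "extra": "ignored"}))
bad = asyncio.run(ns.on_task_join("sid-1", {}))
```

The handler body only ever sees a typed payload object, so validation errors are turned into an error response in one place instead of being handled in every event handler.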
5. Authentication & Authorization
Authentication Flow
Authentication Methods
JWT Token (Bearer) - Standard web authentication
- Token expiration: 7 days (configurable)
- Algorithm: HS256
- Payload:
{sub: username, user_id: id, exp: timestamp}
OAuth2 Password Flow - Swagger UI compatibility
- Form-based login at /api/auth/oauth2
OIDC - Enterprise SSO
- Discovery URL-based configuration
- Callback handling at /api/auth/oidc/callback
API Keys - Service-to-service & programmatic access
- Personal keys: Direct user authentication
- Service keys: Impersonate users via the wegent-username header
- Auto-user creation for service keys
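For the JWT method, the token format (HS256, with `sub`, `user_id`, and `exp` claims) can be illustrated using only the standard library. This is a teaching sketch: the function names echo those in app/core/security.py, but the signatures here take an explicit `secret` and a lifetime in seconds so the example is self-contained, and production code would normally rely on a maintained JWT library rather than hand-rolled signing.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def create_access_token(data: dict, secret: str, expires_in: int) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {**data, "exp": int(time.time()) + expires_in}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(_b64url(expected).encode(), signature.encode()):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


token = create_access_token({"sub": "alice", "user_id": 1}, "secret-key", 7 * 24 * 3600)
claims = verify_token(token, "secret-key")
```

The signature is checked before the payload is parsed, so a tampered token is rejected without its contents ever being trusted.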
Security Functions (app/core/security.py)
# Core authentication
def authenticate_user(db, username, password) -> Union[User, None]
def create_access_token(data: Dict[str, Any], expires_delta: Optional[int]) -> str
def verify_token(token: str) -> Dict[str, Any]
# Dependency injection
def get_current_user(token: str = Depends(oauth2_scheme)) -> User
def get_admin_user(current_user: User = Depends(get_current_user)) -> User
def get_auth_context(...) -> AuthContext # Flexible API key auth
def get_current_user_flexible(...) -> User
Authorization Patterns
# Role-based access control
@router.delete("/{team_id}")
def delete_team(
    team_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Check ownership within service layer
    team_kinds_service.delete_with_user(db, team_id, current_user.id)

# Admin-only endpoints
@router.get("/admin/users")
def list_all_users(
    current_user: User = Depends(get_admin_user)  # Validates admin role
):
    pass
6. CRUD Patterns for Kind Resources
Kubernetes-Style API Design
The Kind API follows Kubernetes conventions:
- apiVersion: agent.wecode.io/v1
- kind: Resource type (Team, Bot, Task, etc.)
- metadata: name, namespace, labels, annotations
- spec: Resource specification
- status: Current state
Resource URL Pattern
GET /api/v1/namespaces/{namespace}/{kinds} # List resources
POST /api/v1/namespaces/{namespace}/{kinds} # Create resource
GET /api/v1/namespaces/{namespace}/{kinds}/{name} # Get resource
PUT /api/v1/namespaces/{namespace}/{kinds}/{name} # Update resource
DELETE /api/v1/namespaces/{namespace}/{kinds}/{name} # Delete resource
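As a rough illustration of how the envelope fields and the URL pattern fit together, the helpers below build a resource document and the matching path. `make_resource` and `resource_path` are hypothetical names introduced for this example, and the `description` key inside `spec` is made up; only the top-level field names and the URL layout come from the sections above.

```python
from typing import Optional

API_VERSION = "agent.wecode.io/v1"


def make_resource(kind: str, name: str, namespace: str = "default",
                  spec: Optional[dict] = None) -> dict:
    """Build a Kubernetes-style envelope with the five top-level fields."""
    return {
        "apiVersion": API_VERSION,
        "kind": kind,
        "metadata": {"name": name, "namespace": namespace, "labels": {}, "annotations": {}},
        "spec": spec or {},
        "status": {},
    }


def resource_path(namespace: str, kinds: str, name: Optional[str] = None) -> str:
    """Collection path when name is omitted, item path otherwise."""
    base = f"/api/v1/namespaces/{namespace}/{kinds}"
    return f"{base}/{name}" if name else base


# The spec content is a placeholder, not the real Team schema.
team = make_resource("Team", "support-team", spec={"description": "demo"})
list_url = resource_path("default", "teams")
item_url = resource_path("default", "teams", team["metadata"]["name"])
```

Under this layout a client would POST `team` to `list_url` and then GET, PUT, or DELETE it at `item_url`.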
Kind Resource Types
| Kind | Database Table | Schema Class | Description |
|---|---|---|---|
| Ghost | kinds | Ghost | System prompts & MCP servers |
| Model | kinds | Model | LLM configuration |
| Shell | kinds | Shell | Execution environment |
| Bot | kinds | Bot | Agent building block |
| Team | kinds | Team | User-facing AI agent |
| Workspace | kinds | Workspace | Git repository |
| Task | tasks | Task | Execution unit |
| Retriever | kinds | Retriever | RAG retriever config |
| Device | kinds | Device | Local device |
| Skill | skill_binaries | Skill | Skill packages |
CRUD Implementation (app/api/endpoints/kind/)
# Common helpers (app/api/endpoints/kind/common.py)
KIND_SCHEMA_MAP = {
    "Ghost": Ghost,
    "Model": Model,
    "Shell": Shell,
    "Bot": Bot,
    "Team": Team,
    # ... etc
}
# Validation
kind = validate_resource_type(kinds) # "teams" -> "Team"
# Formatting
return format_single_resource(kind, resource)
return format_resource_list(kind, resources)
# Validation & preparation
validated_resource = validate_and_prepare_resource(kind, resource, namespace, name)
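One plausible way to implement the plural-to-kind lookup (`"teams" -> "Team"`) is a precomputed dictionary. The sketch assumes every plural is simply the lowercase kind name plus `s`, which holds for the kinds in the table above but may not match the real helper; `UnknownKindError` is a name invented here, and the real function presumably raises an HTTP error instead.

```python
# Kind names taken from the resource type table.
KIND_NAMES = [
    "Ghost", "Model", "Shell", "Bot", "Team",
    "Workspace", "Task", "Retriever", "Device", "Skill",
]

# Assumption: the URL segment is the lowercase kind name with a trailing "s".
PLURAL_TO_KIND = {name.lower() + "s": name for name in KIND_NAMES}


class UnknownKindError(ValueError):
    """Hypothetical error type for an unsupported resource segment."""


def validate_resource_type(kinds: str) -> str:
    """Map a plural URL segment such as 'teams' to its kind name."""
    try:
        return PLURAL_TO_KIND[kinds.lower()]
    except KeyError:
        supported = ", ".join(sorted(PLURAL_TO_KIND))
        raise UnknownKindError(
            f"unsupported resource type {kinds!r}; expected one of: {supported}"
        ) from None


kind = validate_resource_type("teams")
```

Rejecting unknown segments at this point keeps the later formatting and validation helpers free of per-kind branching.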
Request Processing Pipeline
7. Adapter Pattern Usage
Purpose
The adapter layer (app/api/endpoints/adapter/) provides backward compatibility between:
- Legacy API interfaces - Original endpoints that frontend uses
- New CRD-based storage - Kind resources in database
Adapter Pattern Architecture
Adapter Implementation Example
# app/api/endpoints/adapter/teams.py
@router.get("", response_model=TeamListResponse)
def list_teams(
    page: int = Query(1, ge=1),
    limit: int = Query(10, ge=1, le=100),
    scope: str = Query("all"),
    db: Session = Depends(get_db),
    current_user: User = Depends(security.get_current_user),
):
    # Convert the 1-based page number into a row offset
    skip = (page - 1) * limit
    # Uses adapter service that works with Kind model
    items = team_kinds_service.get_user_teams(
        db=db, user_id=current_user.id, skip=skip, limit=limit, scope=scope
    )
    # NOTE: the computation of `total` is not shown in this excerpt
    return {"total": total, "items": items}

@router.post("", response_model=TeamInDB, status_code=status.HTTP_201_CREATED)
def create_team(
    team_create: TeamCreate,
    current_user: User = Depends(security.get_current_user),
    db: Session = Depends(get_db),
):
    # Adapter service handles Kind model creation
    return team_kinds_service.create_with_user(
        db=db, obj_in=team_create, user_id=current_user.id
    )
Adapter Service Pattern
# app/services/adapters/team_kinds.py
class TeamKindsService:
    def get_user_teams(self, db, user_id, skip, limit, scope):
        # Query Kind table with kind="Team"
        # Transform to Team schema
        pass

    def create_with_user(self, db, obj_in, user_id, group_name=None):
        # Convert TeamCreate to Kind model
        # Set ownership, namespace
        # Save to kinds table
        pass
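The two-way translation the adapter service performs can be sketched with an in-memory stand-in. Everything here is illustrative: `KindRow` guesses at the columns of the kinds table, `InMemoryTeamAdapter` replaces the database with a list, and the legacy response shape (`id`, `name`, `description`) is assumed rather than taken from the real `TeamInDB` schema.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class KindRow:
    """Stand-in for a row in the kinds table (column names are assumed)."""
    id: int
    user_id: int
    kind: str
    name: str
    namespace: str
    json: dict


@dataclass
class InMemoryTeamAdapter:
    rows: List[KindRow] = field(default_factory=list)

    def create_with_user(self, name: str, description: str, user_id: int) -> dict:
        # Legacy input -> CRD-style document stored as a Kind row.
        document = {
            "apiVersion": "agent.wecode.io/v1",
            "kind": "Team",
            "metadata": {"name": name, "namespace": "default"},
            "spec": {"description": description},
        }
        row = KindRow(len(self.rows) + 1, user_id, "Team", name, "default", document)
        self.rows.append(row)
        return self._to_legacy(row)

    def get_user_teams(self, user_id: int, skip: int = 0, limit: int = 10) -> List[dict]:
        owned = [r for r in self.rows if r.kind == "Team" and r.user_id == user_id]
        return [self._to_legacy(r) for r in owned[skip:skip + limit]]

    @staticmethod
    def _to_legacy(row: KindRow) -> dict:
        # CRD-style document -> flat shape the legacy endpoints return.
        return {"id": row.id, "name": row.name,
                "description": row.json["spec"]["description"]}


adapter = InMemoryTeamAdapter()
adapter.create_with_user("alpha", "first team", user_id=1)
adapter.create_with_user("beta", "second team", user_id=1)
adapter.create_with_user("gamma", "other user's team", user_id=2)
page = adapter.get_user_teams(user_id=1, skip=1, limit=10)
```

Because both directions go through the same stored document, the legacy endpoints and the CRD endpoints read and write one source of truth.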
Benefits of Adapter Pattern
- Backward Compatibility - Frontend continues using familiar APIs
- Gradual Migration - Can migrate features incrementally
- Dual Interface - Both legacy and CRD APIs work simultaneously
- Consistency - Same underlying data model for all access patterns
8. Configuration System
Settings (app/core/config.py)
Uses Pydantic Settings with environment variable support:
class Settings(BaseSettings):
    # Project
    PROJECT_NAME: str = "Task Manager Backend"
    VERSION: str = "1.0.0"
    API_PREFIX: str = "/api"

    # Database
    DATABASE_URL: str = "mysql+asyncmy://..."
    DB_AUTO_MIGRATE: bool = True

    # Authentication
    SECRET_KEY: str = "secret-key"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 7 * 24 * 60

    # External Services
    CHAT_SHELL_MODE: str = "http"  # or "package"
    CHAT_SHELL_URL: str = "http://localhost:8100"

    # Scheduler
    SCHEDULER_BACKEND: str = "celery"  # or "apscheduler", "xxljob"

    # Feature Flags
    CHAT_MCP_ENABLED: bool = False
    MEMORY_ENABLED: bool = False
    WEB_SEARCH_ENABLED: bool = False
Environment Loading
Custom NoInterpolationDotEnvSettingsSource preserves template variables like ${{user.name}} in .env files.
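The effect of loading without interpolation can be shown with a tiny parser that treats values as opaque strings. This is not how `NoInterpolationDotEnvSettingsSource` is implemented (its code is not shown here); `parse_env_lines` and the `GREETING` variable are invented for the example, and the parser handles only simple `KEY=value` lines.

```python
def parse_env_lines(text: str) -> dict:
    """Parse KEY=value lines literally, with no ${...} expansion."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        value = value.strip()
        # Strip one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key.strip()] = value
    return values


sample = 'GREETING=Hello ${{user.name}}\n# a comment\nAPI_PREFIX="/api"\n'
parsed = parse_env_lines(sample)
```

The template placeholder survives intact, so it can be substituted later at request time rather than being consumed (or mangled) when the settings are loaded.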
9. Exception Handling
Custom Exception Hierarchy
# app/core/exceptions.py
from typing import Optional
from fastapi import HTTPException

class NotFoundException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=404, detail=detail)

class ConflictException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=409, detail=detail)

class ValidationException(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=400, detail=detail)

class CustomHTTPException(HTTPException):
    def __init__(self, status_code: int, detail: str, error_code: Optional[int] = None):
        super().__init__(status_code=status_code, detail=detail)
        self.error_code = error_code
Global Exception Handlers
# Registered in create_app()
app.add_exception_handler(CustomHTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, python_exception_handler)
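The division of labour between these handlers can be sketched without FastAPI: known HTTP-style errors keep their status code and detail, while anything else collapses to a generic 500. `AppHTTPError` is a stand-in for `HTTPException`, `to_response` is a hypothetical function, and the response body shape is an assumption; the real handlers may format errors differently.

```python
from typing import Optional, Tuple


class AppHTTPError(Exception):
    """Stand-in for fastapi.HTTPException so the sketch needs no dependencies."""

    def __init__(self, status_code: int, detail: str, error_code: Optional[int] = None):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail
        self.error_code = error_code


class NotFoundError(AppHTTPError):
    def __init__(self, detail: str):
        super().__init__(status_code=404, detail=detail)


def to_response(exc: Exception) -> Tuple[int, dict]:
    """Translate an exception into (status code, JSON body)."""
    if isinstance(exc, AppHTTPError):
        body = {"detail": exc.detail}
        if exc.error_code is not None:
            body["error_code"] = exc.error_code
        return exc.status_code, body
    # Unexpected errors: hide internals from the client.
    return 500, {"detail": "Internal server error"}


not_found = to_response(NotFoundError("team not found"))
custom = to_response(AppHTTPError(422, "quota exceeded", error_code=1001))
unexpected = to_response(RuntimeError("database connection lost"))
```

Keeping the catch-all branch generic avoids leaking stack traces or connection details in responses, while the original exception can still be logged server-side.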
10. Key Design Patterns Summary
| Pattern | Location | Purpose |
|---|---|---|
| Dependency Injection | app/api/dependencies.py | Database sessions, telemetry context |
| Repository Pattern | app/services/readers/ | Data access abstraction |
| Service Layer | app/services/ | Business logic encapsulation |
| Adapter Pattern | app/api/endpoints/adapter/ | Legacy API compatibility |
| CRD/K8s Style | app/api/endpoints/kind/ | Resource management |
| WebSocket Events | app/api/ws/ | Real-time communication |
| Decorator Pattern | app/api/ws/decorators.py | Telemetry, context management |
| Event Bus | app/core/events.py | Decoupled module communication |
11. Complex Areas for Deeper Analysis
1. Streaming Architecture
The streaming system has two modes:
- Legacy: Direct WebSocket emission from streaming handler
- Bridge: Redis Pub/Sub + WebSocketBridge for multi-worker scenarios
File locations:
- app/services/chat/streaming_core.py
- app/services/chat/ws_emitter.py
- app/services/chat/storage.py
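The Bridge mode can be pictured as a producer that publishes chunks to a channel and a forwarder that relays them to the socket. The sketch below replaces Redis Pub/Sub with an in-process `InMemoryPubSub` and the WebSocket with a plain callback; the class and function names, the `task:1` channel name, and the `None` end-of-stream sentinel are all assumptions made for illustration.

```python
import asyncio
from collections import defaultdict


class InMemoryPubSub:
    """Stand-in for Redis Pub/Sub: every subscriber gets its own queue."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, message) -> None:
        for queue in self._subscribers[channel]:
            await queue.put(message)


async def bridge(queue: asyncio.Queue, emit) -> None:
    """Forward messages from a subscription to a WebSocket-style emit callback."""
    while True:
        message = await queue.get()
        if message is None:  # assumed sentinel marking the end of the stream
            break
        await emit(message)


async def demo():
    bus = InMemoryPubSub()
    received = []

    async def emit(message):
        received.append(message)

    queue = bus.subscribe("task:1")
    forwarder = asyncio.create_task(bridge(queue, emit))
    # The producer could live in a different worker when a real broker is used.
    for chunk in ("Hel", "lo", None):
        await bus.publish("task:1", chunk)
    await forwarder
    return received


chunks = asyncio.run(demo())
```

The producer never touches the socket directly, which is what lets the worker generating the stream differ from the worker holding the client connection.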
2. Task Execution Flow
Tasks can be executed through multiple paths:
- Chat Shell: Direct LLM API (no Docker)
- Executor: Docker-based execution (ClaudeCode, Agno)
- Device: Local device execution
File locations:
- app/services/chat/trigger.py
- app/services/chat/operations.py
- app/services/executor_manager.py
3. Skill System
Dynamic skill loading with frontend interactions:
- Skill binary management
- Tool provider integration
- Pending request registry for async skill operations
File locations:
- app/api/endpoints/internal/skills.py
- chat_shell/tools/pending_request.py
- app/services/skills/
4. RAG Pipeline
Multi-stage retrieval-augmented generation:
- Knowledge base indexing
- Chunk storage (DB vs vector-only)
- Source attribution in responses
File locations:
- app/services/rag/
- app/api/endpoints/rag.py
- app/services/chat/rag.py
5. Scheduler System
Pluggable scheduler backends:
- Celery (default, with embedded mode)
- APScheduler (lightweight)
- XXL-JOB (enterprise distributed)
File locations:
- app/core/scheduler.py
- app/services/scheduler/
Summary
The Wegent backend API demonstrates sophisticated FastAPI patterns:
- Modular Router Design - Clean separation of concerns with hierarchical routing
- Dual API Support - Legacy REST + Kubernetes-style CRD APIs via adapter pattern
- Real-time Communication - Socket.IO with namespaced events and automatic context
- Flexible Authentication - JWT, OAuth2, OIDC, and API keys with auto-user provisioning
- Comprehensive Telemetry - OpenTelemetry integration throughout the request lifecycle
- Graceful Lifecycle - Proper startup/shutdown with distributed locking
- Service Layer Architecture - Business logic separated from HTTP handling
The adapter pattern is particularly noteworthy for enabling gradual system evolution while maintaining frontend compatibility.