Code Reader
首页
帮助
设计文档
首页
帮助
设计文档
  • Deep Dive: Wegent Skill System & MCP Integration

Deep Dive: Wegent Skill System & MCP Integration

Executive Summary

The Wegent Skill System is a sophisticated on-demand capability loading mechanism that enables AI agents to dynamically acquire specialized knowledge and tools at runtime. Combined with the Model Context Protocol (MCP) integration, it creates a powerful, token-efficient architecture for extending agent capabilities without bloating the context window.

Key Innovation: The system implements "lazy loading" for skills - detailed instructions are only injected into the system prompt when the LLM explicitly requests them via the load_skill() tool call, reducing token consumption by 40-80% compared to static prompt injection.


1. Skill System Architecture

1.1 CRD Definition and Storage

Skill CRD Schema (backend/app/schemas/kind.py)

class SkillSpec(BaseModel):
    """Skill specification"""

    description: str  # Trigger condition description
    displayName: Optional[str] = None  # Friendly display name
    prompt: Optional[str] = None  # Full prompt content from SKILL.md
    version: Optional[str] = None
    author: Optional[str] = None
    tags: Optional[List[str]] = None
    bindShells: Optional[List[str]] = Field(
        None,
        description="List of shell types this skill is compatible with. "
        "Valid values: 'ClaudeCode', 'Agno', 'Dify', 'Chat'.",
    )
    config: Optional[Dict[str, Any]] = Field(
        None,
        description="Skill-level configuration shared by all tools.",
    )
    tools: Optional[List[SkillToolDeclaration]] = Field(
        None,
        description="Tool declarations for skill-tool binding.",
    )
    provider: Optional[SkillProviderConfig] = Field(
        None,
        description="Provider configuration for dynamic loading.",
    )
    mcpServers: Optional[Dict[str, Any]] = Field(
        None,
        description="MCP servers configuration for this skill.",
    )

Database Storage Model (backend/app/models/skill_binary.py)

Skills are stored across two tables for efficient querying and binary storage:

# kinds table - stores CRD metadata (name, namespace, spec as JSON)
# skill_binaries table - stores ZIP package content

class SkillBinary(Base):
    """Skill binary storage for ZIP packages"""

    __tablename__ = "skill_binaries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    kind_id = Column(Integer, ForeignKey("kinds.id", ondelete="CASCADE"), nullable=False)
    binary_data = Column(LargeBinary, nullable=False)  # ZIP content
    file_size = Column(Integer, nullable=False)
    file_hash = Column(String(64), nullable=False)  # SHA256 hash
    created_at = Column(DateTime, default=datetime.utcnow)

Design Rationale: The separation allows fast metadata queries on the kinds table while keeping binary data (which can be large) in a separate table accessed only when needed.


1.2 Skill Loading Flow


1.3 LoadSkillTool Implementation (chat_shell/chat_shell/tools/builtin/load_skill.py)

The LoadSkillTool is the core mechanism enabling on-demand skill loading:

class LoadSkillTool(BaseTool):
    """Tool to load a skill and get its full prompt content.
    
    Session-level caching with persistence:
    - Skills are cached within a single conversation turn
    - First call returns full prompt, subsequent calls return short confirmation
    - Skills remain loaded for up to 5 conversation turns (configurable)
    - Skill state is restored from chat history at the start of each turn
    """

    name: str = "load_skill"
    description: str = (
        "Load a skill's full instructions when you need specialized guidance. "
        "Call this tool when your task matches one of the available skills' descriptions. "
        "Note: Within the same response, if you've already loaded a skill, calling it again "
        "will confirm it's still active without repeating the full instructions."
    )

    # Session-level tracking
    _expanded_skills: Set[str] = PrivateAttr(default_factory=set)
    _loaded_skill_prompts: dict[str, str] = PrivateAttr(default_factory=dict)
    _skill_remaining_turns: dict[str, int] = PrivateAttr(default_factory=dict)

    def _run(self, skill_name: str) -> str:
        """Load skill and return prompt content."""
        if skill_name not in self.skill_names:
            return f"Error: Skill '{skill_name}' is not available."

        # Check if skill was already expanded in this turn
        if skill_name in self._expanded_skills:
            # Reset the remaining turns counter
            self._skill_remaining_turns[skill_name] = self.skill_retention_turns
            return f"Skill '{skill_name}' is already active..."

        # Get skill metadata
        skill_info = self.skill_metadata.get(skill_name, {})
        prompt = skill_info.get("prompt", "")

        if not prompt:
            return f"Error: Skill '{skill_name}' has no prompt content."

        # Mark skill as expanded for this turn
        self._expanded_skills.add(skill_name)
        self._loaded_skill_prompts[skill_name] = prompt
        self._skill_remaining_turns[skill_name] = self.skill_retention_turns

        return f"Skill '{skill_name}' has been loaded. The instructions have been added to the system prompt."

    def get_prompt_modification(self) -> str:
        """Get prompt modification content for system prompt injection.
        
        This method implements the PromptModifierTool protocol, allowing
        LangGraphAgentBuilder to automatically inject loaded skill prompts.
        """
        if not self._loaded_skill_prompts:
            return ""

        parts = []
        for skill_name, prompt in self._loaded_skill_prompts.items():
            parts.append(f"\n\n## Skill: {skill_name}\n\n{prompt}")

        return (
            "\n\n<skill>\n# Loaded Skill Instructions\n\nThe following skills have been loaded. "
            + "".join(parts)
            + "\n</skill>"
        )

Key Design Patterns:

  1. Session-level caching: Prevents redundant prompt injection within the same conversation turn
  2. Turn-based retention: Skills expire after 5 turns (configurable) to prevent context bloat
  3. State restoration: Skill loading state is restored from chat history on page refresh
  4. PromptModifierTool protocol: Enables automatic system prompt injection

1.4 Dynamic Provider Loading System

SkillToolProvider Interface (chat_shell/chat_shell/skills/provider.py)

class SkillToolProvider(ABC):
    """Abstract base class for skill tool providers.
    
    A tool provider is responsible for creating tool instances
    for a specific skill. Each provider is registered with a
    unique provider name and can create one or more tools.
    """

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Unique identifier for this provider."""
        pass

    @property
    @abstractmethod
    def supported_tools(self) -> list[str]:
        """List of tool names this provider can create."""
        pass

    @abstractmethod
    def create_tool(
        self,
        tool_name: str,
        context: SkillToolContext,
        tool_config: Optional[dict[str, Any]] = None,
    ) -> BaseTool:
        """Create a tool instance."""
        pass

SkillToolRegistry (chat_shell/chat_shell/skills/registry.py)

The registry implements the Service Locator pattern with thread-safe singleton access:

class SkillToolRegistry:
    """Central registry for skill tool providers.
    
    Implements Service Locator pattern with thread-safe access.
    """

    _instance: Optional["SkillToolRegistry"] = None
    _instance_lock: threading.Lock = threading.Lock()
    _providers: dict[str, SkillToolProvider]
    _providers_lock: threading.Lock

    @classmethod
    def get_instance(cls) -> "SkillToolRegistry":
        """Get singleton instance with double-checked locking."""
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def load_provider_from_zip(
        self,
        zip_content: bytes,
        provider_config: dict[str, Any],
        skill_name: str,
    ) -> Optional[SkillToolProvider]:
        """Dynamically load a provider from a skill package.
        
        This method extracts all Python modules from the ZIP package
        and dynamically loads them as a package.
        """
        # Create unique package name for this skill
        package_name = f"skill_pkg_{skill_name.replace('-', '_')}"
        
        with zipfile.ZipFile(io.BytesIO(zip_content), "r") as zip_file:
            # Load all Python modules
            for py_mod_name, file_path in python_files.items():
                full_module_name = f"{package_name}.{py_mod_name}"
                module_code = zip_file.read(file_path).decode("utf-8")
                
                # Create module spec and execute
                spec = importlib.util.spec_from_loader(
                    full_module_name,
                    loader=None,
                    origin=f"skill://{skill_name}/{py_mod_name}.py",
                )
                module = importlib.util.module_from_spec(spec)
                exec(module_code, module.__dict__)
                
        # Instantiate provider class
        provider_class = getattr(provider_module, class_name)
        return provider_class()

Security Boundary: Only public skills (user_id=0) can load dynamic code. User-uploaded skills are restricted to prompt content only, preventing arbitrary code execution from untrusted sources.


2. MCP (Model Context Protocol) Integration

2.1 MCP Architecture Overview

MCP is an open protocol standardizing how AI agents connect to external data sources and tools. Wegent supports three transport types:

TransportUse CaseConnection Mode
stdioLocal CLI toolsSubprocess with stdin/stdout
sseRemote HTTP serversServer-Sent Events stream
streamable-httpHTTP-based MCPHTTP streaming

2.2 MCP Client Implementation (chat_shell/chat_shell/tools/mcp/client.py)

class MCPClient:
    """MCP client with async context manager support.
    
    Wraps langchain-mcp-adapters MultiServerMCPClient with:
    - Connection timeout protection (30s default)
    - Tool wrapping with timeout and exception handling
    - Graceful degradation (failed tools return errors, don't crash)
    """

    def __init__(
        self,
        config: dict[str, dict[str, Any]],
        task_data: dict[str, Any] | None = None
    ):
        self.config = config
        self.task_data = task_data
        self.connections = build_connections(config, task_data)
        self._client: MultiServerMCPClient | None = None
        self._tools: list[BaseTool] = []

    async def connect(self) -> None:
        """Connect to all configured MCP servers.
        
        Fault-tolerant: If some servers fail, tools from successful
        connections will still be available.
        """
        self._client = MultiServerMCPClient(connections=self.connections)

        # Load tools from each server individually for graceful failure
        for server_name in self.connections.keys():
            try:
                tools = await self._client.get_tools(server_name=server_name)
                successful_servers.append(server_name)
                raw_tools.extend(tools)
            except Exception as e:
                failed_servers.append(server_name)
                logger.warning(f"Failed to load tools from server '{server_name}': {e}")

        # Wrap all tools with protection mechanisms
        self._tools = [wrap_tool_with_protection(tool) for tool in raw_tools]

2.3 Tool Protection Mechanisms

def wrap_tool_with_protection(
    tool: BaseTool, timeout: float = DEFAULT_TOOL_TIMEOUT
) -> BaseTool:
    """Wrap an MCP tool with timeout and exception protection.
    
    Ensures:
    - Tool execution has a timeout limit
    - Exceptions don't crash the chat service
    - Failed tools return error messages instead of raising exceptions
    """
    original_run = tool._run if hasattr(tool, "_run") else None
    original_arun = tool._arun if hasattr(tool, "_arun") else None

    def protected_run(*args, **kwargs):
        """Synchronous tool execution with protection."""
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(original_run, *args, **kwargs)
                try:
                    return future.result(timeout=timeout)
                except concurrent.futures.TimeoutError:
                    return f"MCP tool '{tool.name}' timed out after {timeout}s"
        except Exception as e:
            logger.exception(f"MCP tool '{tool.name}' failed: {e}")
            return f"MCP tool '{tool.name}' failed: {e!s}"

    async def protected_arun(*args, **kwargs):
        """Asynchronous tool execution with timeout."""
        try:
            result = await asyncio.wait_for(
                original_arun(*args, **kwargs), timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            return f"MCP tool '{tool.name}' timed out after {timeout}s"
        except Exception as e:
            return f"MCP tool '{tool.name}' failed: {e!s}"

    tool._run = protected_run
    tool._arun = protected_arun
    return tool

2.4 MCP Configuration Hierarchy

Configuration Precedence:

  1. Skill-level MCP servers (highest priority)
  2. Bot/Ghost MCP servers
  3. Backend global MCP servers (lowest priority)

2.5 MCP Tool Call Sequence


3. Frontend Skill Interaction (PendingRequestRegistry)

3.1 Overview

The PendingRequestRegistry enables skills that require frontend rendering/processing to communicate asynchronously with the frontend UI:

3.2 PendingRequestRegistry Implementation (chat_shell/chat_shell/tools/pending_requests.py)

@dataclass
class PendingRequest:
    """Represents a pending skill request waiting for frontend response."""

    request_id: str
    skill_name: str
    action: str
    payload: Dict[str, Any]
    future: asyncio.Future
    created_at: datetime = field(default_factory=datetime.utcnow)
    timeout_seconds: float = 30.0


class PendingRequestRegistry:
    """Registry for managing pending skill requests.
    
    Uses local asyncio.Future for in-process waiting.
    """

    def __init__(self):
        self._local_requests: Dict[str, PendingRequest] = {}
        self._lock = asyncio.Lock()

    async def register(
        self,
        request_id: str,
        skill_name: str,
        action: str,
        payload: Dict[str, Any],
        timeout_seconds: float = 30.0,
    ) -> asyncio.Future:
        """Register a new pending request and return a future to await."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()

        request = PendingRequest(
            request_id=request_id,
            skill_name=skill_name,
            action=action,
            payload=payload,
            future=future,
            timeout_seconds=timeout_seconds,
        )

        async with self._lock:
            self._local_requests[request_id] = request

        return future

    async def resolve(
        self,
        request_id: str,
        result: Any,
        error: Optional[str] = None,
    ) -> bool:
        """Resolve a pending request with result or error."""
        async with self._lock:
            request = self._local_requests.pop(request_id, None)

        if not request or request.future.done():
            return False

        response = {
            "success": error is None,
            "result": result,
            "error": error,
        }

        request.future.set_result(response)
        return True

3.3 Backend WebSocket Handler (backend/app/api/ws/chat_namespace.py)

async def on_skill_response(self, sid: str, data: dict) -> dict:
    """Handle generic skill response from frontend.
    
    Uses Redis-backed PendingRequestRegistry for cross-worker support.
    """
    from chat_shell.tools import get_pending_request_registry

    request_id = data.get("request_id")
    skill_name = data.get("skill_name")
    action = data.get("action")
    success = data.get("success", False)
    result = data.get("result")
    error = data.get("error")

    # Get registry (async to ensure Pub/Sub listener is started)
    registry = await get_pending_request_registry()

    # Build complete result object
    complete_result = {
        "success": success,
        "result": result,
        "error": error,
    }

    resolved = await registry.resolve(
        request_id=request_id,
        result=complete_result,
        error=None,
    )

    if not resolved:
        return {"error": "No pending request found"}

    return {"success": True}

4. Token Efficiency Design

4.1 On-Demand vs Static Loading Comparison

ApproachInitial TokensPer-Turn TokensUse Case
Static InjectionAll skill prompts (500-2000 tokens)SameSimple agents with few skills
On-Demand (load_skill)Summaries only (50-100 tokens)+ Prompt when neededComplex agents with many skills
Preload SkillsSelected prompts + summariesSameFrequently used skills

4.2 Token Savings Calculation

# Example: Agent with 10 skills, each with 200-token prompt

# Static injection
static_tokens = 10 * 200  # 2000 tokens always in context

# On-demand loading (using 2 skills per turn average)
demand_tokens_initial = 10 * 20  # 200 tokens for summaries
demand_tokens_per_turn = 2 * 200  # 400 tokens when loaded

# After 10 turns:
# Static: 2000 * 10 = 20000 tokens
# On-demand: 200 + (400 * 10) = 4200 tokens
# Savings: 79%

4.3 Skill Retention Strategy

# Default 5-turn retention with automatic expiration
class LoadSkillTool:
    DEFAULT_SKILL_RETENTION_TURNS = 5
    
    def restore_from_history(self, history: list[dict]) -> None:
        """Restore skill loading state from chat history.
        
        Counts conversation turns backwards and only restores
        skills within the retention window.
        """
        skill_load_turns: dict[str, int] = {}  # skill_name -> turns_ago
        
        # Count user-assistant pairs from the end
        current_turn = 0
        i = len(history) - 1
        
        while i >= 0:
            msg = history[i]
            if msg.get("role") == "assistant":
                # Check for load_skill tool calls
                loaded_skills = self._extract_loaded_skills_from_content(
                    msg.get("content", "")
                )
                for skill_name in loaded_skills:
                    if skill_name not in skill_load_turns:
                        skill_load_turns[skill_name] = current_turn
                
                i -= 1
                # Complete turn when user message precedes assistant
                if i >= 0 and history[i].get("role") == "user":
                    current_turn += 1
                    i -= 1
        
        # Restore skills still within retention window
        for skill_name, turns_ago in skill_load_turns.items():
            remaining_turns = self.skill_retention_turns - turns_ago
            if remaining_turns > 0:
                self._restore_skill(skill_name, remaining_turns)

5. Skill Package Structure

5.1 ZIP Package Layout

skill-package.zip
├── skill-name/                    # Folder matching skill name
│   ├── SKILL.md                   # Required: Metadata + prompt
│   ├── provider.py                # Optional: Tool provider
│   ├── tool1.py                   # Optional: Tool implementations
│   └── tool2.py                   # Optional: Additional tools

5.2 SKILL.md Format

---
description: "Generate and render Mermaid diagrams"
displayName: "Mermaid Diagram"
version: "1.0.0"
author: "Wegent Team"
tags: ["visualization", "diagram"]
bindShells: ["ClaudeCode", "Chat"]
provider:
  module: provider
  class: MermaidToolProvider
tools:
  - name: render_mermaid
    provider: mermaid
    config:
      timeout: 30
mcpServers:
  weather:
    type: sse
    url: https://api.weather.com/mcp
    headers:
      X-API-Key: ${{secrets.weather_api_key}}
---

# Mermaid Diagram Skill

You can use this skill to generate diagrams using Mermaid syntax.

## Usage

1. Write diagram code in Mermaid format
2. Call render_mermaid tool
3. The diagram will be rendered and displayed

## Examples

### Flowchart
```mermaid
graph TD
    A[Start] --> B{Decision}
    B -->|Yes| C[Action 1]
    B -->|No| D[Action 2]

---

## 6. Security Architecture

### 6.1 Code Execution Boundaries

```mermaid
flowchart TD
    subgraph UserSkills["User Skills (user_id > 0)"]
        A1[SKILL.md] --> A2[Prompt Content Only]
        A3[No provider.py] --> A4[Static Instructions]
    end
    
    subgraph PublicSkills["Public Skills (user_id = 0)"]
        B1[SKILL.md] --> B2[Prompt Content]
        B3[provider.py] --> B4[Dynamic Code Loading]
        B4 --> B5[SkillToolRegistry]
        B5 --> B6[Custom Tools]
    end
    
    style UserSkills fill:#ffcccc
    style PublicSkills fill:#ccffcc

6.2 Security Check Implementation

def ensure_provider_loaded(
    self,
    skill_name: str,
    provider_config: Optional[dict],
    zip_content: Optional[bytes],
    is_public: bool = False,
) -> bool:
    """Ensure a skill's provider is loaded and registered.
    
    SECURITY: Only public skills (user_id=0) are allowed to load code.
    """
    if not provider_config:
        return True

    class_name = provider_config.get("class")
    if not class_name:
        return True

    # SECURITY CHECK: Only allow code loading for public skills
    if not is_public:
        logger.warning(
            f"SECURITY: Blocked code loading for non-public "
            f"skill '{skill_name}'. Only public skills can load code."
        )
        return False

7. Integration Points

7.1 Ghost CRD Integration

class GhostSpec(BaseModel):
    """Ghost specification"""

    systemPrompt: str
    mcpServers: Optional[Dict[str, Any]] = None  # Global MCP for this Ghost
    skills: Optional[List[str]] = None  # Available skill names
    preload_skills: Optional[List[str]] = Field(
        None,
        description="List of skill names to preload into system prompt. "
        "Must be a subset of skills.",
    )

7.2 ChatConfigBuilder Integration

# backend/app/services/chat/config/chat_config.py
class ChatConfigBuilder:
    def _build_skill_configs(self, ghost_crd: Ghost) -> dict:
        """Build skill configurations for chat session."""
        skill_names = ghost_crd.spec.skills or []
        preload_skills = ghost_crd.spec.preload_skills or []
        
        skill_configs = {}
        for skill_name in skill_names:
            skill_crd = self._get_skill(skill_name)
            skill_data = {
                "description": skill_crd.spec.description,
                "displayName": skill_crd.spec.displayName,
                "prompt": skill_crd.spec.prompt,
                "tools": skill_crd.spec.tools,
                "provider": skill_crd.spec.provider,
            }
            
            # Include mcpServers if present
            if skill_crd.spec.mcpServers:
                skill_data["mcpServers"] = skill_crd.spec.mcpServers
            
            skill_configs[skill_name] = skill_data
        
        return skill_configs

8. Trade-offs and Design Decisions

8.1 On-Demand Loading Trade-offs

AdvantageDisadvantage
40-80% token savingsAdditional latency on first skill use
Cleaner initial promptsMore complex implementation
Better scalability (more skills)Requires LLM to make loading decision
Reduced context window pressureSkill state management complexity

8.2 Provider Loading Security Trade-offs

AdvantageDisadvantage
Prevents code injection from usersLimits user skill functionality
Maintains system integrityRequires admin approval for rich skills
Clear security boundaryPublic skill review burden

8.3 MCP vs Built-in Tools

AspectMCP ToolsBuilt-in Tools
DeploymentExternal processIn-process
IsolationHigh (subprocess/network)Lower (shared memory)
Startup TimeSlower (connection setup)Faster (pre-loaded)
Failure ModeGraceful degradationCan crash agent
FlexibilityProtocol-standardizedCustom implementation

9. Areas for Improvement and Extension

9.1 Potential Enhancements

  1. Skill Versioning: Add semantic versioning with automatic update notifications
  2. Skill Dependencies: Allow skills to depend on other skills
  3. Skill Marketplace: Public skill discovery and rating system
  4. Skill Testing Framework: Built-in testing tools for skill developers
  5. Skill Analytics: Usage metrics to identify popular and unused skills

9.2 MCP Improvements

  1. MCP Server Health Checks: Periodic connectivity validation
  2. MCP Connection Pooling: Reuse connections across sessions
  3. MCP Load Balancing: Support for multiple MCP server instances
  4. Custom MCP Transports: WebSocket, gRPC support

9.3 Performance Optimizations

  1. Skill Binary Caching: Cache ZIP packages in memory/redis
  2. Provider Hot-Reloading: Update providers without restart
  3. Parallel Skill Loading: Load multiple skills simultaneously
  4. Predictive Skill Loading: Pre-load likely-to-use skills

9.4 Frontend Integration

  1. Skill Status UI: Visual indicator of loaded skills
  2. Skill Suggestions: AI-powered skill recommendations
  3. Skill Documentation: Inline help for available skills
  4. Skill Composer: Visual tool for creating skills

Summary

The Wegent Skill System represents a thoughtful approach to solving the context window limitation problem in AI agents. Its key innovations include:

  1. Lazy Loading Architecture: Skills are only loaded when explicitly requested by the LLM, dramatically reducing token consumption
  2. Security-First Design: Clear boundaries between user-uploaded content (safe, prompt-only) and public skills (rich functionality with code)
  3. Flexible Provider System: Dynamic tool creation through a well-defined provider interface
  4. MCP Integration: Industry-standard protocol support for external tool integration
  5. State Management: Sophisticated turn-based retention and history restoration

The system successfully balances flexibility, security, and efficiency, making it suitable for production deployments with diverse agent capabilities. The clear separation of concerns and well-defined interfaces make it extensible for future requirements.

Unique Design Elements:

  • Session-level caching with turn-based expiration
  • Double security boundary (public/private skills × code/prompt separation)
  • Graceful degradation for MCP failures
  • Async frontend interaction through PendingRequestRegistry
  • Variable substitution in MCP configurations for dynamic values

This architecture could serve as a reference implementation for other AI agent platforms seeking to implement on-demand capability loading.