Deep Dive: Wegent Skill System & MCP Integration
Executive Summary
The Wegent Skill System is a sophisticated on-demand capability loading mechanism that enables AI agents to dynamically acquire specialized knowledge and tools at runtime. Combined with the Model Context Protocol (MCP) integration, it creates a powerful, token-efficient architecture for extending agent capabilities without bloating the context window.
Key Innovation: The system implements "lazy loading" for skills: detailed instructions are injected into the system prompt only when the LLM explicitly requests them via the load_skill() tool call, reducing token consumption by 40-80% compared with static prompt injection.
1. Skill System Architecture
1.1 CRD Definition and Storage
Skill CRD Schema (backend/app/schemas/kind.py)
```python
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class SkillSpec(BaseModel):
    """Skill specification"""

    description: str  # Trigger condition description
    displayName: Optional[str] = None  # Friendly display name
    prompt: Optional[str] = None  # Full prompt content from SKILL.md
    version: Optional[str] = None
    author: Optional[str] = None
    tags: Optional[List[str]] = None
    bindShells: Optional[List[str]] = Field(
        None,
        description="List of shell types this skill is compatible with. "
        "Valid values: 'ClaudeCode', 'Agno', 'Dify', 'Chat'.",
    )
    config: Optional[Dict[str, Any]] = Field(
        None,
        description="Skill-level configuration shared by all tools.",
    )
    tools: Optional[List[SkillToolDeclaration]] = Field(
        None,
        description="Tool declarations for skill-tool binding.",
    )
    provider: Optional[SkillProviderConfig] = Field(
        None,
        description="Provider configuration for dynamic loading.",
    )
    mcpServers: Optional[Dict[str, Any]] = Field(
        None,
        description="MCP servers configuration for this skill.",
    )
```
Database Storage Model (backend/app/models/skill_binary.py)
Skills are stored across two tables for efficient querying and binary storage:
```python
from datetime import datetime

from sqlalchemy import (
    Column, DateTime, ForeignKey, Integer, LargeBinary, String,
)


# kinds table - stores CRD metadata (name, namespace, spec as JSON)
# skill_binaries table - stores ZIP package content
class SkillBinary(Base):
    """Skill binary storage for ZIP packages"""

    __tablename__ = "skill_binaries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    kind_id = Column(Integer, ForeignKey("kinds.id", ondelete="CASCADE"), nullable=False)
    binary_data = Column(LargeBinary, nullable=False)  # ZIP content
    file_size = Column(Integer, nullable=False)
    file_hash = Column(String(64), nullable=False)  # SHA256 hash
    created_at = Column(DateTime, default=datetime.utcnow)
```
Design Rationale: The separation allows fast metadata queries on the kinds table while keeping binary data (which can be large) in a separate table accessed only when needed.
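The file_hash column also enables an integrity check before a fetched ZIP package is trusted. A minimal sketch of that verification step (the helper name is illustrative, not from the codebase):

```python
import hashlib


def verify_skill_binary(binary_data: bytes, expected_hash: str) -> bool:
    """Recompute the SHA-256 digest of a stored ZIP package and compare
    it against the file_hash column before using the bytes."""
    return hashlib.sha256(binary_data).hexdigest() == expected_hash


# What the upload path would store alongside the blob
payload = b"PK\x03\x04 fake zip bytes"
stored_hash = hashlib.sha256(payload).hexdigest()
```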
1.2 Skill Loading Flow
1.3 LoadSkillTool Implementation (chat_shell/chat_shell/tools/builtin/load_skill.py)
The LoadSkillTool is the core mechanism enabling on-demand skill loading:
```python
class LoadSkillTool(BaseTool):
    """Tool to load a skill and get its full prompt content.

    Session-level caching with persistence:
    - Skills are cached within a single conversation turn
    - First call returns full prompt, subsequent calls return short confirmation
    - Skills remain loaded for up to 5 conversation turns (configurable)
    - Skill state is restored from chat history at the start of each turn
    """

    name: str = "load_skill"
    description: str = (
        "Load a skill's full instructions when you need specialized guidance. "
        "Call this tool when your task matches one of the available skills' descriptions. "
        "Note: Within the same response, if you've already loaded a skill, calling it again "
        "will confirm it's still active without repeating the full instructions."
    )

    # Session-level tracking
    _expanded_skills: Set[str] = PrivateAttr(default_factory=set)
    _loaded_skill_prompts: dict[str, str] = PrivateAttr(default_factory=dict)
    _skill_remaining_turns: dict[str, int] = PrivateAttr(default_factory=dict)

    def _run(self, skill_name: str) -> str:
        """Load skill and return prompt content."""
        if skill_name not in self.skill_names:
            return f"Error: Skill '{skill_name}' is not available."

        # Check if skill was already expanded in this turn
        if skill_name in self._expanded_skills:
            # Reset the remaining turns counter
            self._skill_remaining_turns[skill_name] = self.skill_retention_turns
            return f"Skill '{skill_name}' is already active..."

        # Get skill metadata
        skill_info = self.skill_metadata.get(skill_name, {})
        prompt = skill_info.get("prompt", "")
        if not prompt:
            return f"Error: Skill '{skill_name}' has no prompt content."

        # Mark skill as expanded for this turn
        self._expanded_skills.add(skill_name)
        self._loaded_skill_prompts[skill_name] = prompt
        self._skill_remaining_turns[skill_name] = self.skill_retention_turns
        return f"Skill '{skill_name}' has been loaded. The instructions have been added to the system prompt."

    def get_prompt_modification(self) -> str:
        """Get prompt modification content for system prompt injection.

        This method implements the PromptModifierTool protocol, allowing
        LangGraphAgentBuilder to automatically inject loaded skill prompts.
        """
        if not self._loaded_skill_prompts:
            return ""
        parts = []
        for skill_name, prompt in self._loaded_skill_prompts.items():
            parts.append(f"\n\n## Skill: {skill_name}\n\n{prompt}")
        return (
            "\n\n<skill>\n# Loaded Skill Instructions\n\nThe following skills have been loaded. "
            + "".join(parts)
            + "\n</skill>"
        )
```
Key Design Patterns:
- Session-level caching: Prevents redundant prompt injection within the same conversation turn
- Turn-based retention: Skills expire after 5 turns (configurable) to prevent context bloat
- State restoration: Skill loading state is restored from chat history on page refresh
- PromptModifierTool protocol: Enables automatic system prompt injection
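The caching and retention behavior described above can be modeled with a stripped-down stand-in (this is not the production class; names and messages are simplified for illustration):

```python
class SkillCache:
    """Simplified model of LoadSkillTool's session-level caching:
    the first load activates a skill, repeat loads only refresh its
    turn counter, and skills expire after the retention window."""

    def __init__(self, prompts: dict[str, str], retention_turns: int = 5):
        self.prompts = prompts
        self.retention_turns = retention_turns
        self.remaining: dict[str, int] = {}  # skill -> turns left

    def load(self, name: str) -> str:
        if name not in self.prompts:
            return f"Error: Skill '{name}' is not available."
        if name in self.remaining:
            self.remaining[name] = self.retention_turns  # refresh counter
            return f"Skill '{name}' is already active."
        self.remaining[name] = self.retention_turns
        return f"Skill '{name}' has been loaded."

    def end_turn(self) -> None:
        """Decrement counters and expire skills past the window."""
        self.remaining = {n: t - 1 for n, t in self.remaining.items() if t > 1}


cache = SkillCache({"mermaid": "Full Mermaid instructions..."})
first = cache.load("mermaid")   # full activation
second = cache.load("mermaid")  # cached: short confirmation only
```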
1.4 Dynamic Provider Loading System
SkillToolProvider Interface (chat_shell/chat_shell/skills/provider.py)
```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class SkillToolProvider(ABC):
    """Abstract base class for skill tool providers.

    A tool provider is responsible for creating tool instances
    for a specific skill. Each provider is registered with a
    unique provider name and can create one or more tools.
    """

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Unique identifier for this provider."""
        pass

    @property
    @abstractmethod
    def supported_tools(self) -> list[str]:
        """List of tool names this provider can create."""
        pass

    @abstractmethod
    def create_tool(
        self,
        tool_name: str,
        context: SkillToolContext,
        tool_config: Optional[dict[str, Any]] = None,
    ) -> BaseTool:
        """Create a tool instance."""
        pass
```
SkillToolRegistry (chat_shell/chat_shell/skills/registry.py)
The registry implements the Service Locator pattern with thread-safe singleton access:
```python
import importlib.util
import io
import threading
import zipfile


class SkillToolRegistry:
    """Central registry for skill tool providers.

    Implements Service Locator pattern with thread-safe access.
    """

    _instance: Optional["SkillToolRegistry"] = None
    _instance_lock: threading.Lock = threading.Lock()
    _providers: dict[str, SkillToolProvider]
    _providers_lock: threading.Lock

    @classmethod
    def get_instance(cls) -> "SkillToolRegistry":
        """Get singleton instance with double-checked locking."""
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def load_provider_from_zip(
        self,
        zip_content: bytes,
        provider_config: dict[str, Any],
        skill_name: str,
    ) -> Optional[SkillToolProvider]:
        """Dynamically load a provider from a skill package.

        This method extracts all Python modules from the ZIP package
        and dynamically loads them as a package.

        (Excerpt: the discovery of python_files in the archive and of
        provider_module / class_name from provider_config is omitted.)
        """
        # Create unique package name for this skill
        package_name = f"skill_pkg_{skill_name.replace('-', '_')}"

        with zipfile.ZipFile(io.BytesIO(zip_content), "r") as zip_file:
            # Load all Python modules
            for py_mod_name, file_path in python_files.items():
                full_module_name = f"{package_name}.{py_mod_name}"
                module_code = zip_file.read(file_path).decode("utf-8")

                # Create module spec and execute
                spec = importlib.util.spec_from_loader(
                    full_module_name,
                    loader=None,
                    origin=f"skill://{skill_name}/{py_mod_name}.py",
                )
                module = importlib.util.module_from_spec(spec)
                exec(module_code, module.__dict__)

        # Instantiate provider class
        provider_class = getattr(provider_module, class_name)
        return provider_class()
```
Security Boundary: Only public skills (user_id=0) can load dynamic code. User-uploaded skills are restricted to prompt content only, preventing arbitrary code execution from untrusted sources.
2. MCP (Model Context Protocol) Integration
2.1 MCP Architecture Overview
MCP is an open protocol standardizing how AI agents connect to external data sources and tools. Wegent supports three transport types:
| Transport | Use Case | Connection Mode |
|---|---|---|
| stdio | Local CLI tools | Subprocess with stdin/stdout |
| sse | Remote HTTP servers | Server-Sent Events stream |
| streamable-http | HTTP-based MCP | HTTP streaming |
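A hypothetical mcpServers configuration covering all three transports might look like this (server names, URLs, and the filesystem-server package are illustrative; the exact key names follow the SKILL.md example later in this document):

```python
# Illustrative MCP server configuration, one entry per transport type.
mcp_servers = {
    "local-files": {  # stdio: spawn a local CLI tool as a subprocess
        "type": "stdio",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/data"],
    },
    "weather": {  # sse: remote Server-Sent Events endpoint
        "type": "sse",
        "url": "https://example.com/mcp/sse",
    },
    "search": {  # streamable-http: HTTP streaming endpoint
        "type": "streamable-http",
        "url": "https://example.com/mcp",
    },
}
```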
2.2 MCP Client Implementation (chat_shell/chat_shell/tools/mcp/client.py)
```python
class MCPClient:
    """MCP client with async context manager support.

    Wraps langchain-mcp-adapters MultiServerMCPClient with:
    - Connection timeout protection (30s default)
    - Tool wrapping with timeout and exception handling
    - Graceful degradation (failed tools return errors, don't crash)
    """

    def __init__(
        self,
        config: dict[str, dict[str, Any]],
        task_data: dict[str, Any] | None = None,
    ):
        self.config = config
        self.task_data = task_data
        self.connections = build_connections(config, task_data)
        self._client: MultiServerMCPClient | None = None
        self._tools: list[BaseTool] = []

    async def connect(self) -> None:
        """Connect to all configured MCP servers.

        Fault-tolerant: If some servers fail, tools from successful
        connections will still be available.
        """
        self._client = MultiServerMCPClient(connections=self.connections)
        raw_tools: list[BaseTool] = []
        successful_servers: list[str] = []
        failed_servers: list[str] = []

        # Load tools from each server individually for graceful failure
        for server_name in self.connections.keys():
            try:
                tools = await self._client.get_tools(server_name=server_name)
                successful_servers.append(server_name)
                raw_tools.extend(tools)
            except Exception as e:
                failed_servers.append(server_name)
                logger.warning(f"Failed to load tools from server '{server_name}': {e}")

        # Wrap all tools with protection mechanisms
        self._tools = [wrap_tool_with_protection(tool) for tool in raw_tools]
```
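The graceful-degradation loop can be modeled independently of langchain-mcp-adapters. In this sketch, fetch is a stand-in for the client's get_tools call:

```python
import asyncio


async def load_tools_fault_tolerant(servers: dict, fetch) -> tuple[list, list]:
    """Simplified version of MCPClient.connect's per-server loop:
    collect tools from servers that respond, record the ones that fail."""
    tools, failed = [], []
    for name in servers:
        try:
            tools.extend(await fetch(name))
        except Exception:
            failed.append(name)  # keep going; other servers still usable
    return tools, failed


async def demo():
    async def fetch(name):
        if name == "broken":
            raise ConnectionError("unreachable")
        return [f"{name}.tool"]
    return await load_tools_fault_tolerant({"ok": {}, "broken": {}}, fetch)


tools, failed = asyncio.run(demo())
```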
2.3 Tool Protection Mechanisms
```python
import asyncio
import concurrent.futures


def wrap_tool_with_protection(
    tool: BaseTool, timeout: float = DEFAULT_TOOL_TIMEOUT
) -> BaseTool:
    """Wrap an MCP tool with timeout and exception protection.

    Ensures:
    - Tool execution has a timeout limit
    - Exceptions don't crash the chat service
    - Failed tools return error messages instead of raising exceptions
    """
    original_run = tool._run if hasattr(tool, "_run") else None
    original_arun = tool._arun if hasattr(tool, "_arun") else None

    def protected_run(*args, **kwargs):
        """Synchronous tool execution with protection."""
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(original_run, *args, **kwargs)
                try:
                    return future.result(timeout=timeout)
                except concurrent.futures.TimeoutError:
                    return f"MCP tool '{tool.name}' timed out after {timeout}s"
        except Exception as e:
            logger.exception(f"MCP tool '{tool.name}' failed: {e}")
            return f"MCP tool '{tool.name}' failed: {e!s}"

    async def protected_arun(*args, **kwargs):
        """Asynchronous tool execution with timeout."""
        try:
            result = await asyncio.wait_for(
                original_arun(*args, **kwargs), timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            return f"MCP tool '{tool.name}' timed out after {timeout}s"
        except Exception as e:
            return f"MCP tool '{tool.name}' failed: {e!s}"

    tool._run = protected_run
    tool._arun = protected_arun
    return tool
```
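The async protection path can be demonstrated in isolation. This sketch factors the wrapper into a standalone function (make_protected is illustrative, not a chat_shell name) and shows a slow tool being cut off by the timeout:

```python
import asyncio


def make_protected(arun, timeout: float, tool_name: str):
    """Standalone version of the async protection wrapper: timeouts and
    exceptions become error strings instead of propagating."""
    async def protected(*args, **kwargs):
        try:
            return await asyncio.wait_for(arun(*args, **kwargs), timeout=timeout)
        except asyncio.TimeoutError:
            return f"MCP tool '{tool_name}' timed out after {timeout}s"
        except Exception as e:
            return f"MCP tool '{tool_name}' failed: {e!s}"
    return protected


async def slow_tool():
    await asyncio.sleep(1.0)  # simulates a hung MCP server
    return "done"


result = asyncio.run(make_protected(slow_tool, 0.05, "slow")())
```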
2.4 MCP Configuration Hierarchy
Configuration Precedence:
- Skill-level MCP servers (highest priority)
- Bot/Ghost MCP servers
- Backend global MCP servers (lowest priority)
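The precedence rules above amount to a layered dictionary merge, later (higher-priority) layers overwriting earlier ones. A sketch of that merge; the real implementation may differ in details such as per-server deep merging:

```python
def merge_mcp_servers(global_cfg: dict, ghost_cfg: dict, skill_cfg: dict) -> dict:
    """Merge MCP server configs so skill-level entries override
    ghost/bot-level ones, which in turn override backend globals."""
    merged = dict(global_cfg)   # lowest priority
    merged.update(ghost_cfg)    # bot/ghost level
    merged.update(skill_cfg)    # highest priority
    return merged


merged = merge_mcp_servers(
    {"a": {"url": "global"}, "b": {"url": "global"}},
    {"b": {"url": "ghost"}},
    {"a": {"url": "skill"}},
)
```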
2.5 MCP Tool Call Sequence
3. Frontend Skill Interaction (PendingRequestRegistry)
3.1 Overview
The PendingRequestRegistry enables skills that require frontend rendering or processing to communicate asynchronously with the frontend UI.
3.2 PendingRequestRegistry Implementation (chat_shell/chat_shell/tools/pending_requests.py)
```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class PendingRequest:
    """Represents a pending skill request waiting for frontend response."""

    request_id: str
    skill_name: str
    action: str
    payload: Dict[str, Any]
    future: asyncio.Future
    created_at: datetime = field(default_factory=datetime.utcnow)
    timeout_seconds: float = 30.0


class PendingRequestRegistry:
    """Registry for managing pending skill requests.

    Uses local asyncio.Future for in-process waiting.
    """

    def __init__(self):
        self._local_requests: Dict[str, PendingRequest] = {}
        self._lock = asyncio.Lock()

    async def register(
        self,
        request_id: str,
        skill_name: str,
        action: str,
        payload: Dict[str, Any],
        timeout_seconds: float = 30.0,
    ) -> asyncio.Future:
        """Register a new pending request and return a future to await."""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        request = PendingRequest(
            request_id=request_id,
            skill_name=skill_name,
            action=action,
            payload=payload,
            future=future,
            timeout_seconds=timeout_seconds,
        )
        async with self._lock:
            self._local_requests[request_id] = request
        return future

    async def resolve(
        self,
        request_id: str,
        result: Any,
        error: Optional[str] = None,
    ) -> bool:
        """Resolve a pending request with result or error."""
        async with self._lock:
            request = self._local_requests.pop(request_id, None)
            if not request or request.future.done():
                return False
            response = {
                "success": error is None,
                "result": result,
                "error": error,
            }
            request.future.set_result(response)
            return True
```
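The register/resolve round trip can be shown end to end with a minimal re-implementation (MiniRegistry is a teaching stand-in, not the production class; in production the frontend resolves the request over WebSocket):

```python
import asyncio
from typing import Any, Dict, Optional


class MiniRegistry:
    """Minimal model of PendingRequestRegistry's register/resolve cycle."""

    def __init__(self) -> None:
        self._requests: Dict[str, asyncio.Future] = {}
        self._lock = asyncio.Lock()

    async def register(self, request_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._requests[request_id] = future
        return future

    async def resolve(self, request_id: str, result: Any,
                      error: Optional[str] = None) -> bool:
        async with self._lock:
            future = self._requests.pop(request_id, None)
            if future is None or future.done():
                return False
            future.set_result(
                {"success": error is None, "result": result, "error": error}
            )
            return True


async def demo():
    reg = MiniRegistry()
    fut = await reg.register("req-1")
    # The frontend would normally resolve this via a WebSocket event.
    assert await reg.resolve("req-1", {"svg": "<svg/>"})
    return await fut


response = asyncio.run(demo())
```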
3.3 Backend WebSocket Handler (backend/app/api/ws/chat_namespace.py)
```python
async def on_skill_response(self, sid: str, data: dict) -> dict:
    """Handle generic skill response from frontend.

    Uses Redis-backed PendingRequestRegistry for cross-worker support.
    """
    from chat_shell.tools import get_pending_request_registry

    request_id = data.get("request_id")
    skill_name = data.get("skill_name")
    action = data.get("action")
    success = data.get("success", False)
    result = data.get("result")
    error = data.get("error")

    # Get registry (async to ensure Pub/Sub listener is started)
    registry = await get_pending_request_registry()

    # Build complete result object
    complete_result = {
        "success": success,
        "result": result,
        "error": error,
    }

    resolved = await registry.resolve(
        request_id=request_id,
        result=complete_result,
        error=None,
    )
    if not resolved:
        return {"error": "No pending request found"}
    return {"success": True}
```
4. Token Efficiency Design
4.1 On-Demand vs Static Loading Comparison
| Approach | Initial Tokens | Per-Turn Tokens | Use Case |
|---|---|---|---|
| Static Injection | All skill prompts (500-2000 tokens) | Same | Simple agents with few skills |
| On-Demand (load_skill) | Summaries only (50-100 tokens) | + Prompt when needed | Complex agents with many skills |
| Preload Skills | Selected prompts + summaries | Same | Frequently used skills |
4.2 Token Savings Calculation
```python
# Example: Agent with 10 skills, each with a 200-token prompt

# Static injection
static_tokens = 10 * 200          # 2000 tokens always in context

# On-demand loading (averaging 2 skills loaded per turn)
demand_tokens_initial = 10 * 20   # 200 tokens for summaries
demand_tokens_per_turn = 2 * 200  # 400 tokens when loaded

# After 10 turns:
# Static:    2000 * 10 = 20000 tokens
# On-demand: 200 + (400 * 10) = 4200 tokens
# Savings:   79%
```
4.3 Skill Retention Strategy
```python
# Default 5-turn retention with automatic expiration
class LoadSkillTool:
    DEFAULT_SKILL_RETENTION_TURNS = 5

    def restore_from_history(self, history: list[dict]) -> None:
        """Restore skill loading state from chat history.

        Counts conversation turns backwards and only restores
        skills within the retention window.
        """
        skill_load_turns: dict[str, int] = {}  # skill_name -> turns_ago

        # Count user-assistant pairs from the end
        current_turn = 0
        i = len(history) - 1
        while i >= 0:
            msg = history[i]
            if msg.get("role") == "assistant":
                # Check for load_skill tool calls
                loaded_skills = self._extract_loaded_skills_from_content(
                    msg.get("content", "")
                )
                for skill_name in loaded_skills:
                    if skill_name not in skill_load_turns:
                        skill_load_turns[skill_name] = current_turn
            i -= 1
            # Complete turn when user message precedes assistant
            if i >= 0 and history[i].get("role") == "user":
                current_turn += 1
                i -= 1

        # Restore skills still within retention window
        for skill_name, turns_ago in skill_load_turns.items():
            remaining_turns = self.skill_retention_turns - turns_ago
            if remaining_turns > 0:
                self._restore_skill(skill_name, remaining_turns)
```
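The backwards turn-counting can be isolated into a small standalone function. This sketch assumes, purely for illustration, that assistant messages record loads as literal "load_skill(<name>)" tokens in their content; the real extraction logic differs:

```python
def turns_ago_of_skill_loads(history: list[dict]) -> dict[str, int]:
    """For each skill loaded in the history, report how many user turns
    ago its most recent load happened (0 = current turn).
    Simplified model of restore_from_history's counting."""
    turns: dict[str, int] = {}
    current = 0
    for msg in reversed(history):
        if msg.get("role") == "assistant":
            for token in msg.get("content", "").split():
                if token.startswith("load_skill(") and token.endswith(")"):
                    name = token[len("load_skill("):-1]
                    turns.setdefault(name, current)  # keep most recent load
        elif msg.get("role") == "user":
            current += 1  # a user message closes one conversation turn
    return turns


history = [
    {"role": "user", "content": "draw a diagram"},
    {"role": "assistant", "content": "load_skill(mermaid) ok"},
    {"role": "user", "content": "now refine it"},
    {"role": "assistant", "content": "done"},
]
```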
5. Skill Package Structure
5.1 ZIP Package Layout
```
skill-package.zip
├── skill-name/          # Folder matching skill name
│   ├── SKILL.md         # Required: Metadata + prompt
│   ├── provider.py      # Optional: Tool provider
│   ├── tool1.py         # Optional: Tool implementations
│   └── tool2.py         # Optional: Additional tools
```
5.2 SKILL.md Format
````markdown
---
description: "Generate and render Mermaid diagrams"
displayName: "Mermaid Diagram"
version: "1.0.0"
author: "Wegent Team"
tags: ["visualization", "diagram"]
bindShells: ["ClaudeCode", "Chat"]
provider:
  module: provider
  class: MermaidToolProvider
tools:
  - name: render_mermaid
    provider: mermaid
    config:
      timeout: 30
mcpServers:
  weather:
    type: sse
    url: https://api.weather.com/mcp
    headers:
      X-API-Key: ${{secrets.weather_api_key}}
---

# Mermaid Diagram Skill

You can use this skill to generate diagrams using Mermaid syntax.

## Usage

1. Write diagram code in Mermaid format
2. Call render_mermaid tool
3. The diagram will be rendered and displayed

## Examples

### Flowchart

```mermaid
graph TD
    A[Start] --> B{Decision}
    B -->|Yes| C[Action 1]
    B -->|No| D[Action 2]
```
````
6. Security Architecture
6.1 Code Execution Boundaries
```mermaid
flowchart TD
subgraph UserSkills["User Skills (user_id > 0)"]
A1[SKILL.md] --> A2[Prompt Content Only]
A3[No provider.py] --> A4[Static Instructions]
end
subgraph PublicSkills["Public Skills (user_id = 0)"]
B1[SKILL.md] --> B2[Prompt Content]
B3[provider.py] --> B4[Dynamic Code Loading]
B4 --> B5[SkillToolRegistry]
B5 --> B6[Custom Tools]
end
style UserSkills fill:#ffcccc
style PublicSkills fill:#ccffcc
```
6.2 Security Check Implementation
```python
def ensure_provider_loaded(
    self,
    skill_name: str,
    provider_config: Optional[dict],
    zip_content: Optional[bytes],
    is_public: bool = False,
) -> bool:
    """Ensure a skill's provider is loaded and registered.

    SECURITY: Only public skills (user_id=0) are allowed to load code.
    """
    if not provider_config:
        return True

    class_name = provider_config.get("class")
    if not class_name:
        return True

    # SECURITY CHECK: Only allow code loading for public skills
    if not is_public:
        logger.warning(
            f"SECURITY: Blocked code loading for non-public "
            f"skill '{skill_name}'. Only public skills can load code."
        )
        return False
    # ... (excerpt: the public-skill path that loads the provider
    # from zip_content is omitted here)
```
7. Integration Points
7.1 Ghost CRD Integration
```python
class GhostSpec(BaseModel):
    """Ghost specification"""

    systemPrompt: str
    mcpServers: Optional[Dict[str, Any]] = None  # Global MCP for this Ghost
    skills: Optional[List[str]] = None  # Available skill names
    preload_skills: Optional[List[str]] = Field(
        None,
        description="List of skill names to preload into system prompt. "
        "Must be a subset of skills.",
    )
```
7.2 ChatConfigBuilder Integration
```python
# backend/app/services/chat/config/chat_config.py
class ChatConfigBuilder:
    def _build_skill_configs(self, ghost_crd: Ghost) -> dict:
        """Build skill configurations for chat session."""
        skill_names = ghost_crd.spec.skills or []
        preload_skills = ghost_crd.spec.preload_skills or []

        skill_configs = {}
        for skill_name in skill_names:
            skill_crd = self._get_skill(skill_name)
            skill_data = {
                "description": skill_crd.spec.description,
                "displayName": skill_crd.spec.displayName,
                "prompt": skill_crd.spec.prompt,
                "tools": skill_crd.spec.tools,
                "provider": skill_crd.spec.provider,
            }
            # Include mcpServers if present
            if skill_crd.spec.mcpServers:
                skill_data["mcpServers"] = skill_crd.spec.mcpServers
            skill_configs[skill_name] = skill_data
        return skill_configs
```
8. Trade-offs and Design Decisions
8.1 On-Demand Loading Trade-offs
| Advantage | Disadvantage |
|---|---|
| 40-80% token savings | Additional latency on first skill use |
| Cleaner initial prompts | More complex implementation |
| Better scalability (more skills) | Requires LLM to make loading decision |
| Reduced context window pressure | Skill state management complexity |
8.2 Provider Loading Security Trade-offs
| Advantage | Disadvantage |
|---|---|
| Prevents code injection from users | Limits user skill functionality |
| Maintains system integrity | Requires admin approval for rich skills |
| Clear security boundary | Public skill review burden |
8.3 MCP vs Built-in Tools
| Aspect | MCP Tools | Built-in Tools |
|---|---|---|
| Deployment | External process | In-process |
| Isolation | High (subprocess/network) | Lower (shared memory) |
| Startup Time | Slower (connection setup) | Faster (pre-loaded) |
| Failure Mode | Graceful degradation | Can crash agent |
| Flexibility | Protocol-standardized | Custom implementation |
9. Areas for Improvement and Extension
9.1 Potential Enhancements
- Skill Versioning: Add semantic versioning with automatic update notifications
- Skill Dependencies: Allow skills to depend on other skills
- Skill Marketplace: Public skill discovery and rating system
- Skill Testing Framework: Built-in testing tools for skill developers
- Skill Analytics: Usage metrics to identify popular and unused skills
9.2 MCP Improvements
- MCP Server Health Checks: Periodic connectivity validation
- MCP Connection Pooling: Reuse connections across sessions
- MCP Load Balancing: Support for multiple MCP server instances
- Custom MCP Transports: WebSocket, gRPC support
9.3 Performance Optimizations
- Skill Binary Caching: Cache ZIP packages in memory/redis
- Provider Hot-Reloading: Update providers without restart
- Parallel Skill Loading: Load multiple skills simultaneously
- Predictive Skill Loading: Pre-load likely-to-use skills
9.4 Frontend Integration
- Skill Status UI: Visual indicator of loaded skills
- Skill Suggestions: AI-powered skill recommendations
- Skill Documentation: Inline help for available skills
- Skill Composer: Visual tool for creating skills
Summary
The Wegent Skill System represents a thoughtful approach to solving the context window limitation problem in AI agents. Its key innovations include:
- Lazy Loading Architecture: Skills are only loaded when explicitly requested by the LLM, dramatically reducing token consumption
- Security-First Design: Clear boundaries between user-uploaded content (safe, prompt-only) and public skills (rich functionality with code)
- Flexible Provider System: Dynamic tool creation through a well-defined provider interface
- MCP Integration: Industry-standard protocol support for external tool integration
- State Management: Sophisticated turn-based retention and history restoration
The system successfully balances flexibility, security, and efficiency, making it suitable for production deployments with diverse agent capabilities. The clear separation of concerns and well-defined interfaces make it extensible for future requirements.
Unique Design Elements:
- Session-level caching with turn-based expiration
- Double security boundary (public/private skills × code/prompt separation)
- Graceful degradation for MCP failures
- Async frontend interaction through PendingRequestRegistry
- Variable substitution in MCP configurations for dynamic values
This architecture could serve as a reference implementation for other AI agent platforms seeking to implement on-demand capability loading.