Installation

pip install fastskill
The Python SDK requires Python 3.8+ and provides both synchronous and asynchronous interfaces.

FastSkillService

The main service class that provides access to all FastSkill functionality.

Constructor

from fastskill import FastSkillService, ServiceConfig

# Default configuration
service = FastSkillService()

# With custom configuration
config = ServiceConfig(
    skill_storage_path="./skills",
    execution_timeout=30,
    enable_hot_reload=True
)
service = FastSkillService(config)

# From environment variables
service = FastSkillService.from_env()

# From configuration file
service = FastSkillService.from_config_file("config.json")

Lifecycle Methods

initialize()
async method
Initialize the service and load all components. Must be called before using other methods.
shutdown()
async method
Gracefully shut down the service and clean up resources. Should be called when done.
restart()
async method
Restart the service with the same configuration. Useful for hot reloading scenarios.
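
The lifecycle contract above (initialize first, always shut down) can be sketched as a small helper. This is an illustrative pattern, not part of the SDK: `StubService` and `run_with_lifecycle` are hypothetical names, and the stub only mimics the documented `initialize()` and `shutdown()` methods so the snippet runs without FastSkill installed.

```python
import asyncio


class StubService:
    """Hypothetical stand-in exposing only the documented lifecycle methods."""

    def __init__(self):
        self.calls = []

    async def initialize(self):
        self.calls.append("initialize")

    async def shutdown(self):
        self.calls.append("shutdown")


async def run_with_lifecycle(service, work):
    """Initialize the service, run work(service), and always shut down."""
    await service.initialize()
    try:
        return await work(service)
    finally:
        # Runs on success and on error, so resources are released either way.
        await service.shutdown()


async def demo_work(service):
    service.calls.append("work")
    return "done"


lifecycle_stub = StubService()
lifecycle_result = asyncio.run(run_with_lifecycle(lifecycle_stub, demo_work))
```

With the real SDK, a `FastSkillService` instance would presumably take the place of the stub, since it documents the same two coroutine methods.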

Skill Management Methods

register_skill(skill_definition)
async method
Register a new skill with the service.
Parameters:
  • skill_definition (dict): Skill definition following the FastSkill schema
Returns: str - The registered skill ID
Example:
skill_id = await service.register_skill({
    "id": "text-processor",
    "name": "Text Processor",
    "description": "Process text documents",
    "version": "1.0.0",
    "tags": ["text", "processing"],
    "capabilities": ["text_processing"]
})
update_skill(skill_id, updates)
async method
Update an existing skill.
Parameters:
  • skill_id (str): ID of the skill to update
  • updates (dict): Fields to update
Returns: bool - Success status
delete_skill(skill_id)
async method
Delete a skill from the service.
Parameters:
  • skill_id (str): ID of the skill to delete
Returns: bool - Success status
get_skill(skill_id)
async method
Get detailed information about a specific skill.
Parameters:
  • skill_id (str): ID of the skill
Returns: dict - Skill definition or None if not found
list_skills(filters=None, sort=None, page=None, page_size=None)
async method
List all registered skills with optional filtering and sorting.
Parameters:
  • filters (dict): Filter criteria
  • sort (dict): Sort options
  • page (int): Page number (1-based)
  • page_size (int): Number of items per page
Returns: dict - Paginated results with data and metadata
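
A minimal sketch of walking every page of `list_skills`, assuming the paginated result keeps the current page's items under a `data` key (the reference only says the result contains "data and metadata", so the exact shape is an assumption). `StubSkillService` and `collect_all_skills` are hypothetical names used for illustration.

```python
import asyncio


class StubSkillService:
    """Hypothetical stand-in that mimics the documented list_skills signature."""

    def __init__(self, skills):
        self._skills = skills

    async def list_skills(self, filters=None, sort=None, page=None, page_size=None):
        page = page or 1
        page_size = page_size or 10
        start = (page - 1) * page_size  # pages are 1-based
        chunk = self._skills[start:start + page_size]
        # Assumed result shape: the page's items under "data".
        return {"data": chunk, "metadata": {"page": page, "page_size": page_size}}


async def collect_all_skills(service, page_size=2):
    """Request pages from 1 upward until a page comes back empty."""
    collected = []
    page = 1
    while True:
        result = await service.list_skills(page=page, page_size=page_size)
        items = result["data"]
        if not items:
            break
        collected.extend(items)
        page += 1
    return collected


paging_stub = StubSkillService([{"id": f"skill-{n}"} for n in range(5)])
all_skills = asyncio.run(collect_all_skills(paging_stub))
```

If the real metadata reports a total page count, stopping on that value would save the final empty request.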
enable_skill(skill_id)
async method
Enable a disabled skill.
Parameters:
  • skill_id (str): ID of the skill to enable
Returns: bool - Success status
disable_skill(skill_id)
async method
Disable an active skill.
Parameters:
  • skill_id (str): ID of the skill to disable
Returns: bool - Success status

Discovery Methods

discover_skills(query, limit=None)
async method
Discover skills relevant to a query using natural language processing.
Parameters:
  • query (str): Search query
  • limit (int): Maximum number of results
Returns: list - List of relevant skills
find_skills_by_capability(capability, limit=None)
async method
Find skills that provide a specific capability.
Parameters:
  • capability (str): Capability to search for
  • limit (int): Maximum number of results
Returns: list - List of matching skills
find_skills_by_tag(tag, limit=None)
async method
Find skills with a specific tag.
Parameters:
  • tag (str): Tag to search for
  • limit (int): Maximum number of results
Returns: list - List of matching skills
find_skills_by_author(author, limit=None)
async method
Find skills by a specific author.
Parameters:
  • author (str): Author name to search for
  • limit (int): Maximum number of results
Returns: list - List of matching skills
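
The finder methods can be combined. The sketch below runs a capability lookup and a tag lookup concurrently and merges the results without duplicates. It assumes each returned skill is a dict with an `id` key, as in the registration example; `StubDiscovery` and `find_by_capability_or_tag` are hypothetical names, and the stub's matching logic is only a placeholder for the real service.

```python
import asyncio


class StubDiscovery:
    """Hypothetical stand-in for two of the documented finder methods."""

    def __init__(self, skills):
        self._skills = skills

    async def find_skills_by_capability(self, capability, limit=None):
        hits = [s for s in self._skills if capability in s["capabilities"]]
        return hits[:limit]  # a limit of None keeps every hit

    async def find_skills_by_tag(self, tag, limit=None):
        hits = [s for s in self._skills if tag in s["tags"]]
        return hits[:limit]


async def find_by_capability_or_tag(service, capability, tag, limit=None):
    """Return skills matching either criterion, each skill at most once."""
    # The two lookups are independent, so they can run concurrently.
    by_capability, by_tag = await asyncio.gather(
        service.find_skills_by_capability(capability, limit=limit),
        service.find_skills_by_tag(tag, limit=limit),
    )
    merged = {}
    for skill in by_capability + by_tag:
        merged.setdefault(skill["id"], skill)  # keep the first occurrence
    return list(merged.values())


discovery_stub = StubDiscovery([
    {"id": "text-processor", "tags": ["text"], "capabilities": ["text_processing"]},
    {"id": "summarizer", "tags": ["text"], "capabilities": ["summarization"]},
    {"id": "calculator", "tags": ["math"], "capabilities": ["math"]},
])
merged_skills = asyncio.run(
    find_by_capability_or_tag(discovery_stub, "text_processing", "text")
)
merged_ids = [skill["id"] for skill in merged_skills]
```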

Tool Service

Access tool-related functionality through the tool service, exposed as service.tool_service in the examples on this page.

Tool Discovery

get_available_tools(filters=None)
async method
Get all available tools with optional filtering.
Parameters:
  • filters (dict): Filter criteria for tools
Returns: list - List of available tools
get_tool_details(tool_id)
async method
Get detailed information about a specific tool.
Parameters:
  • tool_id (str): ID of the tool
Returns: dict - Tool details including parameters and capabilities
find_tools_by_capability(capability)
async method
Find tools that provide a specific capability.
Parameters:
  • capability (str): Capability to search for
Returns: list - List of matching tools
find_tools_by_tag(tag)
async method
Find tools with a specific tag.
Parameters:
  • tag (str): Tag to search for
Returns: list - List of matching tools

Tool Execution

execute_tool(skill_id, tool_name, parameters, execution_options=None)
async method
Execute a specific tool.
Parameters:
  • skill_id (str): ID of the skill containing the tool
  • tool_name (str): Name of the tool to execute
  • parameters (dict): Parameters for the tool execution
  • execution_options (dict): Additional execution options
Returns: dict - Execution results and metadata
execute_batch(operations, concurrent=True)
async method
Execute multiple tools in batch.
Parameters:
  • operations (list): List of tool operations to execute
  • concurrent (bool): Whether to execute operations concurrently
Returns: list - List of execution results
validate_tool_parameters(skill_id, tool_name, parameters)
async method
Validate parameters for a tool before execution.
Parameters:
  • skill_id (str): ID of the skill
  • tool_name (str): Name of the tool
  • parameters (dict): Parameters to validate
Returns: dict - Validation results
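
A sketch of the validate-before-execute flow these methods suggest. The keys of the validation result are not specified above, so `valid` and `errors` are assumptions. The `success` and `data` keys follow the unit-test example later on this page. `StubToolService` and `validate_then_execute` are hypothetical names.

```python
import asyncio
from numbers import Number


class StubToolService:
    """Hypothetical stand-in for the documented tool service methods."""

    def __init__(self):
        self.executions = 0

    async def validate_tool_parameters(self, skill_id, tool_name, parameters):
        # Assumed result shape: {"valid": bool, "errors": [str, ...]}.
        errors = [
            f"'{name}' must be a number"
            for name in ("a", "b")
            if not isinstance(parameters.get(name), Number)
        ]
        return {"valid": not errors, "errors": errors}

    async def execute_tool(self, skill_id, tool_name, parameters, execution_options=None):
        self.executions += 1
        return {"success": True, "data": parameters["a"] + parameters["b"]}


async def validate_then_execute(tool_service, skill_id, tool_name, parameters):
    """Execute the tool only if its parameters pass validation."""
    report = await tool_service.validate_tool_parameters(skill_id, tool_name, parameters)
    if not report.get("valid", False):
        # Skip execution and surface the validation errors instead.
        return {"success": False, "errors": report.get("errors", [])}
    return await tool_service.execute_tool(skill_id, tool_name, parameters)


async def demo_tools(tool_service):
    ok = await validate_then_execute(tool_service, "calculator", "add", {"a": 5, "b": 3})
    bad = await validate_then_execute(tool_service, "calculator", "add", {"a": 5, "b": "three"})
    return ok, bad


tool_stub = StubToolService()
ok_result, bad_result = asyncio.run(demo_tools(tool_stub))
```

Validating first keeps malformed input from consuming an execution slot. The stub's counter shows that only the valid call reached `execute_tool`.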

Routing Service

Access intelligent routing functionality.
find_relevant_skills(query, context=None, limit=None)
async method
Find the most relevant skills for a query using intelligent routing.
Parameters:
  • query (str): Search query
  • context (dict): Additional context for routing
  • limit (int): Maximum number of results
Returns: list - Ranked list of relevant skills
find_relevant_skills_with_context(query, context, limit=None)
async method
Find relevant skills with detailed context information.
Parameters:
  • query (str): Search query
  • context (dict): Execution context and constraints
  • limit (int): Maximum number of results
Returns: list - Skills with relevance scores and metadata
submit_feedback(feedback_data)
async method
Submit feedback on routing decisions for learning.
Parameters:
  • feedback_data (dict): Feedback information including query, selected skill, and results
Returns: bool - Success status
get_routing_metrics()
async method
Get routing performance metrics and analytics.
Returns: dict - Routing performance data

Loading Service

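Access progressive loading functionality through the loading service, exposed as service.loading_service in the health monitoring example on this page.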
load_metadata(skill_ids)
async method
Load metadata for specific skills.
Parameters:
  • skill_ids (list): List of skill IDs to load metadata for
Returns: list - Skill metadata
load_skill_content(skill_ids)
async method
Load full content for specific skills.
Parameters:
  • skill_ids (list): List of skill IDs to load content for
Returns: list - Full skill content
preload_skills(skill_ids, content_level='metadata')
async method
Preload skills at a specific content level.
Parameters:
  • skill_ids (list): List of skill IDs to preload
  • content_level (str): Level of content to preload ('metadata' or 'full')
Returns: bool - Success status
get_cache_stats()
async method
Get cache performance statistics.
Returns: dict - Cache performance metrics
clear_cache(cache_type='all')
async method
Clear the loading cache.
Parameters:
  • cache_type (str): Type of cache to clear ('metadata', 'content', or 'all')
Returns: bool - Success status
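
Progressive loading can be sketched as: fetch lightweight metadata for many skills, then request full content only for the ones that are needed. The metadata shape used here (dicts with `id` and `tags`) is an assumption. `StubLoadingService` and `load_progressively` are hypothetical names.

```python
import asyncio


class StubLoadingService:
    """Hypothetical stand-in for the documented loading methods."""

    def __init__(self, catalog):
        self._catalog = catalog      # skill_id -> full record
        self.content_requests = []   # IDs for which full content was requested

    async def load_metadata(self, skill_ids):
        # Assumed shape: one small dict per skill, without the heavy "body".
        return [
            {"id": skill_id, "tags": self._catalog[skill_id]["tags"]}
            for skill_id in skill_ids
        ]

    async def load_skill_content(self, skill_ids):
        self.content_requests.extend(skill_ids)
        return [self._catalog[skill_id] for skill_id in skill_ids]


async def load_progressively(loading_service, skill_ids, wanted):
    """Load metadata for all IDs, then full content only where wanted(entry) is true."""
    metadata = await loading_service.load_metadata(skill_ids)
    selected = [entry["id"] for entry in metadata if wanted(entry)]
    if not selected:
        return metadata, []
    content = await loading_service.load_skill_content(selected)
    return metadata, content


catalog = {
    "text-processor": {"id": "text-processor", "tags": ["text"], "body": "full text-processor instructions"},
    "calculator": {"id": "calculator", "tags": ["math"], "body": "full calculator instructions"},
}
loading_stub = StubLoadingService(catalog)
loaded_metadata, loaded_content = asyncio.run(
    load_progressively(loading_stub, list(catalog), lambda entry: "text" in entry["tags"])
)
```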

Event System

Subscribe to and publish events.
subscribe(event_pattern, handler)
async method
Subscribe to events matching a pattern.
Parameters:
  • event_pattern (str): Event pattern to match (supports wildcards)
  • handler (callable): Function to handle matching events
Returns: str - Subscription ID
unsubscribe(subscription_id)
async method
Unsubscribe from events.
Parameters:
  • subscription_id (str): ID of the subscription to cancel
Returns: bool - Success status
publish(event_type, event_data)
async method
Publish an event to all subscribers.
Parameters:
  • event_type (str): Type of event
  • event_data (dict): Event data payload
Returns: int - Number of subscribers notified
get_event_history(event_type=None, limit=None)
async method
Get event history.
Parameters:
  • event_type (str): Type of events to retrieve
  • limit (int): Maximum number of events to return
Returns: list - List of historical events
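
Because `subscribe` returns a subscription ID and `unsubscribe` takes one, the pair fits an async context manager that guarantees cleanup. The sketch below is one possible pattern, not an SDK feature. `StubEventBus` and `subscribed` are hypothetical names, and the stub matches event types exactly instead of modeling the real wildcard rules.

```python
import asyncio
from contextlib import asynccontextmanager


class StubEventBus:
    """Hypothetical in-memory stand-in for subscribe/unsubscribe/publish."""

    def __init__(self):
        self._handlers = {}
        self._next_id = 0

    async def subscribe(self, event_pattern, handler):
        self._next_id += 1
        subscription_id = f"sub-{self._next_id}"
        self._handlers[subscription_id] = (event_pattern, handler)
        return subscription_id

    async def unsubscribe(self, subscription_id):
        return self._handlers.pop(subscription_id, None) is not None

    async def publish(self, event_type, event_data):
        # Exact match only; the real service's wildcard rules are not modeled.
        notified = 0
        for pattern, handler in list(self._handlers.values()):
            if pattern == event_type:
                await handler(event_data)
                notified += 1
        return notified


@asynccontextmanager
async def subscribed(service, event_pattern, handler):
    """Subscribe on entry and always unsubscribe on exit."""
    subscription_id = await service.subscribe(event_pattern, handler)
    try:
        yield subscription_id
    finally:
        await service.unsubscribe(subscription_id)


async def demo_events():
    bus = StubEventBus()
    seen = []

    async def on_registered(event):
        seen.append(event["skill"]["name"])

    async with subscribed(bus, "skill.registered", on_registered):
        during = await bus.publish("skill.registered", {"skill": {"name": "Text Processor"}})
    # The handler is gone once the block exits, so nobody is notified here.
    after = await bus.publish("skill.registered", {"skill": {"name": "Ignored"}})
    return seen, during, after


seen_events, notified_during, notified_after = asyncio.run(demo_events())
```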

Configuration Classes

ServiceConfig

Main configuration class for the FastSkill service.
ServiceConfig.skill_storage_path
str | Path
Path to the skill storage directory.
ServiceConfig.execution_timeout
int
Default timeout for skill execution in seconds.
ServiceConfig.max_memory_mb
int
Maximum memory per skill execution in MB.
ServiceConfig.enable_hot_reload
bool
Enable automatic reloading of skills when files change.
ServiceConfig.enable_audit_logging
bool
Enable detailed audit logging.
ServiceConfig.log_level
str
Logging level (ERROR, WARN, INFO, DEBUG, TRACE).
ServiceConfig.cache_size
int
Size of the metadata cache.
ServiceConfig.enable_metrics
bool
Enable performance metrics collection.

ExecutionConfig

Configuration for skill execution.
ExecutionConfig.default_timeout
int
Default timeout for skill execution.
ExecutionConfig.max_memory_mb
int
Maximum memory per execution.
ExecutionConfig.max_cpu_percent
int
Maximum CPU usage percentage.
ExecutionConfig.enable_networking
bool
Allow network access in skill execution.
ExecutionConfig.sandbox_level
str
Sandboxing level (none, basic, strict).

CacheConfig

Configuration for caching behavior.
CacheConfig.metadata_cache_size
int
Number of skill metadata entries to cache.
CacheConfig.content_cache_size
int
Number of full skill content entries to cache.
CacheConfig.cache_ttl_seconds
int
Cache entry time-to-live in seconds.
CacheConfig.enable_persistence
bool
Persist cache to disk.

Error Handling

Exception Types

FastSkillError
exception
Base exception class for all FastSkill errors.
ValidationError
exception
Raised when skill definitions or parameters are invalid.
ExecutionError
exception
Raised when skill execution fails.
ServiceError
exception
Raised when service-level operations fail.
DiscoveryError
exception
Raised when skill discovery operations fail.
ConfigurationError
exception
Raised when configuration is invalid.

Error Handling Patterns

try:
    skills = await service.discover_skills("text processing")
    print(f"Found {len(skills)} skills")
except ValidationError as e:
    print(f"Invalid query: {e}")
    # Handle validation errors (user input issues)
except ExecutionError as e:
    print(f"Execution failed: {e}")
    # Handle execution errors (runtime issues)
except ServiceError as e:
    print(f"Service error: {e}")
    # Handle service errors (system issues)
except FastSkillError as e:
    print(f"FastSkill error: {e}")
    # Handle any FastSkill-related error
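
The order of the except clauses above matters: Python picks the first matching clause, so the base class has to come last. The sketch below demonstrates this with stand-in exception classes defined locally. They mirror the documented names but are not imported from the SDK, and `classify` is a hypothetical helper.

```python
class FastSkillError(Exception):
    """Stand-in for the documented base exception."""


class ValidationError(FastSkillError):
    """Stand-in: invalid skill definitions or parameters."""


class ExecutionError(FastSkillError):
    """Stand-in: skill execution failed."""


class ServiceError(FastSkillError):
    """Stand-in: service-level operation failed."""


class DiscoveryError(FastSkillError):
    """Stand-in: skill discovery failed."""


def classify(error_type):
    """Raise error_type and report which handler caught it."""
    try:
        raise error_type("boom")
    except ValidationError:
        return "validation"
    except ExecutionError:
        return "execution"
    except ServiceError:
        return "service"
    except FastSkillError:
        # Catch-all for subclasses without a dedicated clause above.
        return "other-fastskill"


caught_validation = classify(ValidationError)
caught_discovery = classify(DiscoveryError)
```

`DiscoveryError` has no dedicated clause, so it falls through to the base-class handler. If the `FastSkillError` clause came first, it would also intercept every more specific error.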

Advanced Usage

Custom Service Configuration

# Advanced configuration with all options
from fastskill import (
    ServiceConfig, ExecutionConfig, CacheConfig,
    RoutingConfig, SearchConfig
)

config = ServiceConfig(
    # Storage
    skill_storage_path="./skills",
    backup_storage_path="./backups",

    # Execution
    execution=ExecutionConfig(
        default_timeout=60,
        max_memory_mb=1024,
        max_cpu_percent=75,
        enable_networking=False,
        sandbox_level="strict",
        allowed_paths=["./data", "./temp"]
    ),

    # Caching
    cache=CacheConfig(
        metadata_cache_size=2000,
        content_cache_size=200,
        cache_ttl_seconds=3600,
        enable_persistence=True,
        cache_directory="./cache"
    ),

    # Service settings
    enable_hot_reload=True,
    watch_paths=["./skills", "./custom-skills"],
    enable_audit_logging=True,
    log_level="INFO",
    max_concurrent_executions=20,

    # Performance
    enable_metrics=True,
    metrics_retention_hours=48,
    enable_health_checks=True,

    # Search and routing
    search=SearchConfig(
        max_results=50,
        min_relevance_score=0.3,
        enable_fuzzy_matching=True
    ),

    routing=RoutingConfig(
        enable_learning=True,
        learning_rate=0.1,
        feedback_weight=0.2
    )
)

Service Health Monitoring

async def monitor_service_health():
    """Monitor service health and performance."""

    # Get service metrics
    metrics = await service.get_metrics()

    print("📊 Service Health:")
    print(f"   Uptime: {metrics['uptime_seconds']}s")
    print(f"   Total requests: {metrics['total_requests']}")
    print(f"   Success rate: {metrics['success_rate']:.1%}")
    print(f"   Average response time: {metrics['avg_response_time_ms']:.2f}ms")

    # Check component health
    health = await service.health_check()

    print("\n🔍 Component Health:")
    for component, status in health.items():
        print(f"   {component}: {'✅' if status['healthy'] else '❌'}")

    # Cache performance
    cache_stats = await service.loading_service.get_cache_stats()
    print("\n💾 Cache Performance:")
    print(f"   Hit rate: {cache_stats['hit_rate']:.1%}")
    print(f"   Memory usage: {cache_stats['memory_usage_mb']:.1f}MB")

Event-Driven Architecture

async def event_driven_example():
    """Example of event-driven integration."""

    # Define event handlers
    async def on_skill_registered(event):
        print(f"🆕 Skill registered: {event['skill']['name']}")
        # Trigger downstream processes
        await update_skill_index(event['skill'])
        await notify_subscribers(event['skill'])

    async def on_skill_executed(event):
        print(f"⚡ Skill executed: {event['skill_id']}")
        # Log execution metrics
        await log_execution_metrics(event)

    async def on_discovery_performed(event):
        print(f"🔍 Discovery performed: '{event['query']}'")
        # Update search analytics
        await update_search_analytics(event)

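    # Initialize first: the service must be initialized before other methods are used
    await service.initialize()

    # Subscribe to events (subscribe is an async method, so each call is awaited)
    await service.subscribe("skill.registered", on_skill_registered)
    await service.subscribe("skill.executed", on_skill_executed)
    await service.subscribe("discovery.performed", on_discovery_performed)

    # Events are now handled automatically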

    # Simulate some operations that will trigger events
    await service.register_skill(sample_skill)
    await service.discover_skills("text processing")

    # Cleanup
    await service.shutdown()

Performance Optimization

Connection Pooling

# For high-throughput applications
from fastskill import CacheConfig, FastSkillService, ServiceConfig

config = ServiceConfig(
    # Increase concurrent execution limits
    max_concurrent_executions=50,

    # Optimize caching
    cache=CacheConfig(
        metadata_cache_size=5000,
        content_cache_size=1000,
        enable_persistence=True
    ),

    # Enable performance metrics
    enable_metrics=True
)

# Reuse service instances
service = FastSkillService(config)
await service.initialize()

# Reuse for multiple operations
for query in queries:
    skills = await service.discover_skills(query)

Async Best Practices

# Efficient async patterns
import asyncio

from fastskill import FastSkillService
async def efficient_skill_discovery():
    """Efficiently discover skills for multiple queries."""

    # Good: Batch similar operations
    queries = ["text processing", "data analysis", "file conversion"]

    # Execute discovery operations concurrently
    tasks = [service.discover_skills(query) for query in queries]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Process results
    for query, result in zip(queries, results):
        if isinstance(result, Exception):
            print(f"❌ Failed for '{query}': {result}")
        else:
            print(f"✅ Found {len(result)} skills for '{query}'")

# Good: Reuse a single initialized service instance
class SkillDiscoveryService:
    """Service wrapper that creates one FastSkillService and reuses it for every query."""

    def __init__(self):
        self._service = None

    async def __aenter__(self):
        if self._service is None:
            self._service = FastSkillService()
            await self._service.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._service:
            await self._service.shutdown()
            # Reset so that re-entering the context creates a fresh service
            self._service = None

    async def discover(self, query):
        return await self._service.discover_skills(query)

# Usage
async with SkillDiscoveryService() as discovery:
    skills = await discovery.discover("text processing")

Testing with SDK

Unit Testing

import pytest
import pytest_asyncio
from fastskill import FastSkillService, ServiceConfig, ValidationError

# pytest-asyncio's strict mode requires its own decorator for async fixtures
@pytest_asyncio.fixture
async def service():
    """Create a test service instance."""
    config = ServiceConfig(skill_storage_path="./test-skills")
    service = FastSkillService(config)
    await service.initialize()
    yield service
    await service.shutdown()

@pytest.mark.asyncio
async def test_skill_registration(service):
    """Test skill registration functionality."""

    skill_definition = {
        "id": "test-skill",
        "name": "Test Skill",
        "description": "A skill for testing",
        "version": "1.0.0",
        "tags": ["test"],
        "capabilities": ["testing"]
    }

    # Test successful registration
    skill_id = await service.register_skill(skill_definition)
    assert skill_id == "test-skill"

    # Test duplicate registration fails
    with pytest.raises(ValidationError):
        await service.register_skill(skill_definition)

@pytest.mark.asyncio
async def test_skill_discovery(service):
    """Test skill discovery functionality."""

    # Register test skills
    await service.register_skill({
        "id": "text-processor",
        "name": "Text Processor",
        "description": "Process text documents",
        "capabilities": ["text_processing"]
    })

    # Test discovery
    skills = await service.discover_skills("process text")
    assert len(skills) > 0
    assert skills[0]["id"] == "text-processor"

@pytest.mark.asyncio
async def test_tool_execution(service):
    """Test tool execution functionality."""

    # Register skill with tool
    await service.register_skill({
        "id": "calculator",
        "name": "Calculator",
        "description": "Basic calculator",
        "capabilities": ["math"],
        "tools": [{
            "name": "add",
            "description": "Add two numbers",
            "parameters": {
                "a": {"type": "number"},
                "b": {"type": "number"}
            }
        }]
    })

    # Test tool execution
    result = await service.tool_service.execute_tool(
        "calculator",
        "add",
        {"a": 5, "b": 3}
    )

    assert result["success"] is True
    assert result["data"] == 8

Integration Testing

import pytest
import asyncio
from fastskill import FastSkillService
from fastskill.testing import TestEnvironment

@pytest.mark.asyncio
async def test_full_integration():
    """Test full integration with realistic scenarios."""

    # Setup test environment
    async with TestEnvironment() as env:
        service = env.service

        # Test complete workflow
        skill_id = await service.register_skill(sample_skill)

        # Discovery workflow
        skills = await service.discover_skills("process data")
        assert len(skills) > 0

        # Execution workflow
        result = await service.tool_service.execute_tool(
            skill_id, "process", {"data": test_data}
        )
        assert result["success"] is True

        # Cleanup
        await service.delete_skill(skill_id)
Always call await service.shutdown() in your tests to properly clean up resources and avoid resource leaks.