Skip to main content

Overview

Tool execution is the process of running skill-defined operations with proper parameter validation, security sandboxing, and error handling. FastSkill provides a robust execution environment for safe and reliable tool operation.
Tool execution happens within a secure sandbox with configurable resource limits, network controls, and comprehensive error handling.

Execution Environment

Security Sandbox

FastSkill executes tools in an isolated environment:
# Configure execution sandbox
from fastskill import ExecutionConfig

config = ServiceConfig(
    execution=ExecutionConfig(
        sandbox_level="strict",           # Maximum isolation
        max_memory_mb=512,               # Memory limit
        max_cpu_percent=50,              # CPU limit
        default_timeout=30,              # Execution timeout
        enable_networking=False,         # No network access
        allowed_paths=["./data", "./temp"],  # Limited file access
        enable_subprocess=False,         # No subprocess execution
        enable_file_writing=True         # Controlled file writing
    )
)

Resource Management

Monitor and control resource usage:
# Resource monitoring during execution
async def execute_with_monitoring(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with resource monitoring."""

    # Start monitoring
    monitor = ResourceMonitor()
    monitor.start()

    try:
        # Execute the tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )

        # Get resource usage
        usage = monitor.stop()

        print(f"📊 Resource Usage:")
        print(f"   CPU: {usage['cpu_percent']:.1f}%")
        print(f"   Memory: {usage['memory_mb']:.1f} MB")
        print(f"   Execution time: {usage['execution_time']:.2f}s")
        print(f"   Network I/O: {usage['network_bytes']} bytes")

        return result

    except ResourceLimitError as e:
        print(f"⚠️ Resource limit exceeded: {e}")
        # Handle resource limit violations
        raise
    except Exception as e:
        print(f"❌ Execution failed: {e}")
        raise

Parameter Handling

Parameter Validation

Validate parameters before execution:
async def validate_and_execute(skill_id: str, tool_name: str, parameters: dict):
    """Validate parameters and execute tool safely."""

    # Get tool specification
    tool_spec = await service.tool_service.get_tool_details(
        f"{skill_id}.{tool_name}"
    )

    # Validate parameters
    validation_result = await service.tool_service.validate_parameters(
        skill_id, tool_name, parameters
    )

    if not validation_result['valid']:
        print("❌ Parameter validation failed:")
        for error in validation_result['errors']:
            print(f"   {error['parameter']}: {error['message']}")
        return None

    # Execute with validated parameters
    result = await service.tool_service.execute_tool(
        skill_id, tool_name, parameters
    )

    return result

Parameter Type Conversion

Handle parameter type conversion automatically:
def convert_parameters(parameters: dict, tool_spec: dict) -> dict:
    """Convert parameters to correct types."""

    converted = {}

    for param_name, param_spec in tool_spec['parameters'].items():
        if param_name in parameters:
            value = parameters[param_name]
            expected_type = param_spec['type']

            # Convert type
            if expected_type == 'integer' and isinstance(value, str):
                converted[param_name] = int(value)
            elif expected_type == 'number' and isinstance(value, str):
                converted[param_name] = float(value)
            elif expected_type == 'boolean' and isinstance(value, str):
                converted[param_name] = value.lower() in ['true', '1', 'yes']
            elif expected_type == 'array' and isinstance(value, str):
                converted[param_name] = value.split(',')
            else:
                converted[param_name] = value

    return converted

# Example usage
parameters = {
    "file_path": "/path/to/file.pdf",
    "page_range": "1,2,3",        # String
    "preserve_layout": "true",    # String
    "timeout": "30"               # String
}

tool_spec = {
    "parameters": {
        "page_range": {"type": "array"},
        "preserve_layout": {"type": "boolean"},
        "timeout": {"type": "integer"}
    }
}

converted = convert_parameters(parameters, tool_spec)
print(f"Converted parameters: {converted}")

Execution Modes

Synchronous Execution

# Synchronous execution (blocking)
def sync_execution_example():
    """Example of synchronous tool execution."""

    service = FastSkillService(config)
    service.initialize()

    try:
        # Synchronous tool execution
        result = service.tool_service.execute_tool_sync(
            skill_id="text-extractor",
            tool_name="extract_text",
            parameters={"file_path": "document.pdf"}
        )

        print(f"✅ Execution result: {result['success']}")
        print(f"📄 Extracted text: {result['data'][:200]}...")

    except ExecutionError as e:
        print(f"❌ Execution failed: {e}")
    except ValidationError as e:
        print(f"❌ Validation failed: {e}")
    finally:
        service.shutdown()

Asynchronous Execution

# Asynchronous execution (non-blocking)
async def async_execution_example():
    """Example of asynchronous tool execution."""

    service = FastSkillService(config)
    await service.initialize()

    try:
        # Asynchronous tool execution
        result = await service.tool_service.execute_tool(
            skill_id="text-extractor",
            tool_name="extract_text",
            parameters={"file_path": "document.pdf"}
        )

        print(f"✅ Execution result: {result['success']}")
        print(f"📄 Extracted text: {result['data'][:200]}...")

        # Process result asynchronously
        await process_extraction_result(result)

    except ExecutionError as e:
        print(f"❌ Execution failed: {e}")
        await handle_execution_error(e)
    except ValidationError as e:
        print(f"❌ Validation failed: {e}")
        await handle_validation_error(e)
    finally:
        await service.shutdown()

Batch Execution

Execute multiple tools in sequence or parallel:
async def batch_execution_example():
    """Execute multiple tools efficiently."""

    operations = [
        {
            "skill_id": "text-extractor",
            "tool_name": "extract_text",
            "parameters": {"file_path": "doc1.pdf"}
        },
        {
            "skill_id": "text-analyzer",
            "tool_name": "analyze_sentiment",
            "parameters": {"text": "placeholder"}  # Will be filled from previous result
        },
        {
            "skill_id": "data-processor",
            "tool_name": "generate_report",
            "parameters": {"analysis": "placeholder"}  # Will be filled from previous result
        }
    ]

    # Execute batch with dependency chaining
    results = await service.tool_service.execute_batch(
        operations,
        execution_mode="sequential",  # or "parallel"
        continue_on_error=False
    )

    # Process results
    for i, result in enumerate(results):
        operation = operations[i]
        print(f"Step {i+1}: {operation['tool_name']}")
        print(f"   Success: {result['success']}")
        if result['success']:
            print(f"   Result: {result['data']}")
        else:
            print(f"   Error: {result['error']}")

Error Handling

Execution Errors

Handle different types of execution errors:
async def robust_execution(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with comprehensive error handling."""

    try:
        # Validate before execution
        validation = await service.tool_service.validate_parameters(
            skill_id, tool_name, parameters
        )

        if not validation['valid']:
            return {
                "success": False,
                "error_type": "validation",
                "errors": validation['errors']
            }

        # Execute tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )

        return {
            "success": True,
            "result": result
        }

    except ValidationError as e:
        # Parameter validation failed
        return {
            "success": False,
            "error_type": "validation",
            "error": str(e)
        }

    except ExecutionTimeoutError as e:
        # Execution timed out
        return {
            "success": False,
            "error_type": "timeout",
            "error": f"Execution timed out after {e.timeout_seconds}s"
        }

    except ResourceLimitError as e:
        # Resource limit exceeded
        return {
            "success": False,
            "error_type": "resource_limit",
            "error": f"Resource limit exceeded: {e.resource_type}"
        }

    except SecurityError as e:
        # Security violation
        return {
            "success": False,
            "error_type": "security",
            "error": f"Security violation: {e.violation}"
        }

    except Exception as e:
        # Unexpected error
        return {
            "success": False,
            "error_type": "unknown",
            "error": f"Unexpected error: {str(e)}"
        }

Retry Logic

Implement retry logic for transient failures:
async def execute_with_retry(skill_id: str, tool_name: str, parameters: dict,
                           max_retries: int = 3, backoff_seconds: float = 1.0):
    """Execute tool with retry logic for transient failures."""

    for attempt in range(max_retries + 1):
        try:
            result = await service.tool_service.execute_tool(
                skill_id, tool_name, parameters
            )

            if result['success']:
                return result
            else:
                # Check if error is retryable
                if not is_retryable_error(result.get('error_type', '')):
                    return result

        except (NetworkError, TimeoutError) as e:
            if attempt < max_retries:
                wait_time = backoff_seconds * (2 ** attempt)  # Exponential backoff
                print(f"⚠️ Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                continue
            else:
                return {
                    "success": False,
                    "error_type": "max_retries_exceeded",
                    "error": f"Failed after {max_retries} attempts"
                }

        except Exception as e:
            # Non-retryable error
            return {
                "success": False,
                "error_type": "non_retryable",
                "error": str(e)
            }

    # Should not reach here
    return {
        "success": False,
        "error_type": "unknown",
        "error": "Unexpected execution failure"
    }

def is_retryable_error(error_type: str) -> bool:
    """Check if an error type is retryable."""
    retryable_errors = {
        'network_error', 'timeout', 'temporary_unavailable',
        'rate_limit', 'server_error', 'database_connection'
    }

    return error_type in retryable_errors

Execution Monitoring

Performance Metrics

Monitor tool execution performance:
async def monitor_execution_performance():
    """Monitor and analyze tool execution performance."""

    # Get execution metrics
    metrics = await service.tool_service.get_execution_metrics()

    print("📊 Execution Performance:")
    print(f"   Total executions: {metrics['total_executions']}")
    print(f"   Success rate: {metrics['success_rate']:.1%}")
    print(f"   Average execution time: {metrics['avg_execution_time']:.2f}s")
    print(f"   Peak concurrent executions: {metrics['peak_concurrent']}")

    # Tool-specific metrics
    print("
🔧 Tool Performance:"    for tool_id, tool_metrics in metrics['tool_metrics'].items():
        print(f"   {tool_id}:")
        print(f"     Executions: {tool_metrics['execution_count']}")
        print(f"     Avg time: {tool_metrics['avg_execution_time']:.2f}s")
        print(f"     Success rate: {tool_metrics['success_rate']:.1%}")

    # Resource usage
    resources = metrics['resource_usage']
    print("
💾 Resource Usage:"    print(f"   Memory usage: {resources['memory_mb']:.1f} MB")
    print(f"   CPU usage: {resources['cpu_percent']:.1f}%")
    print(f"   Network I/O: {resources['network_bytes']} bytes")

Execution Logging

Configure detailed execution logging:
# Configure execution logging
config = ServiceConfig(
    execution=ExecutionConfig(
        enable_detailed_logging=True,
        log_execution_parameters=True,
        log_execution_results=True,
        log_performance_metrics=True
    ),

    enable_audit_logging=True,
    log_level="INFO"
)

# Execution logs will include:
# - Tool execution start/end times
# - Parameter values (if enabled)
# - Execution results and errors
# - Resource usage statistics
# - Security events and violations

Integration Patterns

With AI Agents

class ToolExecutionAgent:
    """AI agent that safely executes FastSkill tools."""

    def __init__(self, service):
        self.service = service
        self.execution_history = []

    async def execute_user_request(self, user_query: str):
        """Execute user request through FastSkill tools."""

        # Parse user query to determine tool and parameters
        tool_info = await self.parse_query(user_query)
        if not tool_info:
            return "I don't understand that request."

        skill_id = tool_info['skill_id']
        tool_name = tool_info['tool_name']
        parameters = tool_info['parameters']

        # Validate and execute
        execution_result = await self.safe_execute(
            skill_id, tool_name, parameters
        )

        # Log execution
        self.execution_history.append({
            "query": user_query,
            "tool": f"{skill_id}.{tool_name}",
            "success": execution_result['success'],
            "timestamp": datetime.now().isoformat()
        })

        if execution_result['success']:
            return f"✅ Task completed: {execution_result['result']}"
        else:
            return f"❌ Task failed: {execution_result['error']}"

    async def safe_execute(self, skill_id: str, tool_name: str, parameters: dict):
        """Safely execute tool with comprehensive error handling."""

        # Pre-execution validation
        validation = await self.service.tool_service.validate_parameters(
            skill_id, tool_name, parameters
        )

        if not validation['valid']:
            return {
                "success": False,
                "error_type": "validation",
                "error": "Parameter validation failed",
                "validation_errors": validation['errors']
            }

        # Execute with monitoring
        try:
            result = await self.service.tool_service.execute_tool(
                skill_id, tool_name, parameters
            )

            # Post-execution validation
            if not self.validate_result(result):
                return {
                    "success": False,
                    "error_type": "result_validation",
                    "error": "Result validation failed"
                }

            return {
                "success": True,
                "result": result['data']
            }

        except Exception as e:
            return {
                "success": False,
                "error_type": "execution",
                "error": str(e)
            }

With External Systems

# Integration with external APIs
async def execute_with_external_integration(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with external system integration."""

    # Pre-execution webhook
    await notify_external_system("execution_started", {
        "skill_id": skill_id,
        "tool_name": tool_name,
        "parameters": parameters
    })

    try:
        # Execute the tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )

        # Success webhook
        await notify_external_system("execution_completed", {
            "skill_id": skill_id,
            "tool_name": tool_name,
            "success": result['success'],
            "execution_time": result.get('execution_time', 0)
        })

        return result

    except Exception as e:
        # Error webhook
        await notify_external_system("execution_failed", {
            "skill_id": skill_id,
            "tool_name": tool_name,
            "error": str(e)
        })
        raise

Best Practices

1

Validate before execution

Always validate parameters and tool availability before attempting execution.
2

Implement proper error handling

Handle all possible error types with appropriate fallback mechanisms.
3

Monitor resource usage

Monitor CPU, memory, and execution time to prevent resource exhaustion.
4

Use appropriate timeouts

Set reasonable timeouts based on expected execution time and user patience.
5

Log execution events

Maintain comprehensive logs for debugging and audit purposes.
Tool execution happens in a sandboxed environment. Always respect security boundaries and resource limits to prevent system compromise or resource exhaustion.