Overview
Tool execution is the process of running skill-defined operations with parameter validation, security sandboxing, and error handling. FastSkill provides a robust execution environment for safe, reliable tool operation: every tool runs inside a secure sandbox with configurable resource limits, network controls, and comprehensive error handling.
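The example below is a minimal sketch of that lifecycle using the asynchronous API shown throughout this page; the default ServiceConfig(), the skill ID, the tool name, and the file path are placeholders for your own setup.

import asyncio
from fastskill import FastSkillService, ServiceConfig

async def main():
    # Default configuration (assumed to be constructible with no arguments);
    # the sections below show how to tighten the sandbox.
    service = FastSkillService(ServiceConfig())
    await service.initialize()
    try:
        result = await service.tool_service.execute_tool(
            skill_id="text-extractor",       # placeholder skill
            tool_name="extract_text",        # placeholder tool
            parameters={"file_path": "document.pdf"},
        )
        print(result["success"], str(result.get("data"))[:200])
    finally:
        await service.shutdown()

asyncio.run(main())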
Execution Environment
Security Sandbox
FastSkill executes tools in an isolated environment:
# Configure execution sandbox
from fastskill import ServiceConfig, ExecutionConfig

config = ServiceConfig(
    execution=ExecutionConfig(
        sandbox_level="strict",              # Maximum isolation
        max_memory_mb=512,                   # Memory limit
        max_cpu_percent=50,                  # CPU limit
        default_timeout=30,                  # Execution timeout (seconds)
        enable_networking=False,             # No network access
        allowed_paths=["./data", "./temp"],  # Limited file access
        enable_subprocess=False,             # No subprocess execution
        enable_file_writing=True             # Controlled file writing
    )
)
Resource Management
Monitor and control resource usage:
# Resource monitoring during execution
async def execute_with_monitoring(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with resource monitoring."""
    # Start monitoring
    monitor = ResourceMonitor()
    monitor.start()
    try:
        # Execute the tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )
        # Get resource usage
        usage = monitor.stop()
        print("📊 Resource Usage:")
        print(f"  CPU: {usage['cpu_percent']:.1f}%")
        print(f"  Memory: {usage['memory_mb']:.1f} MB")
        print(f"  Execution time: {usage['execution_time']:.2f}s")
        print(f"  Network I/O: {usage['network_bytes']} bytes")
        return result
    except ResourceLimitError as e:
        print(f"⚠️ Resource limit exceeded: {e}")
        # Handle resource limit violations
        raise
    except Exception as e:
        print(f"❌ Execution failed: {e}")
        raise
Parameter Handling
Parameter Validation
Validate parameters before execution:
async def validate_and_execute(skill_id: str, tool_name: str, parameters: dict):
    """Validate parameters and execute tool safely."""
    # Get tool specification (useful for richer client-side checks)
    tool_spec = await service.tool_service.get_tool_details(
        f"{skill_id}.{tool_name}"
    )
    # Validate parameters
    validation_result = await service.tool_service.validate_parameters(
        skill_id, tool_name, parameters
    )
    if not validation_result['valid']:
        print("❌ Parameter validation failed:")
        for error in validation_result['errors']:
            print(f"  {error['parameter']}: {error['message']}")
        return None
    # Execute with validated parameters
    result = await service.tool_service.execute_tool(
        skill_id, tool_name, parameters
    )
    return result
Parameter Type Conversion
Handle parameter type conversion automatically:
def convert_parameters(parameters: dict, tool_spec: dict) -> dict:
    """Convert parameters to correct types."""
    converted = {}
    for param_name, param_spec in tool_spec['parameters'].items():
        if param_name in parameters:
            value = parameters[param_name]
            expected_type = param_spec['type']
            # Convert type
            if expected_type == 'integer' and isinstance(value, str):
                converted[param_name] = int(value)
            elif expected_type == 'number' and isinstance(value, str):
                converted[param_name] = float(value)
            elif expected_type == 'boolean' and isinstance(value, str):
                converted[param_name] = value.lower() in ['true', '1', 'yes']
            elif expected_type == 'array' and isinstance(value, str):
                converted[param_name] = value.split(',')
            else:
                converted[param_name] = value
    return converted

# Example usage
parameters = {
    "file_path": "/path/to/file.pdf",
    "page_range": "1,2,3",        # String
    "preserve_layout": "true",    # String
    "timeout": "30"               # String
}
tool_spec = {
    "parameters": {
        "file_path": {"type": "string"},
        "page_range": {"type": "array"},
        "preserve_layout": {"type": "boolean"},
        "timeout": {"type": "integer"}
    }
}
converted = convert_parameters(parameters, tool_spec)
print(f"Converted parameters: {converted}")
Execution Modes
Synchronous Execution
# Synchronous execution (blocking)
def sync_execution_example():
    """Example of synchronous tool execution."""
    service = FastSkillService(config)
    service.initialize()
    try:
        # Synchronous tool execution
        result = service.tool_service.execute_tool_sync(
            skill_id="text-extractor",
            tool_name="extract_text",
            parameters={"file_path": "document.pdf"}
        )
        print(f"✅ Execution result: {result['success']}")
        print(f"📄 Extracted text: {result['data'][:200]}...")
    except ExecutionError as e:
        print(f"❌ Execution failed: {e}")
    except ValidationError as e:
        print(f"❌ Validation failed: {e}")
    finally:
        service.shutdown()
Asynchronous Execution
# Asynchronous execution (non-blocking)
async def async_execution_example():
    """Example of asynchronous tool execution."""
    service = FastSkillService(config)
    await service.initialize()
    try:
        # Asynchronous tool execution
        result = await service.tool_service.execute_tool(
            skill_id="text-extractor",
            tool_name="extract_text",
            parameters={"file_path": "document.pdf"}
        )
        print(f"✅ Execution result: {result['success']}")
        print(f"📄 Extracted text: {result['data'][:200]}...")
        # Process result asynchronously
        await process_extraction_result(result)
    except ExecutionError as e:
        print(f"❌ Execution failed: {e}")
        await handle_execution_error(e)
    except ValidationError as e:
        print(f"❌ Validation failed: {e}")
        await handle_validation_error(e)
    finally:
        await service.shutdown()
Batch Execution
Execute multiple tools in sequence or parallel:
async def batch_execution_example():
    """Execute multiple tools efficiently."""
    operations = [
        {
            "skill_id": "text-extractor",
            "tool_name": "extract_text",
            "parameters": {"file_path": "doc1.pdf"}
        },
        {
            "skill_id": "text-analyzer",
            "tool_name": "analyze_sentiment",
            "parameters": {"text": "placeholder"}  # Will be filled from previous result
        },
        {
            "skill_id": "data-processor",
            "tool_name": "generate_report",
            "parameters": {"analysis": "placeholder"}  # Will be filled from previous result
        }
    ]
    # Execute batch with dependency chaining
    results = await service.tool_service.execute_batch(
        operations,
        execution_mode="sequential",  # or "parallel"
        continue_on_error=False
    )
    # Process results
    for i, result in enumerate(results):
        operation = operations[i]
        print(f"Step {i+1}: {operation['tool_name']}")
        print(f"  Success: {result['success']}")
        if result['success']:
            print(f"  Result: {result['data']}")
        else:
            print(f"  Error: {result['error']}")
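If your version of the batch API does not resolve those placeholder values automatically, the same pipeline can be chained by hand. The sketch below is one way to wire each step's output into the next step's parameters; it assumes every result exposes a 'data' field, as in the other examples on this page.

# Sketch: manual chaining for a sequential pipeline (assumes result["data"]).
async def run_chained_pipeline(service, file_path: str):
    extraction = await service.tool_service.execute_tool(
        "text-extractor", "extract_text", {"file_path": file_path}
    )
    if not extraction["success"]:
        return extraction

    analysis = await service.tool_service.execute_tool(
        "text-analyzer", "analyze_sentiment", {"text": extraction["data"]}
    )
    if not analysis["success"]:
        return analysis

    return await service.tool_service.execute_tool(
        "data-processor", "generate_report", {"analysis": analysis["data"]}
    )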
Error Handling
Execution Errors
Handle different types of execution errors:
async def robust_execution(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with comprehensive error handling."""
    try:
        # Validate before execution
        validation = await service.tool_service.validate_parameters(
            skill_id, tool_name, parameters
        )
        if not validation['valid']:
            return {
                "success": False,
                "error_type": "validation",
                "errors": validation['errors']
            }
        # Execute tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )
        return {
            "success": True,
            "result": result
        }
    except ValidationError as e:
        # Parameter validation failed
        return {
            "success": False,
            "error_type": "validation",
            "error": str(e)
        }
    except ExecutionTimeoutError as e:
        # Execution timed out
        return {
            "success": False,
            "error_type": "timeout",
            "error": f"Execution timed out after {e.timeout_seconds}s"
        }
    except ResourceLimitError as e:
        # Resource limit exceeded
        return {
            "success": False,
            "error_type": "resource_limit",
            "error": f"Resource limit exceeded: {e.resource_type}"
        }
    except SecurityError as e:
        # Security violation
        return {
            "success": False,
            "error_type": "security",
            "error": f"Security violation: {e.violation}"
        }
    except Exception as e:
        # Unexpected error
        return {
            "success": False,
            "error_type": "unknown",
            "error": f"Unexpected error: {str(e)}"
        }
Retry Logic
Implement retry logic for transient failures:
import asyncio

async def execute_with_retry(skill_id: str, tool_name: str, parameters: dict,
                             max_retries: int = 3, backoff_seconds: float = 1.0):
    """Execute tool with retry logic for transient failures."""
    for attempt in range(max_retries + 1):
        try:
            result = await service.tool_service.execute_tool(
                skill_id, tool_name, parameters
            )
            if result['success']:
                return result
            # Check if the reported error is retryable; if not, give up immediately
            if not is_retryable_error(result.get('error_type', '')):
                return result
        except (NetworkError, TimeoutError) as e:
            if attempt < max_retries:
                wait_time = backoff_seconds * (2 ** attempt)  # Exponential backoff
                print(f"⚠️ Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
                continue
            return {
                "success": False,
                "error_type": "max_retries_exceeded",
                "error": f"Failed after {max_retries + 1} attempts"
            }
        except Exception as e:
            # Non-retryable error
            return {
                "success": False,
                "error_type": "non_retryable",
                "error": str(e)
            }
    # Retries exhausted without a successful result
    return {
        "success": False,
        "error_type": "max_retries_exceeded",
        "error": f"Failed after {max_retries + 1} attempts"
    }

def is_retryable_error(error_type: str) -> bool:
    """Check if an error type is retryable."""
    retryable_errors = {
        'network_error', 'timeout', 'temporary_unavailable',
        'rate_limit', 'server_error', 'database_connection'
    }
    return error_type in retryable_errors
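A call site might look like this sketch (the skill, tool, and file path are placeholders). The helper always returns a dict, so callers branch on 'success' rather than catching exceptions.

import asyncio

async def main():
    outcome = await execute_with_retry(
        "text-extractor", "extract_text", {"file_path": "document.pdf"},
        max_retries=3, backoff_seconds=1.0,
    )
    if outcome["success"]:
        print("Done:", str(outcome.get("data", ""))[:100])
    else:
        print("Gave up:", outcome.get("error_type"), outcome.get("error"))

asyncio.run(main())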
Execution Monitoring
Performance Metrics
Monitor tool execution performance:
async def monitor_execution_performance():
    """Monitor and analyze tool execution performance."""
    # Get execution metrics
    metrics = await service.tool_service.get_execution_metrics()
    print("📊 Execution Performance:")
    print(f"  Total executions: {metrics['total_executions']}")
    print(f"  Success rate: {metrics['success_rate']:.1%}")
    print(f"  Average execution time: {metrics['avg_execution_time']:.2f}s")
    print(f"  Peak concurrent executions: {metrics['peak_concurrent']}")
    # Tool-specific metrics
    print("\n🔧 Tool Performance:")
    for tool_id, tool_metrics in metrics['tool_metrics'].items():
        print(f"  {tool_id}:")
        print(f"    Executions: {tool_metrics['execution_count']}")
        print(f"    Avg time: {tool_metrics['avg_execution_time']:.2f}s")
        print(f"    Success rate: {tool_metrics['success_rate']:.1%}")
    # Resource usage
    resources = metrics['resource_usage']
    print("\n💾 Resource Usage:")
    print(f"  Memory usage: {resources['memory_mb']:.1f} MB")
    print(f"  CPU usage: {resources['cpu_percent']:.1f}%")
    print(f"  Network I/O: {resources['network_bytes']} bytes")
Execution Logging
Configure detailed execution logging:
# Configure execution logging
config = ServiceConfig(
    execution=ExecutionConfig(
        enable_detailed_logging=True,
        log_execution_parameters=True,
        log_execution_results=True,
        log_performance_metrics=True
    ),
    enable_audit_logging=True,
    log_level="INFO"
)

# Execution logs will include:
# - Tool execution start/end times
# - Parameter values (if enabled)
# - Execution results and errors
# - Resource usage statistics
# - Security events and violations
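If you want these events in your own log files, you can attach handlers on the application side. The sketch below assumes FastSkill emits its execution and audit logs through Python's standard logging module (an assumption; adjust the logger name to match your installation).

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler("fastskill_execution.log"),
        logging.StreamHandler(),
    ],
)
# Hypothetical logger name; verify against your FastSkill version.
logging.getLogger("fastskill").setLevel(logging.INFO)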
Integration Patterns
With AI Agents
from datetime import datetime

class ToolExecutionAgent:
    """AI agent that safely executes FastSkill tools."""

    def __init__(self, service):
        self.service = service
        self.execution_history = []

    async def execute_user_request(self, user_query: str):
        """Execute user request through FastSkill tools."""
        # Parse user query to determine tool and parameters
        tool_info = await self.parse_query(user_query)
        if not tool_info:
            return "I don't understand that request."
        skill_id = tool_info['skill_id']
        tool_name = tool_info['tool_name']
        parameters = tool_info['parameters']
        # Validate and execute
        execution_result = await self.safe_execute(
            skill_id, tool_name, parameters
        )
        # Log execution
        self.execution_history.append({
            "query": user_query,
            "tool": f"{skill_id}.{tool_name}",
            "success": execution_result['success'],
            "timestamp": datetime.now().isoformat()
        })
        if execution_result['success']:
            return f"✅ Task completed: {execution_result['result']}"
        else:
            return f"❌ Task failed: {execution_result['error']}"

    async def safe_execute(self, skill_id: str, tool_name: str, parameters: dict):
        """Safely execute tool with comprehensive error handling."""
        # Pre-execution validation
        validation = await self.service.tool_service.validate_parameters(
            skill_id, tool_name, parameters
        )
        if not validation['valid']:
            return {
                "success": False,
                "error_type": "validation",
                "error": "Parameter validation failed",
                "validation_errors": validation['errors']
            }
        # Execute with monitoring
        try:
            result = await self.service.tool_service.execute_tool(
                skill_id, tool_name, parameters
            )
            # Post-execution validation
            if not self.validate_result(result):
                return {
                    "success": False,
                    "error_type": "result_validation",
                    "error": "Result validation failed"
                }
            return {
                "success": True,
                "result": result['data']
            }
        except Exception as e:
            return {
                "success": False,
                "error_type": "execution",
                "error": str(e)
            }
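The class above relies on parse_query and validate_result helpers that are not shown. The hypothetical stubs below sketch one minimal way to supply them; the keyword-based routing is invented purely for illustration, and a real agent would typically use an LLM or intent router.

class SimpleToolExecutionAgent(ToolExecutionAgent):
    async def parse_query(self, user_query: str):
        # Naive illustrative mapping from query text to a tool call.
        if "extract" in user_query.lower():
            return {
                "skill_id": "text-extractor",
                "tool_name": "extract_text",
                "parameters": {"file_path": "document.pdf"},
            }
        return None

    def validate_result(self, result: dict) -> bool:
        # Minimal sanity check on the execution result structure.
        return isinstance(result, dict) and "data" in result

# Usage (inside an async context, with an initialized service):
# agent = SimpleToolExecutionAgent(service)
# reply = await agent.execute_user_request("extract the text from document.pdf")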
With External Systems
# Integration with external APIs
async def execute_with_external_integration(skill_id: str, tool_name: str, parameters: dict):
    """Execute tool with external system integration."""
    # Pre-execution webhook
    await notify_external_system("execution_started", {
        "skill_id": skill_id,
        "tool_name": tool_name,
        "parameters": parameters
    })
    try:
        # Execute the tool
        result = await service.tool_service.execute_tool(
            skill_id, tool_name, parameters
        )
        # Success webhook
        await notify_external_system("execution_completed", {
            "skill_id": skill_id,
            "tool_name": tool_name,
            "success": result['success'],
            "execution_time": result.get('execution_time', 0)
        })
        return result
    except Exception as e:
        # Error webhook
        await notify_external_system("execution_failed", {
            "skill_id": skill_id,
            "tool_name": tool_name,
            "error": str(e)
        })
        raise
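notify_external_system is not defined above. A minimal sketch is shown below; it assumes the httpx library is available (any async HTTP client works), and the webhook URL is a placeholder.

import httpx

WEBHOOK_URL = "https://example.com/fastskill/webhooks"  # placeholder endpoint

async def notify_external_system(event: str, payload: dict) -> None:
    """Post an execution event to an external webhook (best-effort)."""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(WEBHOOK_URL, json={"event": event, "payload": payload})
    except httpx.HTTPError as exc:
        # Webhook failures should not break tool execution itself.
        print(f"⚠️ Webhook delivery failed for {event}: {exc}")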
Best Practices
1. Validate before execution: Always validate parameters and tool availability before attempting execution.
2. Implement proper error handling: Handle all possible error types with appropriate fallback mechanisms.
3. Monitor resource usage: Track CPU, memory, and execution time to prevent resource exhaustion.
4. Use appropriate timeouts: Set reasonable timeouts based on expected execution time and user patience; see the sketch after this list for a client-side deadline.
5. Log execution events: Maintain comprehensive logs for debugging and audit purposes.
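As referenced in practice 4, a per-call deadline can also be enforced on the caller's side. The sketch below uses asyncio.wait_for as a client-side guard on top of the sandbox's default_timeout; the values and the surrounding service object are illustrative.

import asyncio

async def execute_with_deadline(skill_id: str, tool_name: str, parameters: dict,
                                deadline_seconds: float = 30.0):
    """Client-side deadline on top of the sandbox's own default_timeout."""
    try:
        return await asyncio.wait_for(
            service.tool_service.execute_tool(skill_id, tool_name, parameters),
            timeout=deadline_seconds,
        )
    except asyncio.TimeoutError:
        return {
            "success": False,
            "error_type": "timeout",
            "error": f"No result within {deadline_seconds}s"
        }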
Tool execution happens in a sandboxed environment. Always respect security boundaries and resource limits to prevent system compromise or resource exhaustion.