Overview
This section provides simple, copy-paste examples for common FastSkill use cases. Each example includes complete code that you can run immediately. These examples use the Python SDK for simplicity, but the same concepts apply to the Rust SDK and REST API.
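Every example below follows the same lifecycle: initialize the service, register a skill definition, execute one of its tools, and shut down. The sketch below shows that pattern in isolation; the hello-skill definition and say_hello tool are placeholders for illustration, not skills that ship with FastSkill.

import asyncio
from fastskill import FastSkillService

async def main():
    # Lifecycle shared by all examples: initialize, register, execute, shutdown
    service = FastSkillService()
    await service.initialize()

    # Register a skill from a definition dict (see the JSON files below)
    skill_id = await service.register_skill({
        "id": "hello-skill",            # hypothetical skill for illustration
        "name": "Hello Skill",
        "description": "Minimal example skill",
        "version": "1.0.0",
        "capabilities": ["text_processing"],
        "skill_file": "skills/hello_skill.py",
        "enabled": True
    })

    # Tool results are dicts with a "success" flag, as in the examples below
    result = await service.tool_service.execute_tool(skill_id, "say_hello", {"name": "world"})
    print(result)

    await service.shutdown()

asyncio.run(main())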
Text Processing
Extract Text from PDF
1. Create a PDF text extractor skill
skills/pdf-extractor.json
{
  "id": "pdf-text-extractor",
  "name": "PDF Text Extractor",
  "description": "Extract clean text content from PDF documents",
  "version": "1.0.0",
  "author": "FastSkill Examples",
  "tags": ["text", "pdf", "extraction", "documents"],
  "capabilities": ["text_extraction", "pdf_processing", "document_analysis"],
  "skill_file": "pdf_extractor.py",
  "enabled": true,
  "execution_timeout": 60,
  "memory_limit_mb": 512,
  "dependencies": ["pypdf2", "python-magic"],
  "metadata": {
    "category": "text-processing",
    "supported_formats": ["pdf"],
    "max_pages": 100,
    "output_format": "plain_text"
  }
}
skills/pdf_extractor.py
import os
import time
from typing import Dict, Any

from PyPDF2 import PdfReader


async def extract_text_from_pdf(file_path: str, page_range: list = None, **kwargs) -> Dict[str, Any]:
    """Extract text content from a PDF document.

    Args:
        file_path: Path to the PDF file
        page_range: Specific pages to extract (optional, 1-based)
        **kwargs: Additional extraction options

    Returns:
        Dictionary with extracted text and metadata
    """
    # Validate input
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"PDF file not found: {file_path}")
    if not file_path.lower().endswith('.pdf'):
        raise ValueError("File must be a PDF document")

    # Extract text
    start_time = time.perf_counter()
    try:
        reader = PdfReader(file_path)
        total_pages = len(reader.pages)

        # Determine pages to extract
        if page_range:
            pages = [p - 1 for p in page_range if 1 <= p <= total_pages]  # Convert to 0-based
        else:
            pages = range(total_pages)

        extracted_text = []
        for page_num in pages:
            try:
                page = reader.pages[page_num]
                text = page.extract_text()
                extracted_text.append(f"--- Page {page_num + 1} ---\n{text}\n")
            except Exception as e:
                extracted_text.append(f"--- Page {page_num + 1} (Error) ---\nError extracting text: {e}\n")

        full_text = "\n".join(extracted_text)

        return {
            "success": True,
            "text": full_text,
            "total_pages": total_pages,
            "extracted_pages": len(pages),
            "file_size_mb": os.path.getsize(file_path) / (1024 * 1024),
            "extraction_time_seconds": round(time.perf_counter() - start_time, 3)
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to extract text: {str(e)}",
            "file_path": file_path
        }
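A note on the "pypdf2" dependency: PyPDF2 has been merged into the actively maintained pypdf project. The import above works with the PyPDF2 3.x releases, but new skills may prefer `from pypdf import PdfReader`, which exposes the same PdfReader API.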
2. Register and test the skill
register_pdf_skill.py
import asyncio
import json
import os

from fastskill import FastSkillService


async def setup_pdf_skill():
    service = FastSkillService()
    await service.initialize()

    # Load and register skill
    with open('skills/pdf-extractor.json') as f:
        skill_definition = json.load(f)

    skill_id = await service.register_skill(skill_definition)
    print(f"✅ PDF extractor skill registered: {skill_id}")

    # Test the skill
    test_pdf = "sample-document.pdf"  # Replace with actual PDF path
    if os.path.exists(test_pdf):
        result = await service.tool_service.execute_tool(
            skill_id,
            "extract_text_from_pdf",
            {"file_path": test_pdf}
        )
        if result["success"]:
            print("✅ Text extraction successful!")
            print(f"📄 Extracted {result['extracted_pages']} pages")
            print(f"📝 Text preview: {result['text'][:200]}...")
        else:
            print(f"❌ Extraction failed: {result['error']}")
    else:
        print(f"⚠️ Test PDF not found: {test_pdf}")

    await service.shutdown()

# Run the setup
asyncio.run(setup_pdf_skill())
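Continuing from the script above, the extractor's optional page_range argument takes 1-based page numbers (see pdf_extractor.py), so you can limit extraction to specific pages:

# Extract only the first three pages; out-of-range numbers are silently skipped
result = await service.tool_service.execute_tool(
    skill_id,
    "extract_text_from_pdf",
    {"file_path": test_pdf, "page_range": [1, 2, 3]}
)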
Text Analysis
Sentiment Analyzer
1. Create a sentiment analyzer skill
skills/sentiment-analyzer.json
{
  "id": "sentiment-analyzer",
  "name": "Sentiment Analyzer",
  "description": "Analyze text sentiment and extract emotional insights",
  "version": "1.0.0",
  "author": "FastSkill Examples",
  "tags": ["text", "nlp", "sentiment", "analysis", "emotions"],
  "capabilities": ["sentiment_analysis", "text_analysis", "emotion_detection", "opinion_mining"],
  "skill_file": "sentiment_analyzer.py",
  "enabled": true,
  "execution_timeout": 30,
  "memory_limit_mb": 256,
  "dependencies": ["textblob", "nltk"],
  "metadata": {
    "category": "text-analysis",
    "supported_languages": ["en", "es", "fr", "de"],
    "analysis_types": ["polarity", "subjectivity", "emotions"]
  }
}
skills/sentiment_analyzer.py
import re
import time
from typing import Dict, Any, List

from textblob import TextBlob


async def analyze_sentiment(text: str, language: str = "en", **kwargs) -> Dict[str, Any]:
    """Analyze sentiment and emotions in text content.

    Args:
        text: Text content to analyze
        language: Language code for analysis
        **kwargs: Additional analysis options

    Returns:
        Dictionary with sentiment analysis results
    """
    # Clean text
    cleaned_text = clean_text(text)

    # Perform sentiment analysis
    blob = TextBlob(cleaned_text)

    # Calculate sentiment scores
    polarity = blob.sentiment.polarity  # -1 (negative) to 1 (positive)
    subjectivity = blob.sentiment.subjectivity  # 0 (objective) to 1 (subjective)

    # Categorize sentiment
    if polarity > 0.1:
        sentiment_category = "positive"
    elif polarity < -0.1:
        sentiment_category = "negative"
    else:
        sentiment_category = "neutral"

    # Extract key phrases
    key_phrases = extract_key_phrases(cleaned_text)

    # Calculate confidence
    confidence = abs(polarity) + (1 - subjectivity) * 0.5

    return {
        "success": True,
        "sentiment": {
            "polarity": polarity,
            "subjectivity": subjectivity,
            "category": sentiment_category,
            "confidence": min(confidence, 1.0)
        },
        "text_stats": {
            "word_count": len(cleaned_text.split()),
            "sentence_count": len(blob.sentences),
            "character_count": len(cleaned_text)
        },
        "key_phrases": key_phrases,
        "language": language,
        "analysis_timestamp": time.time()  # wall-clock timestamp
    }


def clean_text(text: str) -> str:
    """Clean text for analysis."""
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    # Remove URLs
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
    # Remove email addresses
    text = re.sub(r'\S+@\S+', '', text)
    return text.strip()


def extract_key_phrases(text: str) -> List[str]:
    """Extract key phrases from text."""
    blob = TextBlob(text)

    # Get noun phrases
    noun_phrases = list(blob.noun_phrases)

    # Get significant words (adjectives and nouns)
    words = []
    for word, tag in blob.tags:
        if tag.startswith('JJ') or tag.startswith('NN'):  # Adjectives or nouns
            words.append(word.lower())

    # Combine noun phrases with the first ten significant words, deduplicated
    return list(set(noun_phrases + words[:10]))
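The confidence score is a simple heuristic rather than a calibrated probability: strongly polar, objective text scores highest. For example, polarity 0.8 with subjectivity 0.9 gives min(0.8 + (1 - 0.9) * 0.5, 1.0) = 0.85, while a mildly positive, highly subjective review scores much lower.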
2. Test sentiment analysis
test_sentiment.py
import asyncio

from fastskill import FastSkillService


async def test_sentiment_analysis():
    service = FastSkillService()
    await service.initialize()

    # Test texts with different sentiments
    test_texts = [
        "I love this product! It's amazing and works perfectly.",
        "This is terrible. I hate it and it doesn't work at all.",
        "It's okay, nothing special but does the job.",
        "The customer service was excellent and very helpful.",
        "The documentation is confusing and hard to follow."
    ]

    # Register sentiment analyzer
    skill_id = await service.register_skill({
        "id": "sentiment-analyzer",
        "name": "Sentiment Analyzer",
        "description": "Analyze text sentiment and emotions",
        "version": "1.0.0",
        "tags": ["nlp", "sentiment"],
        "capabilities": ["sentiment_analysis"],
        "skill_file": "skills/sentiment_analyzer.py",
        "enabled": True
    })

    # Analyze each text
    for text in test_texts:
        result = await service.tool_service.execute_tool(
            skill_id,
            "analyze_sentiment",
            {"text": text}
        )
        if result["success"]:
            sentiment = result["sentiment"]
            print(f"📝 Text: '{text[:50]}...'")
            print(f"   Sentiment: {sentiment['category']} (polarity: {sentiment['polarity']:.2f})")
            print(f"   Confidence: {sentiment['confidence']:.2f}")
            print(f"   Key phrases: {', '.join(result['key_phrases'][:3])}")
        else:
            print(f"❌ Analysis failed: {result['error']}")
        print()

    await service.shutdown()

asyncio.run(test_sentiment_analysis())
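A caveat on the supported_languages metadata above: TextBlob's default sentiment analyzer is trained on English, so the language argument is carried through to the result but does not change the analysis. Supporting Spanish, French, or German scores would require plugging in a language-specific analyzer or a translation step.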
File Management
File Organizer
1. Create a file organizer skill
skills/file-organizer.json
{
  "id": "file-organizer",
  "name": "Smart File Organizer",
  "description": "Automatically organize files by type, size, date, or content",
  "version": "1.0.0",
  "author": "FastSkill Examples",
  "tags": ["files", "organization", "management", "automation"],
  "capabilities": ["file_management", "directory_operations", "content_analysis", "auto_sorting"],
  "skill_file": "file_organizer.py",
  "enabled": true,
  "execution_timeout": 120,
  "memory_limit_mb": 512,
  "dependencies": ["python-magic", "pillow"],
  "metadata": {
    "category": "file-management",
    "supported_operations": ["by_type", "by_size", "by_date", "by_content"],
    "max_files": 10000,
    "preserve_structure": true
  }
}
skills/file_organizer.py
import shutil
from pathlib import Path
from typing import Dict, Any

import magic


async def organize_files_by_type(source_dir: str, target_dir: str, **kwargs) -> Dict[str, Any]:
    """Organize files in a directory by their type.

    Args:
        source_dir: Source directory to organize
        target_dir: Target directory for organized files
        **kwargs: Additional organization options

    Returns:
        Dictionary with organization results
    """
    # Validate directories
    source_path = Path(source_dir)
    target_path = Path(target_dir)
    if not source_path.exists():
        raise FileNotFoundError(f"Source directory not found: {source_dir}")
    target_path.mkdir(parents=True, exist_ok=True)

    # File type mappings
    file_types = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'],
        'documents': ['.pdf', '.docx', '.doc', '.txt', '.rtf', '.odt'],
        'spreadsheets': ['.xlsx', '.xls', '.csv', '.ods'],
        'presentations': ['.pptx', '.ppt', '.odp'],
        'videos': ['.mp4', '.avi', '.mkv', '.mov', '.wmv'],
        'audio': ['.mp3', '.wav', '.flac', '.aac', '.ogg'],
        'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
        'code': ['.py', '.js', '.html', '.css', '.java', '.cpp', '.rs'],
        'other': []
    }

    # MIME major types that map onto the categories above
    mime_categories = {'image': 'images', 'video': 'videos', 'audio': 'audio'}

    # Get file type detector
    mime_detector = magic.Magic(mime=True)

    # Find all files
    files_found = [p for p in source_path.rglob('*') if p.is_file()]

    # Organize files
    organized_count = 0
    errors = []
    for file_path in files_found:
        try:
            # Match by extension first, falling back to the detected MIME major type
            file_extension = file_path.suffix.lower()
            for cat, extensions in file_types.items():
                if file_extension in extensions:
                    category = cat
                    break
            else:
                mime_type = mime_detector.from_file(str(file_path))
                category = mime_categories.get(mime_type.split('/')[0], 'other')

            # Create category directory
            category_dir = target_path / category
            category_dir.mkdir(exist_ok=True)

            # Copy file to category directory
            target_file = category_dir / file_path.name
            shutil.copy2(file_path, target_file)
            organized_count += 1
        except Exception as e:
            errors.append(f"Error organizing {file_path}: {e}")

    return {
        "success": True,
        "organized_files": organized_count,
        "total_files": len(files_found),
        "categories_created": len([d for d in target_path.iterdir() if d.is_dir()]),
        "errors": errors,
        "source_directory": str(source_path),
        "target_directory": str(target_path)
    }
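A design note: the organizer copies files with shutil.copy2 (which preserves timestamps and permissions), so the source directory is left untouched and the operation is safe to re-run. Swapping shutil.copy2 for shutil.move would turn this into a destructive, move-based reorganization.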
2. Test file organization
test_file_organizer.py
import asyncio
import tempfile
from pathlib import Path

from fastskill import FastSkillService


async def test_file_organizer():
    service = FastSkillService()
    await service.initialize()

    # Create test files
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)

        # Create source directory
        source_dir = temp_path / "source"
        source_dir.mkdir()

        # Create test files
        test_files = [
            ("document.pdf", "This is a PDF document"),
            ("spreadsheet.xlsx", "Excel spreadsheet data"),
            ("image.jpg", "JPEG image content"),
            ("script.py", "# Python script"),
            ("video.mp4", "Video content"),
            ("archive.zip", "Compressed archive")
        ]
        for filename, content in test_files:
            file_path = source_dir / filename
            with open(file_path, 'w') as f:
                f.write(content)

        # Register file organizer skill
        skill_id = await service.register_skill({
            "id": "file-organizer",
            "name": "File Organizer",
            "description": "Organize files by type",
            "version": "1.0.0",
            "capabilities": ["file_management"],
            "skill_file": "skills/file_organizer.py",
            "enabled": True
        })

        # Organize files
        target_dir = temp_path / "organized"
        result = await service.tool_service.execute_tool(
            skill_id,
            "organize_files_by_type",
            {
                "source_dir": str(source_dir),
                "target_dir": str(target_dir)
            }
        )

        print("📁 File Organization Results:")
        print(f"   Files organized: {result['organized_files']}")
        print(f"   Total files: {result['total_files']}")
        print(f"   Categories created: {result['categories_created']}")

        # Show organized structure
        if result["success"]:
            print("\n📂 Organized structure:")
            for category_dir in sorted(target_dir.iterdir()):
                if category_dir.is_dir():
                    file_count = len(list(category_dir.iterdir()))
                    print(f"   {category_dir.name}/: {file_count} files")

        if result["errors"]:
            print(f"\n⚠️ Errors: {result['errors']}")

    await service.shutdown()

asyncio.run(test_file_organizer())
Data Analysis
CSV Data Analyzer
1. Create a CSV analyzer skill
skills/csv-analyzer.json
{
  "id": "csv-analyzer",
  "name": "CSV Data Analyzer",
  "description": "Analyze CSV data with statistical summaries and insights",
  "version": "1.0.0",
  "author": "FastSkill Examples",
  "tags": ["data", "csv", "analysis", "statistics", "visualization"],
  "capabilities": ["data_analysis", "csv_processing", "statistical_analysis", "data_visualization"],
  "skill_file": "csv_analyzer.py",
  "enabled": true,
  "execution_timeout": 60,
  "memory_limit_mb": 512,
  "dependencies": ["pandas", "numpy", "matplotlib"],
  "metadata": {
    "category": "data-analysis",
    "supported_formats": ["csv", "tsv"],
    "max_rows": 100000,
    "max_columns": 100
  }
}
skills/csv_analyzer.py
import os
import time
from typing import Dict, Any, List

import numpy as np
import pandas as pd


async def analyze_csv_data(file_path: str, analysis_type: str = "summary", **kwargs) -> Dict[str, Any]:
    """Analyze CSV data and provide statistical insights.

    Args:
        file_path: Path to CSV file
        analysis_type: Type of analysis (summary, correlation, trends, anomalies, complete)
        **kwargs: Additional analysis options

    Returns:
        Dictionary with analysis results
    """
    # Validate input
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")

    # Load CSV data
    try:
        df = pd.read_csv(file_path)

        # Basic data info
        basic_info = {
            "rows": len(df),
            "columns": len(df.columns),
            "file_size_mb": os.path.getsize(file_path) / (1024 * 1024),
            "memory_usage_mb": df.memory_usage(deep=True).sum() / (1024 * 1024)
        }

        analysis_results = {
            "basic_info": basic_info,
            "column_info": {}
        }

        # Column analysis
        for column in df.columns:
            analysis_results["column_info"][column] = analyze_column(df[column])

        # Specific analysis types
        if analysis_type in ("summary", "complete"):
            analysis_results["summary"] = generate_summary(df)
        if analysis_type in ("correlation", "complete"):
            analysis_results["correlations"] = generate_correlations(df)
        if analysis_type in ("trends", "complete"):
            analysis_results["trends"] = analyze_trends(df)
        if analysis_type in ("anomalies", "complete"):
            analysis_results["anomalies"] = detect_anomalies(df)

        return {
            "success": True,
            "analysis_type": analysis_type,
            "data": analysis_results,
            "analysis_timestamp": time.time()  # wall-clock timestamp
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Analysis failed: {str(e)}",
            "file_path": file_path
        }


def analyze_column(series: pd.Series) -> Dict[str, Any]:
    """Analyze a single column."""
    # Handle different data types
    if pd.api.types.is_numeric_dtype(series):
        stats = {
            "type": "numeric",
            "mean": float(series.mean()),
            "median": float(series.median()),
            "std": float(series.std()),
            "min": float(series.min()),
            "max": float(series.max()),
            "null_count": int(series.isnull().sum()),
            "unique_count": int(series.nunique())
        }
    elif pd.api.types.is_string_dtype(series) or pd.api.types.is_object_dtype(series):
        non_null = series.dropna().astype(str)
        stats = {
            "type": "text",
            "unique_count": int(series.nunique()),
            "null_count": int(series.isnull().sum()),
            "most_common": series.mode().tolist() if not series.mode().empty else [],
            "avg_length": float(non_null.str.len().mean()) if len(non_null) else 0.0
        }
    else:
        stats = {
            "type": "other",
            "unique_count": int(series.nunique()),
            "null_count": int(series.isnull().sum())
        }
    return stats


def generate_summary(df: pd.DataFrame) -> Dict[str, Any]:
    """Generate summary statistics."""
    return {
        "numeric_columns": len(df.select_dtypes(include=[np.number]).columns),
        "text_columns": len(df.select_dtypes(include=['object']).columns),
        "total_nulls": int(df.isnull().sum().sum()),
        "duplicate_rows": int(df.duplicated().sum()),
        "memory_usage_mb": float(df.memory_usage(deep=True).sum() / (1024 * 1024))
    }


def generate_correlations(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Generate correlation analysis."""
    numeric_df = df.select_dtypes(include=[np.number])
    if len(numeric_df.columns) < 2:
        return []

    correlations = numeric_df.corr()

    # Extract significant correlations (upper triangle only, to avoid duplicate pairs)
    significant_correlations = []
    for i in range(len(correlations.columns)):
        for j in range(i + 1, len(correlations.columns)):
            col1 = correlations.columns[i]
            col2 = correlations.columns[j]
            corr_value = correlations.iloc[i, j]
            if abs(corr_value) > 0.3:  # Significant correlation threshold
                significant_correlations.append({
                    "column1": col1,
                    "column2": col2,
                    "correlation": float(corr_value),
                    "strength": "strong" if abs(corr_value) > 0.7 else "moderate"
                })

    return sorted(significant_correlations, key=lambda x: abs(x["correlation"]), reverse=True)


def analyze_trends(df: pd.DataFrame) -> Dict[str, Any]:
    """Analyze data trends."""
    # This is a simplified implementation
    return {
        "trend_analysis": "Simplified implementation",
        "note": "Full trend analysis would require time-series data and more complex algorithms"
    }


def detect_anomalies(df: pd.DataFrame) -> Dict[str, Any]:
    """Detect data anomalies."""
    anomalies = {}
    for column in df.select_dtypes(include=[np.number]).columns:
        series = df[column].dropna()
        if len(series) > 0:
            # Simple anomaly detection using the 1.5 * IQR rule
            Q1 = series.quantile(0.25)
            Q3 = series.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = series[(series < lower_bound) | (series > upper_bound)]
            anomalies[column] = {
                "outlier_count": len(outliers),
                "outlier_percentage": float(len(outliers) / len(series) * 100),
                "bounds": {"lower": float(lower_bound), "upper": float(upper_bound)}
            }
    return anomalies
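The anomaly detector applies the standard 1.5 × IQR rule. As a worked example: with Q1 = 40 and Q3 = 60, the IQR is 20, so the bounds are 40 - 1.5 × 20 = 10 and 60 + 1.5 × 20 = 90, and any value outside [10, 90] is flagged as an outlier.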
2. Test CSV analysis
test_csv_analysis.py
import asyncio
import os
import tempfile

from fastskill import FastSkillService


async def test_csv_analysis():
    service = FastSkillService()
    await service.initialize()

    # Create sample CSV data
    csv_content = """name,age,salary,department,years_experience
John Doe,30,75000,Engineering,5
Jane Smith,28,65000,Marketing,3
Bob Johnson,35,90000,Engineering,8
Alice Brown,32,70000,Sales,6
Charlie Wilson,29,55000,Marketing,2
Diana Davis,31,80000,Engineering,7
"""

    # Save to temporary file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
        f.write(csv_content)
        csv_file = f.name

    try:
        # Register CSV analyzer
        skill_id = await service.register_skill({
            "id": "csv-analyzer",
            "name": "CSV Analyzer",
            "description": "Analyze CSV data",
            "version": "1.0.0",
            "capabilities": ["data_analysis"],
            "skill_file": "skills/csv_analyzer.py",
            "enabled": True
        })

        # Analyze the CSV
        result = await service.tool_service.execute_tool(
            skill_id,
            "analyze_csv_data",
            {
                "file_path": csv_file,
                "analysis_type": "complete"
            }
        )

        if result["success"]:
            data = result["data"]
            basic_info = data["basic_info"]
            print("📊 CSV Analysis Results:")
            print(f"   Rows: {basic_info['rows']}")
            print(f"   Columns: {basic_info['columns']}")
            print(f"   File size: {basic_info['file_size_mb']:.2f} MB")

            print("\n📋 Column Analysis:")
            for col, info in data["column_info"].items():
                print(f"   {col} ({info['type']}): {info}")

            if data.get("correlations"):
                print("\n🔗 Significant Correlations:")
                for corr in data["correlations"][:3]:
                    print(f"   {corr['column1']} ↔ {corr['column2']}: {corr['correlation']:.3f} ({corr['strength']})")

            if data.get("anomalies"):
                print("\n⚠️ Data Anomalies:")
                for col, anomaly in data["anomalies"].items():
                    if anomaly["outlier_count"] > 0:
                        print(f"   {col}: {anomaly['outlier_count']} outliers ({anomaly['outlier_percentage']:.1f}%)")
        else:
            print(f"❌ Analysis failed: {result['error']}")
    finally:
        # Cleanup
        os.unlink(csv_file)
        await service.shutdown()

asyncio.run(test_csv_analysis())
Integration Examples
With AI Agents
The example below wires FastSkill into a minimal agent loop: discover the skills that match a user query, pick the top match, and execute one of its tools.
# Example: Integration with a simple AI agent
import asyncio

from fastskill import FastSkillService


class SimpleAIAgent:
    """Simple AI agent that uses FastSkill for tool execution."""

    def __init__(self):
        self.service = None

    async def initialize(self):
        """Initialize the agent with FastSkill."""
        self.service = FastSkillService()
        await self.service.initialize()
        # Register some basic skills
        await self._register_basic_skills()

    async def _register_basic_skills(self):
        """Register basic skills for the agent."""
        skills = [
            {
                "id": "text-processor",
                "name": "Text Processor",
                "description": "Process and analyze text",
                "version": "1.0.0",
                "capabilities": ["text_processing", "text_analysis"]
            },
            {
                "id": "file-manager",
                "name": "File Manager",
                "description": "Manage files and directories",
                "version": "1.0.0",
                "capabilities": ["file_management"]
            }
        ]
        for skill in skills:
            await self.service.register_skill(skill)

    async def process_request(self, user_query: str):
        """Process a user request using FastSkill."""
        # Discover relevant skills
        relevant_skills = await self.service.discover_skills(user_query)
        if not relevant_skills:
            return "I don't have the capability to handle that request yet."

        # Use the most relevant skill
        best_skill = relevant_skills[0]
        skill_id = best_skill["id"]

        # Extract parameters from query (simplified)
        parameters = self._extract_parameters(user_query)

        # Execute the skill
        try:
            result = await self.service.tool_service.execute_tool(
                skill_id,
                "process",  # Assuming all skills have a "process" tool
                parameters
            )
            if result["success"]:
                return f"✅ Task completed: {result['data']}"
            else:
                return f"❌ Task failed: {result['error']}"
        except Exception as e:
            return f"❌ Error: {str(e)}"

    def _extract_parameters(self, query: str) -> dict:
        """Simple parameter extraction (in a real implementation, use NLP)."""
        # This is a simplified implementation
        return {"query": query}

    async def shutdown(self):
        """Clean up resources."""
        if self.service:
            await self.service.shutdown()


# Usage example
async def demo_agent():
    agent = SimpleAIAgent()
    await agent.initialize()

    # Test queries
    test_queries = [
        "analyze this text for sentiment",
        "organize my files by type",
        "extract data from CSV file"
    ]
    for query in test_queries:
        print(f"\n🤖 User: {query}")
        response = await agent.process_request(query)
        print(f"🤖 Agent: {response}")

    await agent.shutdown()

# Run the demo
asyncio.run(demo_agent())
Best Practices
1. Start simple
Begin with basic examples and gradually add complexity as you understand the patterns.
2. Test thoroughly
Test your skills with various inputs, edge cases, and error conditions before deploying.
3. Handle errors gracefully
Implement proper error handling and provide meaningful error messages for debugging.
4. Document your skills
Include comprehensive documentation and examples for each skill you create.
5. Monitor performance
Monitor execution times and resource usage to identify optimization opportunities; a small timing wrapper is sketched below.
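As a minimal sketch combining practices 3 and 5, a small wrapper can make every tool call report failures in the same {"success": ..., "error": ...} shape the skills already use, and log its execution time. The run_tool helper and its log format are illustrative, not part of the FastSkill API:

import time

async def run_tool(service, skill_id, tool_name, params):
    """Hypothetical helper: execute a tool with timing and graceful error handling."""
    start = time.perf_counter()
    try:
        result = await service.tool_service.execute_tool(skill_id, tool_name, params)
    except Exception as e:
        # Normalize unexpected failures into the same shape skills use for errors
        result = {"success": False, "error": str(e)}
    elapsed = time.perf_counter() - start
    print(f"⏱️ {skill_id}.{tool_name} took {elapsed:.2f}s (success={result.get('success')})")
    return result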
These examples provide a solid foundation for understanding FastSkill’s capabilities. Use them as starting points and adapt them for your specific use cases.