moai-alfred-workflow
Enterprise multi-agent workflow orchestration specialist. Master workflow design, agent coordination, task delegation, and process automation with Context7 MCP integration and comprehensive monitoring. Build scalable, intelligent workflow systems with fault tolerance and performance optimization.
Install command
npx @skill-hub/cli install ajbcoding-claude-skill-eval-moai-alfred-workflow
Repository
Skill path: skills/moai-alfred-workflow
Best for
Primary workflow: Design Product.
Technical facets: Full Stack, Designer, Integration.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: AJBcoding.
This is still a mirrored public skill entry. Review the repository before installing into production workflows.
What it helps with
- Install moai-alfred-workflow into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/AJBcoding/claude-skill-eval before adding moai-alfred-workflow to shared team environments
- Use moai-alfred-workflow for development workflows
Original source / Raw SKILL.md
---
name: moai-alfred-workflow
version: 4.0.0
status: production
description: |
  Enterprise multi-agent workflow orchestration specialist. Master workflow
  design, agent coordination, task delegation, and process automation with Context7
  MCP integration and comprehensive monitoring. Build scalable, intelligent
  workflow systems with fault tolerance and performance optimization.
allowed-tools: ["Read", "Write", "Edit", "Bash", "Glob", "WebFetch", "WebSearch"]
tags: ["workflow", "automation", "agents", "orchestration", "context7", "mcp", "multi-agent"]
---
# Alfred Workflow Orchestration
## Level 1: Quick Reference
### Core Capabilities
- **Multi-Agent Systems**: Coordinated agent workflows and delegation
- **Process Automation**: End-to-end workflow automation
- **Task Orchestration**: Complex task scheduling and management
- **Context7 Integration**: 13,157+ code examples and documentation lookup
- **Monitoring**: Comprehensive workflow performance tracking
### Quick Setup
```python
# Basic workflow setup
from alfred_workflow import WorkflowEngine, Agent
engine = WorkflowEngine()
spec_agent = Agent("spec-builder", domain="requirements")
impl_agent = Agent("tdd-implementer", domain="development")
test_agent = Agent("quality-gate", domain="testing")
workflow = engine.create_workflow("feature_development")
workflow.add_stage("specification", spec_agent)
workflow.add_stage("implementation", impl_agent, depends_on=["specification"])
workflow.add_stage("testing", test_agent, depends_on=["implementation"])
result = engine.execute(workflow, input_data={"feature": "user auth"})
```
```python
# Context7 integration (both calls are coroutines, so await them inside an async function)
from alfred_workflow import Context7Integration
context7 = Context7Integration()
examples = await context7.search_code_examples(
    query="react authentication", language="javascript", framework="react"
)
best_practices = await context7.get_best_practices(
    topic="database optimization", domain="postgresql"
)
```
### Essential Patterns
| Pattern | Use Case | Benefit |
|---------|----------|---------|
| Sequential | Linear tasks | Predictable flow |
| Parallel | Independent tasks | Faster completion |
| Conditional | Decision-based | Adaptive workflows |
| Error Recovery | Fault tolerance | Reliable execution |
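
The four patterns in the table can be sketched with plain `asyncio`, independent of any workflow library. This is only an illustration: `run_stage` and the stage names are hypothetical stand-ins for agent calls, not part of the skill's API.

```python
import asyncio


async def run_stage(name: str, fail: bool = False) -> str:
    """Hypothetical stand-in for an agent call."""
    await asyncio.sleep(0)  # yield control, as a real agent call would
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name}:done"


async def sequential() -> list:
    # Sequential: each stage starts only after the previous one finishes
    return [await run_stage("spec"), await run_stage("impl")]


async def parallel() -> list:
    # Parallel: independent stages run concurrently; results keep argument order
    return list(await asyncio.gather(run_stage("lint"), run_stage("unit-tests")))


async def conditional(needs_review: bool) -> str:
    # Conditional: choose the next stage from a decision value
    return await run_stage("review" if needs_review else "merge")


async def with_recovery() -> str:
    # Error recovery: fall back to an alternative stage when the primary fails
    try:
        return await run_stage("deploy", fail=True)
    except RuntimeError:
        return await run_stage("rollback")


async def demo() -> dict:
    return {
        "sequential": await sequential(),
        "parallel": await parallel(),
        "conditional": await conditional(needs_review=True),
        "recovery": await with_recovery(),
    }


results = asyncio.run(demo())
```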
## Level 2: Core Implementation
### Architecture Overview
**WorkflowEngine**: Central orchestrator managing agents and workflows
**Agent System**: Specialized agents for different domains (spec-builder, tdd-implementer, quality-gate)
**Task Model**: Structured tasks with dependencies, priorities, and retry logic
**Template System**: Reusable workflow patterns (feature development, bug fix)
### Core Classes
```python
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Dict, List, Any, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class AgentStatus(Enum):
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"


@dataclass
class Task:
    id: str
    name: str
    description: str
    agent_type: str
    input_data: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    retry_count: int = 0
    max_retries: int = 3
    result: Optional[Dict[str, Any]] = None


@dataclass
class Agent:
    id: str
    name: str
    agent_type: str
    capabilities: List[str]
    status: AgentStatus = AgentStatus.IDLE
    current_task: Optional[Task] = None


class WorkflowEngine:
    def __init__(self, max_concurrent_tasks: int = 5):
        self.agents: Dict[str, Agent] = {}
        self.workflows: Dict[str, 'Workflow'] = {}
        self.max_concurrent_tasks = max_concurrent_tasks

    def register_agent(self, agent: Agent) -> None:
        self.agents[agent.id] = agent

    def create_workflow(self, name: str, description: str = "") -> 'Workflow':
        workflow = Workflow(name=name, description=description, engine=self)
        self.workflows[name] = workflow
        return workflow

    async def execute_workflow(self, workflow: 'Workflow') -> Dict[str, Any]:
        # get_execution_order, _wait_for_dependencies, and execute_task
        # are not shown in this excerpt
        results = {}
        for task in workflow.get_execution_order():
            await self._wait_for_dependencies(task, results)
            result = await self.execute_task(task)
            results[task.id] = result
        return results


@dataclass
class Workflow:
    name: str
    description: str
    engine: WorkflowEngine
    tasks: List[Task] = field(default_factory=list)
    status: str = "created"
    created_at: datetime = field(default_factory=datetime.now)

    def add_stage(self, stage_name: str, agent_type: str,
                  input_data: Optional[Dict[str, Any]] = None,
                  depends_on: Optional[List[str]] = None) -> Task:
        # Task ids combine the stage name with the stage's position in the workflow
        task = Task(
            id=f"{stage_name}_{len(self.tasks)}",
            name=stage_name,
            description=f"Workflow stage: {stage_name}",
            agent_type=agent_type,
            input_data=input_data or {},
            dependencies=depends_on or []
        )
        self.tasks.append(task)
        return task
```
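
`execute_workflow` relies on `get_execution_order`, which this excerpt does not show. One way it could work is a topological sort over each task's `dependencies`. The sketch below uses the standard-library `graphlib` (Python 3.9+) on plain dicts; the `execution_order` function and its input shape are assumptions for illustration, not the skill's actual implementation.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return task ids so that every task appears after its dependencies.

    `dependencies` maps a task id to the ids it depends on (hypothetical shape).
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle
        raise ValueError(f"Circular dependency: {exc.args[1]}") from exc


order = execution_order({
    "specification_0": [],
    "implementation_1": ["specification_0"],
    "testing_2": ["implementation_1"],
})
```

The task ids here follow the `{stage_name}_{index}` form that `add_stage` generates, so `depends_on` should carry task ids (for example `spec_task.id`) rather than bare stage names.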
### Context7 Integration
```python
import time
from typing import Dict, List, Optional


class Context7Integration:
    def __init__(self, mcp_servers: Optional[List[str]] = None):
        self.mcp_servers = mcp_servers or []
        self.cache = {}
        self.cache_ttl = 3600  # seconds

    async def search_code_examples(self, query: str, language: Optional[str] = None,
                                   framework: Optional[str] = None, limit: int = 10) -> List[Dict]:
        cache_key = f"code_examples_{query}_{language}_{framework}_{limit}"
        # Check cache first
        if cache_key in self.cache:
            cached = self.cache[cache_key]
            if time.time() - cached['timestamp'] < self.cache_ttl:
                return cached['data']
        # Execute Context7 search (_context7_search is not shown in this excerpt)
        results = await self._context7_search(query, language, framework, limit)
        # Cache results
        self.cache[cache_key] = {'data': results, 'timestamp': time.time()}
        return results

    async def get_best_practices(self, topic: str, domain: Optional[str] = None) -> Dict:
        search_query = f"best practices {topic}"
        if domain:
            search_query += f" {domain}"
        results = await self._context7_search(search_query, limit=5)
        # Return only the top-ranked result, or an empty dict if nothing matched
        return results[0] if results else {}
```
### Workflow Templates
```python
from abc import ABC, abstractmethod


class WorkflowTemplate(ABC):
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    @abstractmethod
    def create_workflow(self, engine: WorkflowEngine, config: Dict) -> Workflow:
        pass


class FeatureDevelopmentTemplate(WorkflowTemplate):
    def __init__(self):
        super().__init__("feature_development", "End-to-end feature development")

    def create_workflow(self, engine: WorkflowEngine, config: Dict) -> Workflow:
        workflow = engine.create_workflow(
            name=f"feature_{config.get('feature_name', 'unknown')}",
            description=config.get('feature_description', '')
        )
        # Specification stage
        spec_task = workflow.add_stage("specification", "spec-builder", {
            "feature_description": config.get('feature_description', ''),
            "requirements": config.get('requirements', [])
        })
        # Implementation stage
        impl_task = workflow.add_stage("implementation", "tdd-implementer", {
            "spec_id": spec_task.id,
            "technology_stack": config.get('technology_stack', [])
        }, depends_on=[spec_task.id])
        # Testing stage
        workflow.add_stage("testing", "quality-gate", {
            "implementation_id": impl_task.id,
            "test_types": config.get('test_types', ['unit', 'integration'])
        }, depends_on=[impl_task.id])
        return workflow
```
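
The scheduler in Level 3 calls `WorkflowTemplateManager.create_workflow_from_template`, which is not defined in this document. A minimal sketch of such a registry follows, assuming templates expose `name` and `create_workflow(engine, config)` as above. The `register` method and `EchoTemplate` are hypothetical and exist only to keep the example self-contained.

```python
from typing import Any, Dict, Optional


class EchoTemplate:
    """Hypothetical template that returns a plain dict instead of a Workflow."""
    name = "echo"

    def create_workflow(self, engine: Any, config: Dict[str, Any]) -> Dict[str, Any]:
        return {"name": f"echo_{config.get('label', 'unknown')}", "engine": engine}


class WorkflowTemplateManager:
    def __init__(self) -> None:
        self._templates: Dict[str, Any] = {}

    def register(self, template: Any) -> None:
        # Templates are keyed by their name attribute
        self._templates[template.name] = template

    def create_workflow_from_template(self, template_name: str, engine: Any,
                                      config: Dict[str, Any]) -> Optional[Any]:
        # Returning None for an unknown name lets the caller raise its own error
        template = self._templates.get(template_name)
        return template.create_workflow(engine, config) if template else None


manager = WorkflowTemplateManager()
manager.register(EchoTemplate())
known = manager.create_workflow_from_template("echo", engine=None, config={"label": "demo"})
unknown = manager.create_workflow_from_template("missing", engine=None, config={})
```

Returning `None` for an unknown template matches the `if not workflow: raise ValueError(...)` check in `submit_workflow`.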
## Level 3: Advanced Features
### Workflow Scheduling
```python
from datetime import datetime
from enum import Enum
from typing import Dict, Optional
import asyncio
import itertools
import uuid


class WorkflowPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


class WorkflowScheduler:
    def __init__(self, workflow_engine: WorkflowEngine):
        self.workflow_engine = workflow_engine
        # PriorityQueue pops the smallest entry first; a plain Queue would ignore priority
        self.workflow_queue = asyncio.PriorityQueue()
        # Tie-breaker so entries with equal priority never compare their dict payloads
        self._sequence = itertools.count()
        # WorkflowTemplateManager is not shown in this excerpt
        self.template_manager = WorkflowTemplateManager()

    async def submit_workflow(self, template_name: str, config: Dict,
                              priority: WorkflowPriority = WorkflowPriority.MEDIUM,
                              scheduled_time: Optional[datetime] = None) -> str:
        workflow_id = str(uuid.uuid4())
        workflow = self.template_manager.create_workflow_from_template(
            template_name, self.workflow_engine, config
        )
        if not workflow:
            raise ValueError(f"Unknown template: {template_name}")
        workflow_item = {
            'workflow_id': workflow_id,
            'workflow': workflow,
            'priority': priority,
            'scheduled_time': scheduled_time or datetime.now()
        }
        # Negated priority puts CRITICAL first; the sequence number keeps FIFO order within a priority
        await self.workflow_queue.put((-priority.value, next(self._sequence), workflow_item))
        return workflow_id
```
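
Priority ordering depends on the queue type: `asyncio.PriorityQueue` returns the smallest entry first, whereas a plain `asyncio.Queue` is first-in, first-out. The sketch below shows negated priorities with a sequence number as tie-breaker, so two workflows with equal priority never fall back to comparing their payloads. The payload names are placeholders.

```python
import asyncio
import itertools
from enum import Enum


class WorkflowPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


async def drain_in_priority_order() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    sequence = itertools.count()
    submissions = [
        (WorkflowPriority.MEDIUM, "feature-a"),
        (WorkflowPriority.CRITICAL, "hotfix"),
        (WorkflowPriority.MEDIUM, "feature-b"),
        (WorkflowPriority.LOW, "cleanup"),
    ]
    for priority, payload in submissions:
        # Entry shape: (negated priority, submission order, payload)
        await queue.put((-priority.value, next(sequence), {"name": payload}))
    drained = []
    while not queue.empty():
        _, _, item = await queue.get()
        drained.append(item["name"])
    return drained


order = asyncio.run(drain_in_priority_order())
```

Without the sequence number, two entries with the same priority would be compared by their dict payloads, which raises `TypeError`.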
### Error Handling & Recovery
```python
import asyncio


class WorkflowErrorHandler:
    def __init__(self, workflow_engine: WorkflowEngine):
        self.workflow_engine = workflow_engine

    async def handle_task_failure(self, task: Task, error: Exception) -> bool:
        # Retry only retriable errors, and only while the retry budget lasts;
        # retrying after the budget is spent would loop forever on a persistent error
        if self._is_retriable_error(error) and task.retry_count < task.max_retries:
            return await self._retry_with_backoff(task, error)
        # _require_manual_intervention is not shown in this excerpt
        return await self._require_manual_intervention(task, error)

    async def _retry_with_backoff(self, task: Task, error: Exception) -> bool:
        # Exponential backoff: 1s, 2s, 4s, ...
        delay = 2 ** task.retry_count
        task.retry_count += 1
        await asyncio.sleep(delay)
        try:
            await self.workflow_engine.execute_task(task)
            return True
        except Exception as e:
            return await self.handle_task_failure(task, e)

    def _is_retriable_error(self, error: Exception) -> bool:
        retriable_errors = ['TimeoutError', 'ConnectionError', 'TemporaryFailure']
        return type(error).__name__ in retriable_errors
```
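
The delay in `_retry_with_backoff` doubles with each attempt (`2 ** retry_count`). Below is a self-contained sketch of the same schedule with a retry cap. `retry_with_backoff`, `flaky`, and the injectable `sleep` parameter are hypothetical; the last exists only so the example runs without real waiting.

```python
import asyncio
from typing import Awaitable, Callable, List


async def retry_with_backoff(operation: Callable[[], Awaitable[str]],
                             max_retries: int = 3,
                             sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> str:
    """Run `operation`, retrying retriable errors with delays of 1s, 2s, 4s, ..."""
    retry_count = 0
    while True:
        try:
            return await operation()
        except (TimeoutError, ConnectionError):
            if retry_count >= max_retries:
                raise  # retry budget exhausted: surface the error to the caller
            await sleep(2 ** retry_count)
            retry_count += 1


delays: List[float] = []
attempts = {"count": 0}


async def fake_sleep(seconds: float) -> None:
    delays.append(seconds)  # record the delay instead of waiting


async def flaky() -> str:
    # Fails twice with a retriable error, then succeeds
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(retry_with_backoff(flaky, sleep=fake_sleep))
```

Errors outside the retriable set propagate immediately, which corresponds to the manual-intervention path in the handler.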
### Performance Monitoring
```python
class WorkflowMetrics:
    def __init__(self):
        self.metrics = {
            'workflows_completed': 0,
            'workflows_failed': 0,
            'total_execution_time': 0.0,
            'task_completion_times': []
        }

    def record_workflow_completion(self, workflow_id: str, execution_time: float, status: str):
        if status == 'completed':
            self.metrics['workflows_completed'] += 1
        else:
            self.metrics['workflows_failed'] += 1
        self.metrics['total_execution_time'] += execution_time

    def get_performance_summary(self) -> Dict[str, Any]:
        completed = self.metrics['workflows_completed']
        failed = self.metrics['workflows_failed']
        total = completed + failed
        if total == 0:
            return {'error': 'No workflows executed'}
        # total is non-zero past this point, so the divisions are safe
        avg_time = self.metrics['total_execution_time'] / total
        success_rate = (completed / total) * 100
        return {
            'total_workflows': total,
            'success_rate': f"{success_rate:.2f}%",
            'average_execution_time': f"{avg_time:.2f}s",
            'workflows_completed': completed,
            'workflows_failed': failed
        }
```
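
To make the summary arithmetic concrete, the sketch below computes the same two figures from a list of `(status, seconds)` records. The `summarize` helper and the sample numbers are illustrative assumptions, not part of the skill.

```python
from typing import Dict, List, Tuple


def summarize(runs: List[Tuple[str, float]]) -> Dict[str, str]:
    """Summarize (status, seconds) records the way get_performance_summary formats them."""
    total = len(runs)
    if total == 0:
        return {"error": "No workflows executed"}
    completed = sum(1 for status, _ in runs if status == "completed")
    total_time = sum(seconds for _, seconds in runs)
    return {
        "success_rate": f"{completed / total * 100:.2f}%",
        "average_execution_time": f"{total_time / total:.2f}s",
    }


# Three completed runs and one failed run, 10 seconds in total
summary = summarize([("completed", 2.0), ("completed", 3.0), ("failed", 4.0), ("completed", 1.0)])
```

As in `WorkflowMetrics`, the average covers failed runs as well as completed ones, so a slow failure raises the reported average.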
## Level 4: Reference & Integration
### When to Use
**Use for:**
- Multi-agent workflows requiring coordination
- Automated CI/CD pipelines with intelligent decision making
- Enterprise process automation with error handling
- Tasks requiring Context7 integration for best practices
- Workflows needing comprehensive monitoring
**Avoid for:**
- Simple single-agent tasks (use specific domain skills)
- Basic automation without coordination needs
- Quick prototyping without enterprise requirements
### Common Usage Patterns
```python
# Feature Development Workflow
workflow_id = await scheduler.submit_workflow(
    "feature_development",
    {
        "feature_name": "user_authentication",
        "feature_description": "Implement secure user login",
        "requirements": ["JWT auth", "Password hashing"],
        "acceptance_criteria": ["Users can login securely"]
    }
)
# Bug Fix Workflow
bug_workflow_id = await scheduler.submit_workflow(
    "bug_fix",
    {
        "bug_id": "BUG-001",
        "bug_description": "Login fails with invalid credentials",
        "error_logs": ["AuthException at line 42"],
        "reproduction_steps": ["Enter invalid password"]
    },
    priority=WorkflowPriority.HIGH
)
```
### Security & Compliance
**TRUST Principles Applied**:
- **Test First**: All workflows include validation stages and quality gates
- **Readable**: Clear workflow structure with comprehensive logging
- **Unified**: Consistent patterns across all workflow templates
- **Secured**: Built-in input validation and security checks
- **Traceable**: Complete audit trail with workflow execution history
**Enterprise Security Features**:
- **Input Validation**: Automated security scanning of workflow inputs
- **Access Control**: Role-based access to workflow operations
- **Audit Logging**: Complete execution history with security events
- **Data Encryption**: Sensitive data protection in transit and at rest
### Related Skills
- `moai-alfred-agent-guide` - Agent selection and delegation patterns
- `moai-alfred-spec-authoring` - SPEC creation workflows
- `moai-essentials-debug` - Error handling and troubleshooting
- `moai-foundation-trust` - Security and compliance principles
- `moai-domain-backend` - Backend-specific workflow patterns
- `moai-domain-testing` - Testing workflow integration
---
**Enterprise v4.0 Compliance**: Progressive disclosure with comprehensive error handling, security controls, and monitoring.
**Last Updated**: 2025-11-13
**Dependencies**: Context7 MCP integration, Alfred agent system
**See Also**: [examples.md](./examples.md) for detailed usage examples
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### examples.md
```markdown
# Alfred Workflow Examples
## Quick Start Examples
### Basic Multi-Agent Workflow
```python
from alfred_workflow import WorkflowEngine, Agent, Task
# Create workflow engine
engine = WorkflowEngine()
# Define agents
spec_agent = Agent("spec-builder", domain="requirements")
impl_agent = Agent("tdd-implementer", domain="development")
test_agent = Agent("quality-gate", domain="testing")
# Create workflow
workflow = engine.create_workflow("feature_development")
# Add stages
workflow.add_stage("specification", spec_agent)
workflow.add_stage("implementation", impl_agent, depends_on=["specification"])
workflow.add_stage("testing", test_agent, depends_on=["implementation"])
# Execute workflow
result = engine.execute(workflow, input_data={"feature": "user authentication"})
```
### Feature Development Template
```python
from alfred_workflow.templates import FeatureDevelopmentTemplate
# Create template
template = FeatureDevelopmentTemplate()
# Configure workflow
config = {
    "feature_name": "user_authentication",
    "feature_description": "Implement secure user authentication",
    "requirements": [
        "User registration with email verification",
        "Secure password hashing",
        "JWT token generation",
        "Session management"
    ],
    "acceptance_criteria": [
        "Users can register with email and password",
        "Email verification required for activation",
        "Passwords are securely hashed using bcrypt",
        "JWT tokens expire after 24 hours",
        "Session persists across browser restarts"
    ],
    "technology_stack": ["Python", "FastAPI", "JWT", "bcrypt"],
    "test_types": ["unit", "integration", "security", "e2e"],
    "coverage_threshold": 85
}
# Submit workflow
workflow_id = await workflow_engine.submit_workflow("feature_development", config)
```
### Bug Fix Template
```python
from alfred_workflow.templates import BugFixTemplate
# Create bug fix workflow
template = BugFixTemplate()
config = {
    "bug_id": "BUG-2024-001",
    "bug_description": "User authentication fails on password reset",
    "error_logs": [
        "ERROR: Authentication failed: invalid token format",
        "WARNING: Password reset token expired"
    ],
    "reproduction_steps": [
        "1. Request password reset",
        "2. Click reset link in email",
        "3. Enter new password",
        "4. Submit form"
    ],
    "priority": "high",
    "environment": "production"
}
workflow_id = await workflow_engine.submit_workflow("bug_fix", config)
```
## Advanced Examples
### Custom Workflow Template
```python
from alfred_workflow import WorkflowTemplate, Workflow, WorkflowEngine
from dataclasses import dataclass
from typing import Dict, Any, Optional
class CodeReviewTemplate(WorkflowTemplate):
    """Custom template for code review workflow"""

    def __init__(self):
        super().__init__(
            name="code_review",
            description="Automated code review workflow"
        )

    def create_workflow(self, engine: WorkflowEngine, config: Dict[str, Any]) -> Workflow:
        workflow = engine.create_workflow(
            name=f"code_review_{config.get('pr_id', 'unknown')}",
            description=f"Review PR: {config.get('pr_title', 'unknown')}"
        )
        # Static analysis stage
        static_analysis_task = workflow.add_stage(
            stage_name="static_analysis",
            agent_type="quality-gate",
            input_data={
                "pr_url": config.get('pr_url'),
                "analysis_tools": ["pylint", "mypy", "security-scan"],
                "thresholds": {
                    "complexity": 10,
                    "coverage": 80,
                    "security": 0
                }
            }
        )
        # Performance review stage
        performance_task = workflow.add_stage(
            stage_name="performance_review",
            agent_type="performance-engineer",
            input_data={
                "pr_url": config.get('pr_url'),
                "benchmarks": ["load_test", "memory_usage", "response_time"],
                "regression_threshold": 5  # 5% performance degradation allowed
            },
            depends_on=[static_analysis_task.id]
        )
        # Security review stage
        security_task = workflow.add_stage(
            stage_name="security_review",
            agent_type="security-expert",
            input_data={
                "pr_url": config.get('pr_url'),
                "security_checks": ["owasp", "dependency_scan", "secret_scan"],
                "critical_issues_only": config.get('critical_only', False)
            },
            depends_on=[performance_task.id]
        )
        return workflow


# Register custom template
workflow_engine.register_template(CodeReviewTemplate())
# Use custom template
config = {
    "pr_id": "1234",
    "pr_title": "Add user authentication API",
    "pr_url": "https://github.com/repo/pull/1234",
    "critical_only": True
}
workflow_id = await workflow_engine.submit_workflow("code_review", config)
```
### Context7 Integration Examples
```python
from alfred_workflow.integrations import Context7Integration
# Initialize Context7 integration
context7 = Context7Integration(
    mcp_servers=["context7", "github", "filesystem"]
)
# Search for code examples
code_examples = await context7.search_code_examples(
    query="react authentication with JWT",
    language="typescript",
    framework="react",
    limit=10
)
# Get best practices (the second parameter is named domain in get_best_practices)
best_practices = await context7.get_best_practices(
    topic="python async programming",
    domain="python"
)
# Documentation lookup
docs = await context7.lookup_documentation(
    library="fastapi",
    topic="authentication"
)
```
### Monitoring and Metrics
```python
# Get workflow engine metrics
metrics = workflow_engine.get_metrics()
print(f"Total workflows: {metrics['total_workflows']}")
print(f"Active workflows: {metrics['active_workflows']}")
print(f"Queue size: {metrics['queue_size']}")
# Get workflow performance data
performance_data = workflow_engine.get_performance_analysis()
print(f"Average execution time: {performance_data['avg_execution_time']}")
print(f"Success rate: {performance_data['success_rate']}")
# Monitor specific workflow
workflow_status = workflow_engine.get_workflow_status(workflow_id)
print(f"Workflow status: {workflow_status['status']}")
print(f"Current stage: {workflow_status['current_stage']}")
print(f"Progress: {workflow_status['progress']}%")
```
### Error Handling and Recovery
```python
from alfred_workflow.exceptions import WorkflowError, AgentError
import asyncio
import logging

logger = logging.getLogger(__name__)


async def robust_workflow_execution():
    try:
        # Submit workflow with retry logic
        workflow_id = await workflow_engine.submit_workflow(
            "feature_development",
            config,
            priority=WorkflowPriority.HIGH
        )
        # Wait for completion with timeout
        result = await asyncio.wait_for(
            workflow_engine.wait_for_completion(workflow_id),
            timeout=3600  # 1 hour timeout
        )
        return result
    except WorkflowError as e:
        logger.error(f"Workflow failed: {e}")
        # Get error details
        error_info = workflow_engine.get_workflow_error(workflow_id)
        # Decide on recovery strategy
        if error_info['retryable']:
            logger.info("Retrying workflow...")
            return await robust_workflow_execution()
        else:
            # Manual intervention required
            await alert_developer(error_info)
            raise
    except asyncio.TimeoutError:
        logger.error("Workflow timed out")
        await workflow_engine.cancel_workflow(workflow_id)
        raise


async def alert_developer(error_info):
    """Send alert to development team"""
    from alfred_workflow.notifications import NotificationManager
    notifications = NotificationManager()
    await notifications.send_slack_alert(
        channel="#dev-alerts",
        message=f"🚨 Workflow failed: {error_info['workflow_id']}",
        details=error_info
    )
    await notifications.send_email(
        to=["[email protected]"],
        subject="Workflow Failure Alert",
        body=f"Workflow {error_info['workflow_id']} requires attention"
    )
```
### Scheduled Workflows
```python
from datetime import datetime, timedelta
import asyncio
async def setup_scheduled_workflows():
    """Setup recurring scheduled workflows"""
    # Daily security scan
    tomorrow = datetime.now() + timedelta(days=1)
    await workflow_engine.submit_workflow(
        "security_scan",
        {"scan_type": "full", "environment": "production"},
        scheduled_time=tomorrow.replace(hour=2, minute=0)  # 2 AM
    )
    # Weekly performance report
    next_week = datetime.now() + timedelta(weeks=1)
    await workflow_engine.submit_workflow(
        "performance_report",
        {"report_type": "weekly", "include_trends": True},
        scheduled_time=next_week.replace(hour=8, minute=0)  # 8 AM, one week from today
    )
    # Monthly deployment checklist
    next_month = datetime.now() + timedelta(days=30)
    await workflow_engine.submit_workflow(
        "deployment_checklist",
        {"environment": "production", "compliance": True},
        scheduled_time=next_month.replace(hour=10, minute=0)  # 10 AM, 30 days from today
    )


# Run scheduled workflows (asyncio.create_task needs an already running event loop)
asyncio.create_task(setup_scheduled_workflows())
asyncio.create_task(workflow_engine.start_scheduler())
```
```