
workflow-orchestrator

Project workflow system - cost tracking, parallel execution, security gates, agent orchestration. Use when: start day, begin session, status check, new feature, build, implement, end day, wrap up, debug, investigate, research, evaluate.

Packaged view

This page reorganizes the original catalog entry to put fit, installability, and workflow context first. The original raw source appears below.

Stars: 6
Hot score: 82
Updated: March 20, 2026
Overall rating: C (composite score 1.1)
Best-practice grade: B (75.6)

Install command

npx @skill-hub/cli install scientiacapital-skills-workflow-orchestrator-skill

Repository

scientiacapital/skills

Skill path: active/workflow-orchestrator-skill


Open repository

Best for

Primary workflow: Research & Ops.

Technical facets: Full Stack, Security, Testing.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: scientiacapital.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install workflow-orchestrator into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/scientiacapital/skills before adding workflow-orchestrator to shared team environments
  • Use workflow-orchestrator for development workflows

Works across

Claude Code · Codex CLI · Gemini CLI · OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: "workflow-orchestrator"
description: "Project workflow system - cost tracking, parallel execution, security gates, agent orchestration. Use when: start day, begin session, status check, new feature, build, implement, end day, wrap up, debug, investigate, research, evaluate."
---

<objective>
Universal project workflow system providing cost tracking, parallel execution via git worktrees, security gates, and intelligent agent orchestration. Manages complete development lifecycle from session start to end-of-day with mandatory security sweeps and context preservation.
</objective>

<quick_start>
**Start session:**
```bash
pwd && git status && git log --oneline -5
cat PROJECT_CONTEXT.md 2>/dev/null
```

**End session:**
1. Run security sweep: `gitleaks detect --source .`
2. Update `PROJECT_CONTEXT.md` with completed/in-progress
3. Log costs to `costs/daily-YYYY-MM-DD.json`

**Feature development:** Plan → DB/Schema → Parallel implementation → Security gate → Ship
</quick_start>

<success_criteria>
Workflow is successful when all of the following hold (a minimal checker sketch follows this block):
- Context scan completed at session start (pwd, git status, PROJECT_CONTEXT.md)
- Security sweep passes before any commits (gitleaks, secrets check, audit)
- Cost tracking updated (daily.json, mtd.json)
- PROJECT_CONTEXT.md updated at session end
- Worktrees cleaned up after merge
- All security gates passed before shipping
</success_criteria>
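
A minimal checker sketch for these criteria, assuming `gitleaks` is on `PATH` and the `costs/` layout described later in this document; the function name is illustrative:

```python
import datetime
import pathlib
import subprocess

def check_session_end(root="."):
    """Best-effort check of the session-end criteria above."""
    root = pathlib.Path(root)
    today = datetime.date.today().isoformat()
    checks = {
        "context_updated": (root / "PROJECT_CONTEXT.md").exists(),
        "daily_costs_logged": (root / f"costs/daily-{today}.json").exists(),
        "mtd_costs_present": (root / "costs/mtd.json").exists(),
        # gitleaks exits non-zero when it finds leaks
        "security_sweep_clean": subprocess.run(
            ["gitleaks", "detect", "--source", str(root)],
            capture_output=True,
        ).returncode == 0,
    }
    return checks, all(checks.values())
```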

<triggers>

- **Session Management:** "start day", "begin session", "what's the status", "end day", "wrap up", "done for today"
- **Feature Development:** "new feature", "build", "implement"  
- **Debugging:** "debug", "investigate", "why is this broken"
- **Research:** "research", "evaluate", "should we use"

---

## START DAY

### Context Scan (Mandatory)
```bash
# Detect project
pwd
git status
git log --oneline -5

# Load context
cat PROJECT_CONTEXT.md 2>/dev/null || echo "No context file"
cat CLAUDE.md 2>/dev/null
cat TASK.md 2>/dev/null
cat PLANNING.md 2>/dev/null
```

### Worktree Status
```bash
jq --arg p "$(basename "$PWD")" '.worktrees[] | select(.project == $p)' ~/.claude/worktree-registry.json 2>/dev/null
git worktree list
```

### Cost Status
```bash
cat costs/daily-$(date +%Y-%m-%d).json 2>/dev/null || echo "No cost tracking today"
cat costs/mtd.json 2>/dev/null | jq '.total'
```

### Output Format
```markdown
## Session Start: [PROJECT_NAME]

### Completed (Last Session)
- [x] Task 1
- [x] Task 2

### In Progress
| Task | Branch/Worktree | Status |
|------|-----------------|--------|
| API endpoint | feature/api @ 8100 | 70% |

### Blockers
- [ ] Waiting on X

### Today's Priority Queue
1. [AGENT: research-skill] Evaluate framework options
2. [AGENT: langgraph-agents-skill] Build orchestration
3. [AGENT: debug-like-expert] Fix flaky test

### Cost Context
- Today: $0.00 | MTD: $12.34 | Budget: $100
- Avg cost/task: $0.45
```

**Deep dive:** See `reference/start-day-protocol.md`

---

## RESEARCH PHASE

**Trigger:** Before ANY feature development involving new frameworks, APIs, or architectural decisions.

### Scan Existing Solutions
```bash
# Check MCP cookbook first
ls /Users/tmkipper/Desktop/tk_projects/mcp-server-cookbook/ 2>/dev/null

# Check your repos
find ~/tk_projects -name "*.md" -exec grep -l "[search_term]" {} \; 2>/dev/null | head -20
```

### Evaluate Approach
Use `/research-skill` checklist:
- Framework selection criteria
- LLM selection (default: DeepSeek V3 for bulk, Claude Sonnet for reasoning)
- Infrastructure (Supabase/Neon/RunPod)

### Cost Projection
```python
# Estimate before building
estimated_costs = {
    "inference": tokens_estimate * model_cost_per_1k / 1000,
    "compute": hours_estimate * runpod_hourly,
    "storage": gb_estimate * supabase_monthly / 30
}
if sum(estimated_costs.values()) > threshold:
    flag_for_review()
```

### Output
Create `RESEARCH.md` → `FINDINGS.md` (a minimal generator sketch follows this list) with:
- Substantive one-liner summary
- Confidence score (1-10)
- Dependencies list
- Open questions
- **GO/NO-GO recommendation**
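
A minimal generator sketch for `FINDINGS.md`; the field names mirror the list above, and the function name is illustrative:

```python
def write_findings(path, summary, confidence, dependencies,
                   open_questions, recommendation):
    """Assemble FINDINGS.md from the fields listed above (illustrative layout)."""
    lines = [
        "# FINDINGS",
        f"**Summary:** {summary}",
        f"**Confidence:** {confidence}/10",
        "## Dependencies",
        *[f"- {d}" for d in dependencies],
        "## Open Questions",
        *[f"- {q}" for q in open_questions],
        f"## Recommendation: {recommendation}",  # GO or NO-GO
    ]
    with open(path, "w") as f:
        f.write("\n".join(lines) + "\n")
```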

### Gate
⛔ **Human checkpoint required before proceeding**

**Deep dive:** See `reference/research-workflow.md`

---

## FEATURE DEVELOPMENT

### Phase 0: PLAN
```markdown
1. Create BRIEF.md with scope
2. Map to agents (use workflow-enforcer 70+ catalog)
3. Identify parallelization opportunities
4. Create TodoWrite todos
5. Cost estimate
```

### Phase 1: SETUP + DB
```bash
# Schema design
/database-design:schema-design

# Migrations
/supabase-sql-skill or /database-migrations:sql-migrations
```
⛔ **Gate: Schema review before Phase 2**

### Phase 2: PARALLEL IMPLEMENTATION

#### Port Allocation
```bash
# Reserve ports upfront (8100-8199 pool)
PORTS=$(cat ~/.claude/worktree-registry.json | jq '[.worktrees[].ports[]] | max // 8098' | xargs -I{} expr {} + 2)
echo "Next available: $PORTS, $(expr $PORTS + 1)"
```

#### Spawn Worktrees
```bash
# Worktree A: Backend
git worktree add -b feature/api-backend ~/tmp/worktrees/$(basename $(pwd))/api-backend
# Assign ports 8100, 8101

# Worktree B: Frontend  
git worktree add -b feature/ui ~/tmp/worktrees/$(basename $(pwd))/ui
# Assign ports 8102, 8103

# Worktree C: Tests
git worktree add -b feature/tests ~/tmp/worktrees/$(basename $(pwd))/tests
# Assign ports 8104, 8105
```

#### Monitor & Merge
```bash
# Check status
git worktree list

# After completion, merge
git checkout main
git merge feature/api-backend
git worktree remove ~/tmp/worktrees/my-project/api-backend
```

### Phase 3: SECURITY + INTEGRATION

Run parallel scans:
```bash
# SAST
semgrep --config auto . 

# Secrets
gitleaks detect --source .

# Dependencies
npm audit --audit-level=critical || pip-audit

# Tests
pytest --cov=src || npm test -- --coverage
```

⛔ **Gate: ALL must pass**
```python
gate = (
    sast_clean and
    secrets_found == 0 and
    critical_vulns == 0 and
    test_coverage >= 80
)
```

### Phase 4: SHIP
```bash
# Final review
git diff main...HEAD

# Update docs
# - TASK.md (mark complete)
# - PLANNING.md (update status)  
# - CLAUDE.md (add learnings)

# Commit
git add .
git commit -m "feat: [description]"
git push

# Log cost
echo '{"feature": "X", "cost": 1.23, "date": "'$(date -Iseconds)'"}' >> costs/by-feature.jsonl
```

**Deep dive:** See `reference/feature-development.md`

---

## DEBUG MODE

**Trigger:** When standard troubleshooting fails or issue is complex.

### Context Scan
```bash
# Detect project type
cat package.json 2>/dev/null && echo "Node.js project"
cat pyproject.toml 2>/dev/null && echo "Python project"
cat Cargo.toml 2>/dev/null && echo "Rust project"

# Load domain expertise
ls ~/.claude/skills/expertise/ 2>/dev/null
```

### Evidence Gathering (Mandatory)
Document before ANY fix attempt:
```markdown
## Issue
[Exact error message]

## Reproduction
1. Step 1
2. Step 2
3. Error occurs

## Expected vs Actual
- Expected: X
- Actual: Y

## Environment
- OS: 
- Runtime version:
- Dependencies:
```

### Hypothesis Formation
List 3+ hypotheses with evidence:
```markdown
### Hypotheses
1. **[Most likely]** Database connection timeout
   - Evidence: Error mentions "connection refused"
   - Test: Check DB status
   
2. **[Possible]** Race condition in async code
   - Evidence: Intermittent failure
   - Test: Add logging around suspect area
   
3. **[Less likely]** Dependency version mismatch
   - Evidence: Works on other machine
   - Test: Compare package-lock.json
```

### Critical Rules
- ❌ NO DRIVE-BY FIXES - if you can't explain WHY, don't commit
- ❌ NO GUESSING - verify everything
- ✅ Use all tools: MCP servers, web search, extended thinking
- ✅ Think out loud
- ✅ One variable at a time

**Deep dive:** See `reference/debug-methodology.md`

---

## END DAY

### Security Sweep (Mandatory - Blocks Commits)
```bash
# Parallel scans
gitleaks detect --source . --verbose
git log -p | grep -E "(password|secret|api.?key|token)" || echo "Clean"
npm audit --audit-level=critical 2>/dev/null || pip-audit 2>/dev/null || echo "No package manager"
grep -r "API_KEY\|SECRET" --include="*.env*" . && echo "⚠️ Check env files"
```

⛔ **Gate: ALL must pass before any commits**

### Context Preservation
Update `PROJECT_CONTEXT.md`:
```markdown
## Last Updated: [DATE]

### Completed This Session
- [x] Built API endpoint
- [x] Fixed auth bug

### In Progress
- [ ] Frontend integration (70%)

### Blockers
- Waiting on design review

### Decisions Made
- Chose Supabase over Firebase (cost: $0 vs $25/mo)
- Using DeepSeek V3 for embeddings (90% cheaper)

### Tomorrow's Priorities
1. Complete frontend integration
2. Write tests
3. Deploy to staging
```

### Cost Tracking
```bash
# Log today's costs (overwrite: one JSON object per daily file; appending would corrupt it)
cat > costs/daily-$(date +%Y-%m-%d).json << EOF
{
  "inference": {"claude": 0.45, "deepseek": 0.02},
  "compute": {"runpod": 0.00},
  "total": 0.47
}
EOF

# Update MTD
jq '.total += 0.47' costs/mtd.json > tmp && mv tmp costs/mtd.json
```

### Worktree Cleanup
```bash
# Check for orphans
git worktree list --porcelain | grep -E "^worktree" 

# Merge completed work: remove worktrees whose branch is merged into main
for wt in $(git worktree list | grep -v "bare\|main" | awk '{print $1}'); do
  branch=$(git -C "$wt" branch --show-current)
  git branch --merged main | grep -qw "$branch" && git worktree remove "$wt"
done

# Remove merged worktrees
git worktree prune
```

**Deep dive:** See `reference/end-day-protocol.md`

---

## COST TRACKING

### Model Costs Reference
```python
MODEL_COSTS = {
    # Per 1K tokens
    "claude-sonnet": 0.003,      # Complex reasoning
    "deepseek-v3": 0.00014,      # 95% cheaper - bulk processing
    "qwen-72b": 0.0002,          # 93% cheaper - alternatives
    "voyage-embed": 0.0001,      # Embeddings
    "ollama-local": 0.0,         # Free - local dev
}

BUDGETS = {
    "daily": 5.00,
    "monthly": 100.00,
    "alert_threshold": 0.8,  # Alert at 80%
}
```

### Cost-Optimized Routing
| Task Type | Model | Why |
|-----------|-------|-----|
| Complex reasoning | Claude Sonnet | Quality critical |
| Bulk processing | DeepSeek V3 | 90% savings |
| Code generation | Claude Sonnet | Accuracy matters |
| Embeddings | Voyage | Cost + quality balance |
| Local dev/testing | Ollama | Free |
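
A minimal routing sketch derived from this table; the task-type keys and the budget fallback are assumptions, and the model names reuse the `MODEL_COSTS` keys above:

```python
TASK_ROUTES = {
    "complex_reasoning": "claude-sonnet",
    "bulk_processing": "deepseek-v3",
    "code_generation": "claude-sonnet",
    "embeddings": "voyage-embed",
    "local_dev": "ollama-local",
}

def route_model(task_type, budget_left=None):
    """Pick a model per the routing table; fall back to local once the budget is spent."""
    if budget_left is not None and budget_left <= 0:
        return "ollama-local"  # free fallback
    return TASK_ROUTES.get(task_type, "deepseek-v3")
```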

**Deep dive:** See `reference/cost-tracking.md`

---

## ROLLBACK / RECOVERY

### When to Rollback
- Tests failing after "fix"
- Security scan finds new issues
- Performance degradation
- Unexpected behavior

### Recovery Workflow
```bash
# 1. Stash current work
git stash

# 2. Find last known good
git log --oneline -20

# 3. Selective rollback
git checkout [commit] -- [specific_file]

# OR full revert
git revert [commit]

# 4. Verify
pytest  # or npm test

# 5. Investigate root cause using debug-like-expert
```

**Deep dive:** See `reference/rollback-recovery.md`

---

## AGENT QUICK REFERENCE

| Need | Agent/Skill |
|------|-------------|
| Market/tech research | `/research-skill` |
| Project planning | `/planning-prompts` |
| Multi-agent systems | `/langgraph-agents-skill` |
| Complex debugging | `/debug-like-expert` |
| Parallel development | `/worktree-manager` |
| Session context | `/project-context-skill` |
| CRM integration | `/crm-integration-skill` |
| Data analysis | `/data-analysis-skill` |
| Voice AI | `/voice-ai-skill` |
| Trading signals | `/trading-signals-skill` |
| SQL migrations | `/supabase-sql-skill` |
| GPU deployment | `/runpod-deployment-skill` |
| Sales/revenue | `/sales-revenue-skill` |
| Fast inference | `/groq-inference-skill` |

**Deep dive:** See `reference/agent-routing.md` for complete 70+ agent catalog

---

## PROJECT STRUCTURE

```
project/
├── CLAUDE.md              # Project rules + learnings
├── PLANNING.md            # Roadmap + phases
├── TASK.md                # Current sprint
├── Backlog.md             # Future work
├── PROJECT_CONTEXT.md     # Auto-generated session context
├── .taskmaster/
│   └── docs/
│       └── prd.txt        # Product requirements
├── .prompts/              # Meta-prompts
│   ├── research/
│   ├── plan/
│   ├── do/
│   └── refine/
├── costs/                 # Cost tracking
│   ├── daily-YYYY-MM-DD.json
│   ├── by-feature.jsonl
│   └── mtd.json
└── src/
```
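
A minimal scaffolding sketch that creates this layout, skipping anything that already exists; the function name is illustrative:

```python
import pathlib

def scaffold(root="."):
    """Create the project skeleton shown above."""
    root = pathlib.Path(root)
    for d in [".taskmaster/docs", ".prompts/research", ".prompts/plan",
              ".prompts/do", ".prompts/refine", "costs", "src"]:
        (root / d).mkdir(parents=True, exist_ok=True)
    for f in ["CLAUDE.md", "PLANNING.md", "TASK.md", "Backlog.md",
              "PROJECT_CONTEXT.md", ".taskmaster/docs/prd.txt"]:
        (root / f).touch(exist_ok=True)
    mtd = root / "costs/mtd.json"
    if not mtd.exists():
        mtd.write_text('{"total": 0, "budget": 100}\n')
```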

---

## GTME PERSPECTIVE

This workflow system demonstrates core Go-To-Market Engineer capabilities:

1. **Systematization** - Converting ad-hoc processes into repeatable, documented workflows
2. **Cost Awareness** - Unit economics thinking (cost-per-task, cost-per-lead mindset)
3. **Parallelization** - Orchestrating complex multi-agent systems efficiently
4. **Documentation Discipline** - Audit trails that prove capability
5. **Tool Integration** - Connecting sales, engineering, and ops tooling

**Portfolio Value:** This skill itself is a GTME portfolio piece showing technical depth + process thinking + cost optimization - directly relevant for roles combining GTM strategy with technical implementation.

---

## Referenced Files

> The following files are referenced in this skill and included for context.

### reference/start-day-protocol.md

```markdown
# Start Day Protocol

Deep dive into session initialization and context loading procedures.

## Context Scan Components

### 1. Project Detection
```bash
# Get current directory
PROJ_NAME=$(basename $(pwd))

# Detect project type
if [ -f "package.json" ]; then
    echo "Node.js project detected"
    PROJ_TYPE="node"
elif [ -f "pyproject.toml" ] || [ -f "setup.py" ]; then
    echo "Python project detected"
    PROJ_TYPE="python"
elif [ -f "Cargo.toml" ]; then
    echo "Rust project detected"
    PROJ_TYPE="rust"
elif [ -f "go.mod" ]; then
    echo "Go project detected"
    PROJ_TYPE="go"
fi
```

### 2. Git State Analysis
```bash
# Branch info
CURRENT_BRANCH=$(git branch --show-current)
DEFAULT_BRANCH=$(git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@')

# Uncommitted changes
UNSTAGED=$(git diff --stat)
STAGED=$(git diff --cached --stat)

# Recent activity
LAST_COMMIT=$(git log -1 --pretty=format:"%h %s (%cr)")
COMMITS_AHEAD=$(git rev-list --count HEAD ^origin/$CURRENT_BRANCH)
```

### 3. Context File Loading Priority
1. **PROJECT_CONTEXT.md** - Session-specific context
2. **CLAUDE.md** - Project rules and patterns
3. **TASK.md** - Current sprint tasks
4. **PLANNING.md** - Roadmap and phases
5. **Backlog.md** - Future work
6. **.taskmaster/docs/prd.txt** - Product requirements

### 4. Worktree Analysis
```bash
# Load registry
REGISTRY=~/.claude/worktree-registry.json
if [ -f "$REGISTRY" ]; then
    # Active worktrees for project
    jq --arg proj "$PROJ_NAME" '.worktrees[] | select(.project == $proj)' "$REGISTRY"
    
    # Port usage
    USED_PORTS=$(jq -r '.worktrees[].ports[]' "$REGISTRY" | sort -n)
    NEXT_PORT=$(echo "$USED_PORTS" | tail -1 | xargs -I{} expr {} + 2)
fi
```

### 5. Cost Context Loading
```bash
# Today's costs
TODAY=$(date +%Y-%m-%d)
TODAY_COST=$(cat costs/daily-$TODAY.json 2>/dev/null | jq '.total' || echo "0")

# Month-to-date
MTD=$(cat costs/mtd.json 2>/dev/null | jq '.total' || echo "0")
BUDGET=$(cat costs/mtd.json 2>/dev/null | jq '.budget' || echo "100")
REMAINING=$(echo "$BUDGET - $MTD" | bc)
PERCENT_USED=$(echo "scale=0; $MTD * 100 / $BUDGET" | bc)

# Average cost per task
AVG_COST=$(cat costs/by-feature.jsonl 2>/dev/null | jq -s 'map(.cost // 0) | add / length' 2>/dev/null || echo "0")
```

## Session Initialization Workflow

### Step 1: Automatic Context Detection
```bash
# Run on every session start
/usr/local/bin/workflow-orchestrator-start-day.sh
```

### Step 2: Priority Queue Generation
```python
def generate_priority_queue():
    """Generate today's task queue with agent assignments."""
    tasks = []
    
    # Load from TASK.md
    current_tasks = parse_task_md()
    
    # Load from recent commits
    recent_work = get_recent_commits(5)
    
    # Analyze blockers
    blockers = find_blockers()
    
    # Smart prioritization
    for task in current_tasks:
        agent = assign_agent(task)
        priority = calculate_priority(task, blockers, recent_work)
        tasks.append({
            'task': task,
            'agent': agent,
            'priority': priority
        })
    
    return sorted(tasks, key=lambda x: x['priority'], reverse=True)
```

### Step 3: Agent Assignment Logic
```python
AGENT_MAPPING = {
    # Keywords to agent mapping
    'debug': 'debug-like-expert',
    'error': 'debugging-toolkit:debugger',
    'test': 'unit-testing:test-automator',
    'api': 'backend-development:backend-architect',
    'frontend': 'frontend-mobile-development:frontend-developer',
    'database': 'database-design:schema-design',
    'deploy': 'deployment-strategies:deployment-engineer',
    'research': 'research-skill',
    'plan': 'planning-prompts-skill',
    'refactor': 'code-refactoring:legacy-modernizer',
    'security': 'full-stack-orchestration:security-auditor',
}

def assign_agent(task_description):
    """Intelligently assign agent based on task description."""
    desc_lower = task_description.lower()
    
    for keyword, agent in AGENT_MAPPING.items():
        if keyword in desc_lower:
            return agent
    
    return 'general-purpose'  # Default
```
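
For example, dict insertion order means the first matching keyword wins:

```python
assign_agent("Fix flaky test in the API suite")  # -> 'unit-testing:test-automator'
assign_agent("Ship the landing page copy")       # -> 'general-purpose'
```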

## Output Formatting

### Standard Session Start Template
```markdown
## Session Start: [PROJECT_NAME]
*Branch: [BRANCH] | Type: [PROJECT_TYPE] | Started: [TIME]*

### Git Status
- Current: [BRANCH] ([COMMITS_AHEAD] ahead)
- Unstaged: [COUNT] files
- Staged: [COUNT] files

### Completed (Last Session)
[List from PROJECT_CONTEXT.md]

### In Progress
[Table with worktree info]

### Blockers
[List from context files]

### Today's Priority Queue
[Numbered list with agent assignments]

### Cost Context
- Today: $[TODAY] | MTD: $[MTD] / $[BUDGET] ([PERCENT]%)
- Remaining budget: $[REMAINING]
- Avg cost/task: $[AVG]

### Available Resources
- Worktrees: [ACTIVE]/4 max
- Ports: [NEXT_PORT] next available
- Memory: [FREE]GB free
```

## Integration Points

### 1. With TodoWrite
Automatically create todos from priority queue:
```javascript
const todos = priorityQueue.map((item, idx) => ({
    id: `day-${idx}`,
    content: item.task,
    status: 'pending',
    priority: item.priority > 8 ? 'high' : 'medium'
}));
```

### 2. With Worktree Manager
Check for stale worktrees:
```bash
# Worktrees older than 7 days
git worktree list --porcelain | awk '/^worktree /{print $2}' | while read -r path; do
    # Directory mtime as a rough age proxy
    [ -n "$(find "$path" -maxdepth 0 -mtime +7 2>/dev/null)" ] && echo "Stale: $path"
done
```

### 3. With Cost Tracking
Alert if approaching budget:
```python
if percent_used > 80:
    print("⚠️ BUDGET ALERT: {}% of monthly budget used".format(percent_used))
if remaining < 10:
    print("🚨 CRITICAL: Only ${} remaining in budget".format(remaining))
```

## Error Handling

### Missing Context Files
```bash
# Graceful fallback
if [ ! -f "PROJECT_CONTEXT.md" ]; then
    echo "No previous context found. Starting fresh session."
    # Initialize with defaults
fi
```

### Git Repository Issues
```bash
if ! git rev-parse --is-inside-work-tree >/dev/null 2>&1; then
    echo "Not a git repository. Limited context available."
    # Fall back to file-based context only
fi
```

### Cost Tracking Errors
```bash
# Initialize if missing
if [ ! -d "costs" ]; then
    mkdir -p costs
    echo '{"total": 0, "budget": 100}' > costs/mtd.json
fi
```

## Performance Optimizations

### 1. Parallel Context Loading
```bash
# Load all context files in parallel
{
    cat PROJECT_CONTEXT.md 2>/dev/null &
    cat CLAUDE.md 2>/dev/null &
    cat TASK.md 2>/dev/null &
    cat PLANNING.md 2>/dev/null &
    wait
} | process_context
```

### 2. Caching Git Information
```bash
# Cache expensive git operations
GIT_CACHE=~/.claude/git-cache/$PROJ_NAME
mkdir -p "$GIT_CACHE"

# Cache branch info for 5 minutes
if [ ! -f "$GIT_CACHE/branches" ] || [ -n "$(find "$GIT_CACHE/branches" -mmin +5)" ]; then
    git branch -r > "$GIT_CACHE/branches"
fi
```

### 3. Smart Worktree Detection
Only scan worktrees if registry indicates active ones:
```bash
ACTIVE_COUNT=$(jq --arg p "$PROJ_NAME" '[.worktrees[] | select(.project == $p)] | length' "$REGISTRY")
if [ "$ACTIVE_COUNT" -gt 0 ]; then
    git worktree list
fi
```

## Security Considerations

### 1. Secrets Detection
```bash
# Quick scan for exposed secrets
grep -r "API_KEY\|SECRET\|PASSWORD" --include="*.env*" . 2>/dev/null | head -5
if [ $? -eq 0 ]; then
    echo "⚠️ WARNING: Potential secrets detected in environment files"
fi
```

### 2. Dependency Audit
```bash
# Quick security check
if [ "$PROJ_TYPE" = "node" ]; then
    npm audit --audit-level=critical 2>/dev/null | grep "found.*vulnerabilities"
elif [ "$PROJ_TYPE" = "python" ]; then
    pip-audit --desc 2>/dev/null | grep "Vulnerability"
fi
```

## Customization Options

### 1. Project-Specific Overrides
```bash
# Check for project-specific start script
if [ -f ".claude/start-day.sh" ]; then
    source .claude/start-day.sh
fi
```

### 2. User Preferences
```bash
# Load user preferences
PREFS=~/.claude/preferences.json
if [ -f "$PREFS" ]; then
    START_DAY_FORMAT=$(jq -r '.start_day_format // "standard"' "$PREFS")
    SHOW_COSTS=$(jq -r '.show_costs // true' "$PREFS")
    AUTO_TODO=$(jq -r '.auto_create_todos // true' "$PREFS")
fi
```

### 3. Team Configurations
```bash
# Team-wide settings
TEAM_CONFIG=/opt/claude/team-config.json
if [ -f "$TEAM_CONFIG" ]; then
    source "$TEAM_CONFIG"
fi
```
```

### reference/research-workflow.md

```markdown
# Research Workflow

Systematic approach to technical and market research with cost optimization and decision gates.

## Research Triggers

### When Research is Mandatory
1. **New Technology Adoption**
   - Considering new framework/library
   - Evaluating API providers
   - Selecting infrastructure services

2. **Architectural Decisions**
   - Microservices vs monolith
   - Database selection
   - Authentication approaches

3. **Cost-Impacting Choices**
   - LLM provider selection
   - Cloud platform decisions
   - SaaS tool evaluation

4. **Build vs Buy Decisions**
   - Custom implementation vs service
   - Open source vs commercial
   - In-house vs outsourced

## Phase 1: Existing Solution Scan

### Local Repository Search
```bash
# Search your existing projects first
SEARCH_TERM="$1"
BASE_DIR=~/tk_projects

# Find similar implementations
find "$BASE_DIR" -name "*.md" -o -name "*.py" -o -name "*.js" -o -name "*.ts" | \
    xargs grep -l "$SEARCH_TERM" 2>/dev/null | \
    grep -v node_modules | \
    head -20

# Search documentation
find "$BASE_DIR" -name "README.md" -o -name "ARCHITECTURE.md" | \
    xargs grep -A5 -B5 "$SEARCH_TERM" 2>/dev/null
```

### MCP Cookbook Check
```bash
# Always check cookbook first
COOKBOOK=/Users/tmkipper/Desktop/tk_projects/mcp-server-cookbook

if [ -d "$COOKBOOK" ]; then
    echo "=== MCP Cookbook Matches ==="
    grep -r "$SEARCH_TERM" "$COOKBOOK" --include="*.md" | head -10
fi
```

### Previous Research Cache
```bash
# Check if we've researched this before
RESEARCH_CACHE=~/.claude/research-cache
mkdir -p "$RESEARCH_CACHE"

HASH=$(echo "$SEARCH_TERM" | md5)
if [ -f "$RESEARCH_CACHE/$HASH.md" ]; then
    echo "Found previous research:"
    cat "$RESEARCH_CACHE/$HASH.md"
fi
```

## Phase 2: Evaluation Framework

### Technical Evaluation Matrix
```markdown
| Criteria | Weight | Option A | Option B | Option C |
|----------|--------|----------|----------|----------|
| Performance | 25% | 8/10 | 6/10 | 9/10 |
| Cost | 30% | $0 | $25/mo | $5/mo |
| Learning Curve | 15% | Low | Medium | High |
| Community | 10% | Large | Small | Medium |
| Maintenance | 20% | Active | Stale | Active |
| **Total Score** | | 7.5 | 5.8 | 8.2 |
```

### Cost Projection Model
```python
def project_costs(option, estimated_tokens=0, estimated_storage=0, estimated_hours=0):
    """Project costs for different scaling scenarios."""
    
    # Base costs
    base_costs = {
        'inference': option.get('cost_per_1k_tokens', 0) * estimated_tokens / 1000,
        'storage': option.get('storage_gb_cost', 0) * estimated_storage,
        'compute': option.get('compute_hour_cost', 0) * estimated_hours,
        'fixed': option.get('monthly_fixed', 0)
    }
    
    # Scale projections
    scenarios = {
        'current': base_costs,
        '10x': {k: v * 10 for k, v in base_costs.items()},
        '100x': {k: v * 100 for k, v in base_costs.items()},
        '1000x': {k: v * 1000 for k, v in base_costs.items()}
    }
    
    return scenarios
```

### LLM Selection Logic
```python
LLM_DECISION_TREE = {
    'reasoning': {
        'complex': 'claude-sonnet',      # Quality critical
        'simple': 'deepseek-v3',         # 95% cheaper
        'local': 'ollama-llama3'         # Free
    },
    'generation': {
        'code': 'claude-sonnet',         # Accuracy matters
        'content': 'deepseek-v3',        # Good enough
        'test': 'qwen-72b'              # Alternative
    },
    'embeddings': {
        'semantic': 'voyage-3',          # Best quality
        'basic': 'jina-embeddings-v3',   # Cheaper
        'local': 'ollama-nomic-embed'    # Free
    },
    'analysis': {
        'financial': 'deepseek-v3',      # Math capable
        'technical': 'claude-sonnet',    # Nuanced
        'summary': 'qwen-72b'           # Fast & cheap
    }
}
```
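
Lookup is plain nested-dict access; for example:

```python
model = LLM_DECISION_TREE['generation']['code']    # 'claude-sonnet'
cheap = LLM_DECISION_TREE['reasoning']['simple']   # 'deepseek-v3'
```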

## Phase 3: Research Execution

### Structured Research Template
```markdown
# Research: [TOPIC]
Date: [DATE]
Researcher: Claude
Time Invested: [X] minutes

## Executive Summary
**One-liner:** [Concise conclusion]
**Recommendation:** [GO/NO-GO]
**Confidence:** [1-10]

## Options Evaluated
1. **Option A** - [Brief description]
   - Pros: [List]
   - Cons: [List]
   - Cost: [$ amount]
   
2. **Option B** - [Brief description]
   - Pros: [List]
   - Cons: [List]
   - Cost: [$ amount]

## Decision Factors
1. **Critical:** [What matters most]
2. **Important:** [Secondary concerns]
3. **Nice-to-have:** [Bonus features]

## Cost Analysis
| Option | Monthly | Yearly | 10x Scale |
|--------|---------|--------|-----------|
| A | $X | $Y | $Z |
| B | $X | $Y | $Z |

## Risk Assessment
- **Technical Risks:** [List]
- **Business Risks:** [List]
- **Mitigation Strategies:** [List]

## Implementation Estimate
- **Time to POC:** [X days]
- **Time to Production:** [Y weeks]
- **Team Resources:** [Z engineers]

## Open Questions
1. [Question needing follow-up]
2. [Uncertain aspect]

## Decision
**Recommended:** [Option X]
**Rationale:** [2-3 sentences]
**Next Steps:** [Concrete actions]
```

### Research Execution Workflow
```bash
#!/bin/bash
# research-workflow.sh

TOPIC="$1"
OUTPUT="RESEARCH_$(date +%Y%m%d_%H%M%S).md"

# Step 1: Local scan
echo "# Research: $TOPIC" > "$OUTPUT"
echo "## Local Repository Findings" >> "$OUTPUT"
./scan-local-repos.sh "$TOPIC" >> "$OUTPUT"

# Step 2: MCP Cookbook
echo "## MCP Cookbook Patterns" >> "$OUTPUT"
./scan-mcp-cookbook.sh "$TOPIC" >> "$OUTPUT"

# Step 3: Web research (using DeepSeek for cost)
echo "## Market Research" >> "$OUTPUT"
# Use research-skill with DeepSeek

# Step 4: Cost projections
echo "## Cost Projections" >> "$OUTPUT"
python project-costs.py "$TOPIC" >> "$OUTPUT"

# Step 5: Generate recommendation
echo "## Recommendation" >> "$OUTPUT"
```

## Phase 4: Decision Gates

### Gate 1: Initial Viability
```python
def initial_viability_check(research):
    """Quick check before deep research."""
    
    checks = {
        'cost_reasonable': research.monthly_cost < budget * 0.2,
        'technically_feasible': research.complexity <= team_capability,
        'time_available': research.implementation_time < deadline,
        'aligns_with_strategy': research.strategic_fit > 7
    }
    
    if not all(checks.values()):
        return "NO-GO", checks
    
    return "PROCEED", checks
```

### Gate 2: Deep Dive Decision
```python
def deep_dive_decision(research):
    """Final decision after thorough research."""
    
    score = 0
    weights = {
        'cost_efficiency': 0.3,
        'technical_fit': 0.25,
        'scalability': 0.2,
        'maintenance': 0.15,
        'team_comfort': 0.1
    }
    
    for factor, weight in weights.items():
        score += research[factor] * weight
    
    decision = "GO" if score >= 7 else "NO-GO"
    confidence = min(10, int(score * 1.2))
    
    return decision, confidence, score
```

### Gate 3: Implementation Review
After POC or initial implementation:
```markdown
## Implementation Review Gate

### POC Results
- **Performance:** [Meets/Exceeds/Below] expectations
- **Cost Actual:** $[X] vs Projected $[Y]
- **Integration Effort:** [Easy/Medium/Hard]
- **Team Feedback:** [Positive/Mixed/Negative]

### Decision
[ ] Proceed to production
[ ] Iterate on POC
[ ] Pivot to alternative
[ ] Abandon approach
```

## Cost Optimization Strategies

### 1. Provider Selection by Use Case
```python
PROVIDER_MATRIX = {
    # Use case -> (primary, fallback, dev)
    'customer_facing': ('claude-sonnet', 'gpt-4', None),
    'internal_tools': ('deepseek-v3', 'qwen-72b', 'ollama'),
    'batch_processing': ('deepseek-v3', 'qwen-72b', 'groq'),
    'embeddings': ('voyage-3', 'jina-v3', 'nomic-embed'),
    'development': ('ollama', 'deepseek-v3', None),
}
```

### 2. Caching Strategy
```python
CACHE_POLICIES = {
    'embeddings': 30 * 24 * 60 * 60,  # 30 days
    'llm_responses': 7 * 24 * 60 * 60,  # 7 days
    'search_results': 24 * 60 * 60,     # 1 day
    'static_analysis': 'forever',       # Never expires
}
```
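
A minimal expiry check applying these policies; the helper name is an assumption:

```python
import time

def is_expired(kind, created_at, policies=CACHE_POLICIES):
    """True when a cached item of this kind is past its TTL."""
    ttl = policies.get(kind)
    if ttl is None or ttl == 'forever':  # unknown kinds and 'forever' never expire
        return False
    return (time.time() - created_at) > ttl
```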

### 3. Batch vs Streaming
```python
def choose_processing_mode(task):
    if task.latency_sensitive:
        return 'streaming'
    elif task.volume > 1000:
        return 'batch'  # Better pricing
    else:
        return 'standard'
```

## Research Tools Integration

### With /research-skill
```bash
# Trigger research skill for market analysis
echo "Use /research-skill for market analysis of $TOPIC"
```

### With Web Search
```python
# Use DeepSeek for research queries
search_queries = [
    f"{topic} comparison 2024",
    f"{topic} pricing calculator",
    f"{topic} vs alternatives",
    f"{topic} production experience",
    f"{topic} scaling issues"
]
```

### With Cost Tracking
```bash
# Log research costs
RESEARCH_COST=$(calculate_research_cost)
echo "{\"task\": \"research\", \"topic\": \"$TOPIC\", \"cost\": $RESEARCH_COST}" >> costs/by-feature.jsonl
```

## Common Research Patterns

### 1. Framework Selection
```markdown
Research Checklist:
- [ ] Check awesome-{framework} lists
- [ ] Review GitHub stars/issues/activity
- [ ] Find production case studies
- [ ] Evaluate ecosystem (tools, libs)
- [ ] Check job market demand
- [ ] Review upgrade/migration paths
```

### 2. API Provider Evaluation
```markdown
API Checklist:
- [ ] Pricing at current and 10x scale
- [ ] Rate limits and quotas
- [ ] Latency from your regions
- [ ] SDK quality and languages
- [ ] Error handling and retries
- [ ] Data retention policies
```

### 3. Infrastructure Decisions
```markdown
Infrastructure Checklist:
- [ ] Multi-region capabilities
- [ ] Scaling limits and costs
- [ ] Compliance certifications
- [ ] Disaster recovery options
- [ ] Vendor lock-in assessment
- [ ] Migration complexity
```

## Research Artifacts

### 1. Decision Documents
Store in: `docs/decisions/`
```markdown
# ADR-001: Chose Supabase over Firebase

## Status
Accepted

## Context
[Background and requirements]

## Decision
[What we decided]

## Consequences
[What happens as a result]
```

### 2. Comparison Matrices
Store in: `docs/research/`
```markdown
# Provider Comparison: Vector Databases

| Feature | Pinecone | Weaviate | Qdrant | Chroma |
|---------|----------|----------|---------|---------|
| Pricing | $$$ | $$ | $$ | $ |
| Performance | A | B+ | A- | B |
| Ease of Use | A | B | B+ | A |
```

### 3. Cost Models
Store in: `docs/costs/`
```python
# cost_model_embeddings.py
def calculate_embedding_costs(documents, provider='voyage'):
    # Detailed cost calculation model
    pass
```

## Anti-Patterns to Avoid

### 1. Research Paralysis
- Set time box: 2 hours max for initial research
- Use confidence scores, not perfection
- "Good enough" at 80% confidence > Perfect at 95%

### 2. Ignoring Existing Solutions
- Always check your own code first
- Look for similar problems already solved
- Consider adapting existing approach

### 3. Over-Engineering Research
- Simple decisions need simple research
- Not everything needs a formal process
- Use judgment on research depth

### 4. Ignoring Hidden Costs
- Developer time to learn
- Maintenance burden
- Switching costs later
- Operational overhead
```

### reference/feature-development.md

```markdown
# Feature Development Workflow

Comprehensive guide for multi-phase feature development with parallel execution, security gates, and quality checkpoints.

## Phase 0: Planning & Design

### Feature Brief Template
```markdown
# Feature: [NAME]
**JIRA/Issue:** [LINK]
**Priority:** P0/P1/P2
**Estimated Effort:** [X] days

## Problem Statement
[What problem does this solve?]

## Success Criteria
- [ ] Metric 1 improves by X%
- [ ] User can accomplish Y
- [ ] Performance remains under Zms

## Scope
### In Scope
- [Included functionality]

### Out of Scope
- [Explicitly excluded]

## Technical Approach
[High-level architecture]

## Dependencies
- External: [APIs, services]
- Internal: [Other features]

## Risks
1. [Risk]: [Mitigation]
```

### Agent Mapping
```python
FEATURE_AGENTS = {
    'planning': ['planning-prompts-skill', 'feature-dev:code-architect'],
    'database': ['database-design:schema-design', 'supabase-sql-skill'],
    'backend': ['backend-development:backend-architect', 'python-development:fastapi-pro'],
    'frontend': ['frontend-mobile-development:frontend-developer', 'javascript-typescript:typescript-pro'],
    'testing': ['unit-testing:test-automator', 'tdd-workflows:tdd-orchestrator'],
    'security': ['full-stack-orchestration:security-auditor', 'comprehensive-review:security-auditor'],
    'deployment': ['deployment-strategies:deployment-engineer', 'cicd-automation:deployment-engineer']
}
```

### Parallelization Analysis
```python
def analyze_parallelization(feature_tasks):
    """Identify which tasks can run in parallel."""
    
    dependencies = build_dependency_graph(feature_tasks)
    parallel_groups = []
    
    # Find independent task sets
    for task_set in find_independent_sets(dependencies):
        if len(task_set) > 1:
            parallel_groups.append({
                'tasks': task_set,
                'worktrees_needed': len(task_set),
                'estimated_time': max(t.estimate for t in task_set)
            })
    
    return parallel_groups
```

### Cost Estimation
```python
def estimate_feature_cost(feature_plan):
    """Estimate total cost for feature development."""
    
    costs = {
        'development': {
            'hours': feature_plan.dev_hours,
            'llm_calls': feature_plan.dev_hours * 50,  # Avg 50 LLM calls/hour
            'cost': feature_plan.dev_hours * 50 * 0.003  # Claude rate
        },
        'testing': {
            'hours': feature_plan.test_hours,
            'llm_calls': feature_plan.test_hours * 30,
            'cost': feature_plan.test_hours * 30 * 0.00014  # DeepSeek for tests
        },
        'infrastructure': {
            'database': 0,  # Supabase free tier
            'compute': feature_plan.compute_hours * 0.50,  # RunPod estimate
            'storage': feature_plan.storage_gb * 0.02
        }
    }
    
    # 'infrastructure' holds raw line items (no 'cost' key), so sum it directly
    llm_cost = costs['development']['cost'] + costs['testing']['cost']
    total = llm_cost + sum(costs['infrastructure'].values())
    return costs, total
```

## Phase 1: Database & Schema

### Schema Design Checklist
```markdown
- [ ] Entity relationship diagram created
- [ ] Indexes identified for common queries
- [ ] RLS policies defined
- [ ] Migration rollback plan
- [ ] Test data generation strategy
```

### Database Setup Flow
```bash
# 1. Design schema
/database-design:schema-design

# 2. Create migration
/supabase-sql-skill

# 3. Review generated SQL
cat migrations/$(date +%Y%m%d)_feature_name.sql

# 4. Test in development
supabase db reset
supabase db push

# 5. Verify with test queries
psql $DATABASE_URL < test/queries/feature_verification.sql
```

### RLS Policy Template
```sql
-- Row Level Security for feature tables
CREATE POLICY "Users can view own data"
ON feature_table
FOR SELECT
USING (auth.uid() = user_id);

CREATE POLICY "Users can insert own data"
ON feature_table
FOR INSERT
WITH CHECK (auth.uid() = user_id);

CREATE POLICY "Users can update own data"
ON feature_table
FOR UPDATE
USING (auth.uid() = user_id)
WITH CHECK (auth.uid() = user_id);

-- Enable RLS
ALTER TABLE feature_table ENABLE ROW LEVEL SECURITY;
```

**Gate: Schema Review Required** ⛔
- [ ] Naming conventions followed
- [ ] Proper indexes in place
- [ ] RLS policies comprehensive
- [ ] No performance anti-patterns

## Phase 2: Parallel Implementation

### Worktree Orchestration
```bash
#!/bin/bash
# parallel-feature-setup.sh

PROJECT=$(basename $(pwd))
FEATURE="feature-name"
BASE_PORT=8100

# Create worktrees for parallel development
create_worktree() {
    local name=$1
    local port=$2
    
    echo "Creating worktree: $name (ports: $port, $((port+1)))"
    git worktree add -b "$FEATURE-$name" ~/tmp/worktrees/$PROJECT/$name
    
    # Register in worktree registry
    update_registry "$PROJECT" "$FEATURE-$name" "$port" "$((port+1))"
    
    # Copy environment
    cp .env ~/tmp/worktrees/$PROJECT/$name/.env
    
    # Update ports in .env (BSD/macOS sed requires a suffix argument with -i)
    sed -i.bak "s/PORT=.*/PORT=$port/" ~/tmp/worktrees/$PROJECT/$name/.env
}

# Spawn parallel worktrees
create_worktree "backend" $BASE_PORT
create_worktree "frontend" $((BASE_PORT+2))
create_worktree "tests" $((BASE_PORT+4))
```

### Port Management
```python
class PortAllocator:
    """Manage port allocation for parallel development."""
    
    PORT_POOL = range(8100, 8200, 2)  # Even numbers, pairs reserved
    
    def __init__(self, registry_path="~/.claude/worktree-registry.json"):
        self.registry = self.load_registry(registry_path)
    
    def allocate_ports(self, count=2):
        """Allocate consecutive ports for a worktree."""
        used_ports = self.get_used_ports()
        
        for port in self.PORT_POOL:
            if port not in used_ports and port+1 not in used_ports:
                if self.check_ports_free(port, count):
                    return list(range(port, port+count))
        
        raise Exception("No available port pairs")
    
    def check_ports_free(self, start_port, count):
        """Verify ports are actually free on system."""
        for port in range(start_port, start_port+count):
            if self.is_port_in_use(port):
                return False
        return True
```
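
The class leaves `load_registry`, `get_used_ports`, and `is_port_in_use` undefined; a minimal sketch of the port probe it assumes, using a TCP connect check:

```python
import socket

def is_port_in_use(port, host="127.0.0.1"):
    """Return True when something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(0.2)
        try:
            return s.connect_ex((host, port)) == 0
        except OSError:
            return True  # treat unreachable/odd states as unavailable
```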

### Task Distribution
```markdown
## Worktree A: Backend API
**Branch:** feature/api-backend
**Ports:** 8100 (API), 8101 (Debug)
**Agent:** backend-development:backend-architect

Tasks:
1. Create API endpoints
2. Implement business logic
3. Add authentication
4. Create integration tests

## Worktree B: Frontend UI
**Branch:** feature/ui
**Ports:** 8102 (Dev), 8103 (Storybook)
**Agent:** frontend-mobile-development:frontend-developer

Tasks:
1. Create components
2. Implement state management
3. Connect to API
4. Add loading/error states

## Worktree C: Testing
**Branch:** feature/tests
**Ports:** 8104 (Test server), 8105 (Coverage)
**Agent:** unit-testing:test-automator

Tasks:
1. Unit test suite
2. Integration tests
3. E2E test scenarios
4. Performance benchmarks
```

### Progress Synchronization
```python
def sync_worktree_progress():
    """Monitor and sync progress across worktrees."""
    
    status = {}
    
    for worktree in get_active_worktrees():
        # Check git status
        branch_status = check_git_status(worktree.path)
        
        # Check test status
        test_status = run_tests(worktree.path, quiet=True)
        
        # Check TODO completion
        todo_status = get_todo_progress(worktree.branch)
        
        status[worktree.name] = {
            'commits': branch_status['ahead'],
            'changes': branch_status['changes'],
            'tests': test_status['passing'],
            'todos': todo_status['completed_percent'],
            'blockers': identify_blockers(worktree)
        }
    
    return status
```

## Phase 3: Security & Quality Gates

### Security Scan Pipeline
```bash
#!/bin/bash
# security-scan.sh

echo "=== Security Scan Pipeline ==="

# 1. SAST Analysis
echo "[1/4] Running SAST..."
semgrep --config auto . --json -o security/sast-report.json
SAST_CRITICAL=$(jq '.results | map(select(.extra.severity == "ERROR")) | length' security/sast-report.json)

# 2. Secrets Detection
echo "[2/4] Scanning for secrets..."
gitleaks detect --source . --report-path security/secrets-report.json
SECRETS_FOUND=$(jq 'length' security/secrets-report.json)  # report is a JSON array of findings

# 3. Dependency Audit
echo "[3/4] Auditing dependencies..."
if [ -f "package.json" ]; then
    npm audit --json > security/npm-audit.json
    VULN_CRITICAL=$(jq '.metadata.vulnerabilities.critical' security/npm-audit.json)
elif [ -f "requirements.txt" ]; then
    pip-audit --format json > security/pip-audit.json
    # pip-audit's JSON shape varies by version; adjust the jq path if needed
    VULN_CRITICAL=$(jq '[.dependencies[]?.vulns[]?] | length' security/pip-audit.json)
fi

# 4. Code Coverage
echo "[4/4] Checking test coverage..."
if [ -f "package.json" ]; then
    npm test -- --coverage --coverageReporters=json-summary
    COVERAGE=$(jq '.total.lines.pct' coverage/coverage-summary.json)
else
    pytest --cov=src --cov-report=json
    COVERAGE=$(jq '.totals.percent_covered' coverage.json)
fi

# Gate evaluation
GATE_PASSED=true
REASONS=()

if [ "$SAST_CRITICAL" -gt 0 ]; then
    GATE_PASSED=false
    REASONS+=("$SAST_CRITICAL critical SAST findings")
fi

if [ "$SECRETS_FOUND" = "true" ]; then
    GATE_PASSED=false
    REASONS+=("Secrets detected in code")
fi

if [ "$VULN_CRITICAL" -gt 0 ]; then
    GATE_PASSED=false
    REASONS+=("$VULN_CRITICAL critical vulnerabilities")
fi

if (( $(echo "$COVERAGE < 80" | bc -l) )); then
    GATE_PASSED=false
    REASONS+=("Coverage below 80% (actual: $COVERAGE%)")
fi

# Report
if [ "$GATE_PASSED" = true ]; then
    echo "✅ All security gates PASSED"
else
    echo "❌ Security gates FAILED:"
    printf '%s\n' "${REASONS[@]}"
    exit 1
fi
```

### Quality Metrics
```python
class QualityGates:
    """Enforce quality standards before merge."""
    
    THRESHOLDS = {
        'test_coverage': 80,
        'code_duplication': 5,  # Max 5% duplication
        'cyclomatic_complexity': 10,
        'lint_errors': 0,
        'type_coverage': 90,  # For TypeScript
        'bundle_size_increase': 5  # Max 5% increase
    }
    
    def check_all_gates(self, metrics):
        results = {}
        
        for metric, threshold in self.THRESHOLDS.items():
            if metric in metrics:
                passed = self.check_metric(metric, metrics[metric], threshold)
                results[metric] = {
                    'passed': passed,
                    'value': metrics[metric],
                    'threshold': threshold
                }
        
        return results
    
    def check_metric(self, metric, value, threshold):
        if metric in ['test_coverage', 'type_coverage']:
            return value >= threshold
        elif metric in ['code_duplication', 'cyclomatic_complexity', 'lint_errors', 'bundle_size_increase']:
            return value <= threshold
        return True
```
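
Usage sketch, assuming the metrics dict is gathered elsewhere:

```python
gates = QualityGates()
results = gates.check_all_gates({'test_coverage': 84, 'lint_errors': 0})
merge_allowed = all(r['passed'] for r in results.values())
```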

### Integration Testing
```python
# integration_test_template.py

import pytest
from httpx import AsyncClient

class TestFeatureIntegration:
    """Integration tests for new feature."""
    
    @pytest.fixture
    async def client(self):
        async with AsyncClient(base_url="http://localhost:8100") as client:
            yield client
    
    @pytest.mark.asyncio
    async def test_full_user_journey(self, client):
        """Test complete user journey through feature."""
        # 1. Create user
        user_response = await client.post("/auth/register", json={...})
        assert user_response.status_code == 201
        
        # 2. Authenticate
        token = user_response.json()["token"]
        headers = {"Authorization": f"Bearer {token}"}
        
        # 3. Use feature
        feature_response = await client.post(
            "/api/feature",
            json={...},
            headers=headers
        )
        assert feature_response.status_code == 200
        
        # 4. Verify side effects
        verify_response = await client.get(
            f"/api/feature/{feature_response.json()['id']}",
            headers=headers
        )
        assert verify_response.json()["status"] == "completed"
```

## Phase 4: Ship & Deploy

### Pre-Deploy Checklist
```markdown
## Deployment Readiness Checklist

### Code Quality
- [ ] All tests passing
- [ ] Security scan clean
- [ ] Code review approved
- [ ] Documentation updated

### Performance
- [ ] Load tested at expected scale
- [ ] Database queries optimized
- [ ] Caching strategy implemented
- [ ] CDN configured for assets

### Monitoring
- [ ] Metrics instrumented
- [ ] Alerts configured
- [ ] Error tracking enabled
- [ ] Logs structured

### Rollback Plan
- [ ] Feature flag created
- [ ] Database rollback tested
- [ ] Canary deployment ready
- [ ] Incident response plan

### Documentation
- [ ] API docs updated
- [ ] User guides created
- [ ] Team runbook updated
- [ ] Architecture diagram current
```

### Deployment Pipeline
```yaml
# .github/workflows/deploy-feature.yml
name: Deploy Feature

on:
  pull_request:
    types: [closed]
    branches: [main]

jobs:
  deploy:
    if: github.event.pull_request.merged == true
    runs-on: ubuntu-latest
    
    steps:
    - name: Checkout
      uses: actions/checkout@v3
    
    - name: Run Security Gates
      run: ./scripts/security-scan.sh
    
    - name: Deploy Database Migrations
      run: |
        supabase db push --project-ref ${{ secrets.SUPABASE_PROJECT_REF }}
    
    - name: Deploy Backend
      run: |
        docker build -t feature-api .
        docker push $REGISTRY/feature-api:${{ github.sha }}
        kubectl set image deployment/api api=$REGISTRY/feature-api:${{ github.sha }}
    
    - name: Deploy Frontend
      run: |
        npm run build
        vercel --prod
    
    - name: Run Smoke Tests
      run: |
        npm run test:e2e:smoke
    
    - name: Update Feature Flags
      run: |
        # Enable feature for 10% of users
        curl -X PATCH $FEATURE_FLAG_API/features/new-feature \
          -H "Authorization: Bearer ${{ secrets.FF_TOKEN }}" \
          -d '{"rollout_percentage": 10}'
```

### Cost Logging
```python
import json
from datetime import datetime

def log_feature_cost(feature_name, actual_costs):
    """Log actual costs for feature development."""
    
    # Sum only dollar amounts; hours and token counts are different units
    total_cost = actual_costs['llm_cost'] + actual_costs['infra_cost']
    users = get_feature_usage(feature_name)
    
    entry = {
        'feature': feature_name,
        'date': datetime.now().isoformat(),
        'costs': {
            'development_hours': actual_costs['dev_hours'],
            'llm_tokens': actual_costs['llm_tokens'],
            'llm_cost': actual_costs['llm_cost'],
            'infrastructure': actual_costs['infra_cost'],
            'total': total_cost
        },
        'roi_metrics': {
            'users_impacted': users,
            'revenue_impact': estimate_revenue_impact(feature_name),
            'cost_per_user': total_cost / users if users else 0.0
        }
    }
    
    # Append to cost log
    with open('costs/by-feature.jsonl', 'a') as f:
        f.write(json.dumps(entry) + '\n')
    
    # Update feature documentation
    update_feature_docs(feature_name, entry)
```

## Merge & Cleanup

### Merge Strategy
```bash
#!/bin/bash
# merge-feature-branches.sh

FEATURE="feature-name"

# 1. Merge backend
git checkout main
git merge --no-ff "$FEATURE-backend" -m "feat(backend): Add $FEATURE API endpoints"

# 2. Merge frontend
git merge --no-ff "$FEATURE-frontend" -m "feat(frontend): Add $FEATURE UI components"

# 3. Merge tests
git merge --no-ff "$FEATURE-tests" -m "test: Add $FEATURE test suite"

# 4. Tag release
git tag -a "v1.2.0-$FEATURE" -m "Release: $FEATURE feature"

# 5. Clean up worktrees
for worktree in backend frontend tests; do
    git worktree remove ~/tmp/worktrees/$PROJECT/$worktree
done

# 6. Update registry
update_registry --remove "$FEATURE-*"
```

### Documentation Updates
```markdown
## Updates Required

1. **CLAUDE.md**
   - Add new patterns learned
   - Document architectural decisions
   - Note performance considerations

2. **PLANNING.md**
   - Move feature to completed
   - Update roadmap progress
   - Note any scope changes

3. **README.md**
   - Add feature to feature list
   - Update setup instructions if needed
   - Add configuration examples

4. **API Documentation**
   - New endpoints
   - Request/response examples
   - Error codes

5. **Architecture Docs**
   - Update diagrams
   - Document new components
   - Explain design decisions
```

## Common Patterns

### 1. API Endpoint Pattern
```python
# Standard API endpoint structure
@router.post("/feature", response_model=FeatureResponse)
async def create_feature(
    request: FeatureRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
    cache: Redis = Depends(get_cache),
    metrics: MetricsClient = Depends(get_metrics)
):
    """Create new feature with standard patterns."""
    
    # Input validation
    await validate_feature_request(request, current_user)
    
    # Business logic
    with metrics.timer("feature.create"):
        feature = await feature_service.create(
            db=db,
            user=current_user,
            data=request
        )
    
    # Cache invalidation
    await cache.delete(f"user:{current_user.id}:features")
    
    # Event emission
    await emit_event("feature.created", {
        "user_id": current_user.id,
        "feature_id": feature.id
    })
    
    return feature
```

### 2. Frontend Component Pattern
```typescript
// Standard React component structure
export const FeatureComponent: React.FC<FeatureProps> = ({ 
    initialData,
    onSuccess,
    onError 
}) => {
    // State management
    const [state, dispatch] = useReducer(featureReducer, initialState);
    const queryClient = useQueryClient();
    
    // Data fetching
    const { data, isLoading, error } = useQuery({
        queryKey: ['feature', initialData?.id],
        queryFn: () => fetchFeature(initialData?.id),
        enabled: !!initialData?.id
    });
    
    // Mutations
    const createMutation = useMutation({
        mutationFn: createFeature,
        onSuccess: (data) => {
            queryClient.invalidateQueries(['features']);
            onSuccess?.(data);
        },
        onError: (error) => {
            console.error('Feature creation failed:', error);
            onError?.(error);
        }
    });
    
    // Error boundary
    if (error) return <ErrorDisplay error={error} />;
    
    // Loading state
    if (isLoading) return <FeatureSkeleton />;
    
    // Render
    return (
        <FeatureProvider value={{ state, dispatch }}>
            <FeatureUI data={data} onCreate={createMutation.mutate} />
        </FeatureProvider>
    );
};
```

### 3. Test Pattern
```python
# Standard test structure
class TestFeature:
    """Comprehensive test suite for feature."""
    
    @pytest.fixture
    def feature_data(self):
        """Standard test data."""
        return {
            "name": "Test Feature",
            "config": {"enabled": True}
        }
    
    def test_create_success(self, client, auth_headers, feature_data):
        """Test successful creation."""
        response = client.post(
            "/api/feature",
            json=feature_data,
            headers=auth_headers
        )
        
        assert response.status_code == 201
        assert response.json()["name"] == feature_data["name"]
    
    def test_create_validation(self, client, auth_headers):
        """Test input validation."""
        response = client.post(
            "/api/feature",
            json={"invalid": "data"},
            headers=auth_headers
        )
        
        assert response.status_code == 422
        assert "validation_error" in response.json()["detail"]
    
    def test_create_unauthorized(self, client, feature_data):
        """Test authentication required."""
        response = client.post("/api/feature", json=feature_data)
        assert response.status_code == 401
```
```

### reference/debug-methodology.md

```markdown
# Debug Methodology

Systematic debugging approach with evidence gathering, hypothesis testing, and root cause analysis.

## Core Principles

### 1. No Drive-By Fixes
**Rule:** If you can't explain WHY something is broken, you can't properly fix it.

Bad approach:
```python
# "Let me just try adding this..."
try:
    result = problematic_function()
except:
    result = None  # This might work?
```

Good approach:
```python
# Evidence: Stack trace shows KeyError on line 42
# Root cause: API response missing expected field when user not authenticated
# Fix: Add proper validation before access
if not response.get('user_data'):
    raise AuthenticationError("User data missing - authentication required")
result = response['user_data']['profile']
```

### 2. Evidence-Based Debugging
Document everything BEFORE attempting fixes:

```markdown
## Debug Session: [Issue Name]
Started: [Timestamp]

### 1. Observable Symptoms
- Error message: `TypeError: cannot read property 'map' of undefined`
- Occurs when: User clicks "Load More" button
- Frequency: Intermittent (~30% of attempts)
- First reported: 2 days ago

### 2. Environment
- Browser: Chrome 120.0.6099.129
- OS: macOS 14.2
- API Version: v2.1.3
- Last deployment: 3 days ago

### 3. Reproduction Steps
1. Login as test user
2. Navigate to /dashboard
3. Scroll to bottom
4. Click "Load More" → Error occurs

### 4. Initial Observations
- Works on first page load
- Fails on subsequent loads
- API returns 200 but sometimes empty array
```

### 3. One Variable at a Time
**Never change multiple things simultaneously.**

```python
# BAD: Multiple changes
def debug_issue():
    # Changed API endpoint AND error handling AND retry logic
    response = call_new_api()  # Change 1
    with_retries = add_retry_logic(response)  # Change 2  
    formatted = new_error_handler(with_retries)  # Change 3
    return formatted

# GOOD: Isolate changes
def debug_issue_step1():
    # Test ONLY endpoint change
    response = call_new_api()
    return old_error_handler(response)

# If that works, then test next change...
```

### 4. Systematic Hypothesis Testing
```python
class DebugHypothesis:
    """Structure for systematic debugging."""
    
    def __init__(self, description, evidence, test_method):
        self.description = description
        self.evidence = evidence
        self.test_method = test_method
        self.result = None
        
    def test(self):
        """Execute test and record result."""
        print(f"Testing: {self.description}")
        print(f"Evidence: {self.evidence}")
        
        self.result = self.test_method()
        
        print(f"Result: {'CONFIRMED' if self.result else 'REJECTED'}")
        return self.result

# Example usage
hypotheses = [
    DebugHypothesis(
        "Race condition in state update",
        "Error only occurs on fast clicking",
        lambda: test_with_debounce()
    ),
    DebugHypothesis(
        "Cache corruption",
        "Works after clearing browser cache",
        lambda: test_with_fresh_cache()
    ),
]

for h in hypotheses:
    if h.test():
        print(f"Root cause found: {h.description}")
        break
```

## Debug Workflow Stages

### Stage 1: Information Gathering

#### Automated Context Collection
```bash
#!/bin/bash
# debug-context.sh

echo "=== Debug Context Collection ==="
echo "Timestamp: $(date -Iseconds)"

# System info
echo -e "\n## System"
uname -a
echo "Node: $(node --version 2>/dev/null || echo 'Not installed')"
echo "Python: $(python3 --version 2>/dev/null || echo 'Not installed')"

# Project info
echo -e "\n## Project"
pwd
git rev-parse HEAD
git status --short

# Recent changes
echo -e "\n## Recent Changes"
git log --oneline -10

# Running processes
echo -e "\n## Related Processes"
ps aux | grep -E "(node|python|nginx)" | grep -v grep

# Error logs
echo -e "\n## Recent Errors"
if [ -f "logs/error.log" ]; then
    tail -20 logs/error.log
fi

# Network
echo -e "\n## Network"
netstat -an | grep LISTEN | grep -E "(3000|8000|5432)"
```

#### Error Pattern Analysis
```python
import re
from collections import Counter
from datetime import datetime, timedelta

def analyze_error_patterns(log_file, hours_back=24):
    """Analyze error patterns in logs."""
    
    patterns = {
        'null_reference': r'(null|undefined|NoneType|nil).*(reference|property|attribute)',
        'timeout': r'(timeout|timed out|deadline exceeded)',
        'connection': r'(connection|socket|ECONNREFUSED)',
        'auth': r'(unauthorized|401|403|authentication|permission)',
        'database': r'(database|sql|constraint|foreign key)',
        'memory': r'(memory|heap|OOM|out of memory)',
        'api': r'(API|endpoint|route|404|500)',
    }
    
    errors = Counter()
    timeline = []
    
    cutoff = datetime.now() - timedelta(hours=hours_back)
    
    with open(log_file) as f:
        for line in f:
            # Extract timestamp
            timestamp = extract_timestamp(line)
            if timestamp < cutoff:
                continue
                
            # Categorize error
            for category, pattern in patterns.items():
                if re.search(pattern, line, re.I):
                    errors[category] += 1
                    timeline.append((timestamp, category, line.strip()))
    
    # Analysis
    print("Error Distribution:")
    for category, count in errors.most_common():
        print(f"  {category}: {count}")
    
    # Time clustering
    print("\nError Clustering:")
    analyze_time_clusters(timeline)
    
    return errors, timeline
```
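The `extract_timestamp` and `analyze_time_clusters` helpers are assumed above. A minimal `extract_timestamp` sketch for ISO-prefixed log lines might look like this (the regex and format are assumptions about the log layout):

```python
import re
from datetime import datetime

TS_RE = re.compile(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}')

def extract_timestamp(line):
    """Pull an ISO-like timestamp from a log line; returns datetime.min
    if none is found, so undated lines fail the cutoff check and are skipped."""
    m = TS_RE.search(line)
    if not m:
        return datetime.min
    return datetime.strptime(m.group().replace('T', ' '), '%Y-%m-%d %H:%M:%S')
```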

### Stage 2: Hypothesis Formation

#### Hypothesis Templates

**Template 1: Timing/Race Condition**
```markdown
### Hypothesis: Race Condition in [Component]

**Evidence:**
- Only occurs under load
- Intermittent failures
- Works with artificial delays

**Test Method:**
1. Add mutex/lock around suspicious code
2. Add detailed timing logs
3. Run stress test with concurrency

**Expected Result:**
- With lock: No failures
- Timing logs: Show overlap
```

**Template 2: State Corruption**
```markdown
### Hypothesis: State Corruption in [Store/Cache]

**Evidence:**
- Inconsistent data between requests
- Works after cache clear
- Stale data appearing

**Test Method:**
1. Log all state mutations
2. Verify state before/after operations
3. Test with cache disabled

**Expected Result:**
- State logs show unexpected mutations
- No errors with cache disabled
```

**Template 3: External Dependency**
```markdown
### Hypothesis: External Service [API/DB] Issue

**Evidence:**
- Errors started at specific time
- Works in dev but not prod
- Timeout or connection errors

**Test Method:**
1. Direct service health check
2. Compare dev vs prod configs
3. Test with service mock

**Expected Result:**
- Service returns errors/timeouts
- Config mismatch found
```

#### Hypothesis Prioritization
```python
def prioritize_hypotheses(hypotheses):
    """Score and rank hypotheses by likelihood."""
    
    for h in hypotheses:
        h.score = 0
        
        # Evidence strength (0-40 points)
        if h.evidence_count >= 3:
            h.score += 40
        elif h.evidence_count >= 2:
            h.score += 25
        else:
            h.score += 10
            
        # Testability (0-30 points)
        if h.test_time < 5:  # minutes
            h.score += 30
        elif h.test_time < 15:
            h.score += 20
        else:
            h.score += 10
            
        # Impact scope (0-30 points)
        if h.affects_all_users:
            h.score += 30
        elif h.affects_subset:
            h.score += 20
        else:
            h.score += 10
    
    return sorted(hypotheses, key=lambda h: h.score, reverse=True)
```

### Stage 3: Systematic Testing

#### Test Execution Framework
```python
class DebugTest:
    """Structured debug test execution."""
    
    def __init__(self, name):
        self.name = name
        self.setup_steps = []
        self.test_steps = []
        self.teardown_steps = []
        self.results = []
        
    def add_setup(self, step, func):
        self.setup_steps.append((step, func))
        
    def add_test(self, step, func, expected):
        self.test_steps.append((step, func, expected))
        
    def add_teardown(self, step, func):
        self.teardown_steps.append((step, func))
        
    def execute(self):
        print(f"\n=== Executing Debug Test: {self.name} ===")
        
        # Setup
        print("\n[Setup]")
        for step, func in self.setup_steps:
            print(f"  {step}...", end='')
            try:
                func()
                print(" ✓")
            except Exception as e:
                print(f" ✗ ({e})")
                return False
                
        # Tests
        print("\n[Tests]")
        for step, func, expected in self.test_steps:
            print(f"  {step}...", end='')
            try:
                result = func()
                if result == expected:
                    print(f" ✓ (got {result})")
                    self.results.append((step, True, result))
                else:
                    print(f" ✗ (expected {expected}, got {result})")
                    self.results.append((step, False, result))
            except Exception as e:
                print(f" ✗ (exception: {e})")
                self.results.append((step, False, str(e)))
                
        # Teardown
        print("\n[Teardown]")
        for step, func in self.teardown_steps:
            print(f"  {step}...", end='')
            try:
                func()
                print(" ✓")
            except Exception as e:
                print(f" ✗ ({e})")
                
        # Summary
        passed = sum(1 for _, success, _ in self.results if success)
        total = len(self.results)
        print(f"\n[Summary] {passed}/{total} tests passed")
        
        return passed == total
```

#### Binary Search Debugging
```python
def binary_search_debug(commits, test_func):
    """Find the commit that introduced a bug."""
    
    print("Starting binary search debug...")
    
    left, right = 0, len(commits) - 1
    
    while left < right:
        mid = (left + right) // 2
        
        print(f"\nTesting commit {mid}: {commits[mid]}")
        checkout_commit(commits[mid])
        
        if test_func():
            print("  → Bug NOT present")
            left = mid + 1
        else:
            print("  → Bug IS present")
            right = mid
            
    print(f"\nBug introduced in commit: {commits[left]}")
    return commits[left]
```
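The `checkout_commit` and `test_func` helpers above are assumed; a minimal sketch of each (the test command is a placeholder for whatever reproduces the bug):

```python
import subprocess

def checkout_commit(sha):
    """Check out a specific commit (detached HEAD)."""
    subprocess.run(['git', 'checkout', '--quiet', sha], check=True)

def test_func():
    """Return True when the bug is absent, e.g. by running a focused test."""
    result = subprocess.run(['pytest', 'tests/test_load_more.py', '-q'])
    return result.returncode == 0

# commits oldest → newest, e.g. from:
#   git rev-list --reverse known-good..HEAD
```

For long histories, `git bisect run` performs the same search natively.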

### Stage 4: Root Cause Analysis

#### 5 Whys Technique
```python
class FiveWhysAnalysis:
    """Systematic root cause analysis."""
    
    def __init__(self, initial_problem):
        self.problem = initial_problem
        self.whys = []
        
    def ask_why(self, answer, evidence):
        """Add a why level with evidence."""
        self.whys.append({
            'level': len(self.whys) + 1,
            'question': f"Why {self.whys[-1]['answer'] if self.whys else self.problem}?",
            'answer': answer,
            'evidence': evidence
        })
        
    def generate_report(self):
        """Generate root cause analysis report."""
        print(f"## 5 Whys Analysis: {self.problem}\n")
        
        for why in self.whys:
            print(f"**Level {why['level']}:** {why['question']}")
            print(f"→ {why['answer']}")
            print(f"Evidence: {why['evidence']}\n")
            
        print(f"**Root Cause:** {self.whys[-1]['answer']}")
```
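A usage sketch, reusing the "Load More" incident from earlier as the example input (the answers and evidence are illustrative):

```python
analysis = FiveWhysAnalysis("Load More button throws TypeError")
analysis.ask_why(
    "data is undefined when .map() runs",
    "console logs show null state on second page load"
)
analysis.ask_why(
    "the API sometimes returns an empty body",
    "network capture shows 200 responses with zero-length payloads"
)
analysis.ask_why(
    "the pagination cursor expires after 60 seconds",
    "server logs show cursor-expired warnings at matching timestamps"
)
analysis.generate_report()
```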

#### Fishbone Diagram Generator
```python
def generate_fishbone_diagram(problem, categories):
    """Create fishbone (Ishikawa) diagram for root cause analysis."""
    
    diagram = f"""
    ## Fishbone Diagram: {problem}
    
    ```
                     Environment          People
                          |                 |
                          |                 |
    Methods ______________|_________________|______________ Problem: {problem}
                          |                 |
                          |                 |
                     Materials          Technology
    ```
    
    ### Contributing Factors:
    """
    
    for category, factors in categories.items():
        diagram += f"\n**{category}:**\n"
        for factor in factors:
            diagram += f"- {factor}\n"
            
    return diagram

# Example usage
categories = {
    'Environment': [
        'Production vs Development differences',
        'Load/traffic patterns',
        'Network conditions'
    ],
    'People': [
        'User behavior patterns',
        'Admin actions',
        'Support team changes'
    ],
    'Methods': [
        'Deployment process',
        'Testing procedures',
        'Monitoring gaps'
    ],
    'Materials': [
        'Data quality issues',
        'Resource constraints',
        'Input validation'
    ],
    'Technology': [
        'API changes',
        'Library updates',
        'Infrastructure issues'
    ]
}
```

## Debug Tools & Commands

### Memory Profiling
```python
import tracemalloc
import psutil
import gc

def profile_memory_usage(func):
    """Profile memory usage of a function."""
    
    # Start tracing
    tracemalloc.start()
    gc.collect()
    
    # Get baseline
    process = psutil.Process()
    baseline_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Execute function
    result = func()
    
    # Get peak
    current, peak = tracemalloc.get_traced_memory()
    peak_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Get top allocations
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')
    
    print(f"\nMemory Profile for {func.__name__}:")
    print(f"  Baseline: {baseline_memory:.1f} MB")
    print(f"  Peak: {peak_memory:.1f} MB")
    print(f"  Allocated: {peak / 1024 / 1024:.1f} MB")
    print("\n  Top allocations:")
    
    for stat in top_stats[:5]:
        print(f"    {stat}")
        
    tracemalloc.stop()
    return result
```

### Network Debugging
```bash
#!/bin/bash
# network-debug.sh

echo "=== Network Debug ==="

# DNS resolution
echo -e "\n## DNS Resolution"
nslookup api.example.com

# Connection test
echo -e "\n## Connection Test"
nc -zv api.example.com 443

# SSL certificate
echo -e "\n## SSL Certificate"
openssl s_client -connect api.example.com:443 -servername api.example.com < /dev/null 2>/dev/null | openssl x509 -noout -dates

# Route tracing
echo -e "\n## Route Trace"
traceroute -m 10 api.example.com

# Current connections
echo -e "\n## Active Connections"
netstat -an | grep -E "ESTABLISHED|TIME_WAIT" | grep 443 | head -10

# Bandwidth test
echo -e "\n## Response Time"
curl -w "@curl-format.txt" -o /dev/null -s https://api.example.com/health
```
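The response-time check assumes a local `curl-format.txt` (the template file passed to `-w`). A typical one, using curl's standard timing variables:

```
     time_namelookup:  %{time_namelookup}s\n
        time_connect:  %{time_connect}s\n
     time_appconnect:  %{time_appconnect}s\n
    time_pretransfer:  %{time_pretransfer}s\n
  time_starttransfer:  %{time_starttransfer}s\n
                       ----------\n
          time_total:  %{time_total}s\n
```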

### Database Query Analysis
```sql
-- Query performance analysis
EXPLAIN ANALYZE
SELECT u.*, p.*
FROM users u
LEFT JOIN profiles p ON p.user_id = u.id
WHERE u.created_at > NOW() - INTERVAL '7 days'
ORDER BY u.created_at DESC
LIMIT 100;

-- Lock detection
SELECT
    pid,
    usename,
    pg_blocking_pids(pid) as blocked_by,
    query as blocked_query
FROM pg_stat_activity
WHERE cardinality(pg_blocking_pids(pid)) > 0;

-- Slow query log (requires the pg_stat_statements extension to be enabled)
SELECT
    query,
    calls,
    mean_exec_time,
    total_exec_time,
    min_exec_time,
    max_exec_time,
    stddev_exec_time
FROM pg_stat_statements
WHERE mean_exec_time > 100  -- queries averaging > 100ms
ORDER BY mean_exec_time DESC
LIMIT 20;
```

## Common Bug Patterns

### 1. Race Conditions
```javascript
// Problem pattern
let data = null;
async function loadData() {
    const response = await fetch('/api/data');
    data = await response.json();
}
function useData() {
    return data.map(item => item.name); // Race: data might be null
}

// Debug approach
function useData() {
    console.log('[DEBUG] data state:', data);
    console.log('[DEBUG] data type:', typeof data);
    console.log('[DEBUG] is array:', Array.isArray(data));
    
    if (!data || !Array.isArray(data)) {
        console.error('[DEBUG] Data not ready or invalid');
        return [];
    }
    return data.map(item => item.name);
}
```

### 2. Memory Leaks
```python
# Problem pattern
class EventManager:
    def __init__(self):
        self.handlers = []
    
    def subscribe(self, handler):
        self.handlers.append(handler)  # Leak: never removed

# Debug approach
import weakref

class EventManager:
    def __init__(self):
        self.handlers = []
        self._debug_subscriptions = 0
        
    def subscribe(self, handler):
        self._debug_subscriptions += 1
        print(f"[DEBUG] Subscription #{self._debug_subscriptions}")
        print(f"[DEBUG] Total handlers: {len(self.handlers)}")
        
        # Use weak reference to prevent leak
        self.handlers.append(weakref.ref(handler))
        
    def _clean_handlers(self):
        """Remove dead references."""
        self.handlers = [h for h in self.handlers if h() is not None]
```
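One caveat with the weak-reference fix: `weakref.ref` on a bound method dies immediately, because the bound-method object itself is temporary. `weakref.WeakMethod` exists for exactly this case:

```python
import weakref

class Button:
    def on_click(self):
        print("clicked")

b = Button()
dead = weakref.ref(b.on_click)       # bound-method object is discarded at once
print(dead())                        # None, even while b is still alive
alive = weakref.WeakMethod(b.on_click)
print(alive())                       # <bound method Button.on_click ...>
```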

### 3. Async/Promise Issues
```typescript
// Problem pattern
async function processItems(items: Item[]) {
    items.forEach(async (item) => {
        await processItem(item);  // Bug: forEach doesn't wait
    });
}

// Debug approach
async function processItems(items: Item[]) {
    console.log(`[DEBUG] Processing ${items.length} items`);
    
    // Method 1: Sequential
    for (const item of items) {
        console.log(`[DEBUG] Processing item ${item.id}`);
        await processItem(item);
    }
    
    // Method 2: Parallel
    await Promise.all(
        items.map((item, index) => {
            console.log(`[DEBUG] Starting item ${index}`);
            return processItem(item);
        })
    );
}
```

## Debug Documentation Template

```markdown
# Debug Report: [Issue Title]

## Summary
- **Issue ID:** #123
- **Severity:** High/Medium/Low
- **First Observed:** [Date]
- **Resolution Time:** [X hours]

## Problem Description
[Clear description of the issue]

## Impact
- Users affected: [Number/%]
- Features impacted: [List]
- Data loss: Yes/No

## Root Cause
[Technical explanation of why it happened]

## Evidence Trail
1. [Timestamp] - Initial error observed
2. [Timestamp] - Hypothesis 1 tested (failed)
3. [Timestamp] - Hypothesis 2 tested (confirmed)
4. [Timestamp] - Root cause identified

## Solution
[What was changed to fix it]

## Prevention
[How to prevent this in the future]

## Lessons Learned
[What we learned from this issue]

## Related Issues
- [Links to similar or related issues]
```
```

### reference/end-day-protocol.md

```markdown
# End Day Protocol

Comprehensive end-of-day procedures including security sweeps, context preservation, and cost tracking.

## Security Sweep (Mandatory)

### Automated Security Scan
```bash
#!/bin/bash
# end-day-security-sweep.sh

echo "=== End Day Security Sweep ==="
echo "Time: $(date -Iseconds)"

# Initialize results
SECURITY_PASSED=true
ISSUES=()
mkdir -p .security  # report directory for scan output

# 1. Secrets Detection
echo -e "\n[1/5] Scanning for secrets..."
gitleaks detect --source . --verbose --report-path=.security/gitleaks-$(date +%Y%m%d).json

if [ $? -ne 0 ]; then
    SECURITY_PASSED=false
    ISSUES+=("Secrets detected in code")
    
    # Show details (gitleaks v8 writes a top-level array of findings)
    jq '.[] | {file: .File, secret: .Secret[0:20], line: .StartLine}' ".security/gitleaks-$(date +%Y%m%d).json" | tail -5
fi

# 2. Git History Check
echo -e "\n[2/5] Checking git history..."
SECRETS_IN_HISTORY=$(git log -p -10 | grep -E "(password|secret|api[_-]?key|token)[ ]*=[ ]*['\"]" | wc -l)

if [ "$SECRETS_IN_HISTORY" -gt 0 ]; then
    SECURITY_PASSED=false
    ISSUES+=("$SECRETS_IN_HISTORY potential secrets in recent commits")
fi

# 3. Dependency Vulnerabilities
echo -e "\n[3/5] Checking dependencies..."
if [ -f "package.json" ]; then
    npm audit --audit-level=critical --json > .security/npm-audit-$(date +%Y%m%d).json
    CRITICAL=$(jq '.metadata.vulnerabilities.critical' .security/npm-audit-*.json)
    HIGH=$(jq '.metadata.vulnerabilities.high' .security/npm-audit-*.json)
    
    if [ "$CRITICAL" -gt 0 ]; then
        SECURITY_PASSED=false
        ISSUES+=("$CRITICAL critical npm vulnerabilities")
    fi
elif [ -f "requirements.txt" ] || [ -f "pyproject.toml" ]; then
    pip-audit --format json > .security/pip-audit-$(date +%Y%m%d).json
    VULNS=$(jq '.vulnerabilities | length' .security/pip-audit-*.json)
    
    if [ "$VULNS" -gt 0 ]; then
        SECURITY_PASSED=false
        ISSUES+=("$VULNS Python dependency vulnerabilities")
    fi
fi

# 4. Environment Files
echo -e "\n[4/5] Checking environment files..."
ENV_ISSUES=$(grep -r "API_KEY\|SECRET\|PASSWORD\|TOKEN" --include="*.env*" . 2>/dev/null | grep -v ".env.example" | wc -l)

if [ "$ENV_ISSUES" -gt 0 ]; then
    echo "⚠️  WARNING: $ENV_ISSUES potential secrets in .env files"
    echo "   Ensure these are in .gitignore!"
    
    # Check if they're ignored
    for env_file in $(find . -name "*.env*" -not -name "*.example"); do
        if ! git check-ignore "$env_file" > /dev/null 2>&1; then
            SECURITY_PASSED=false
            ISSUES+=("$env_file not in .gitignore")
        fi
    done
fi

# 5. File Permissions
echo -e "\n[5/5] Checking file permissions..."
WORLD_WRITABLE=$(find . -type f -perm -002 -not -path "./.git/*" 2>/dev/null | wc -l)

if [ "$WORLD_WRITABLE" -gt 0 ]; then
    ISSUES+=("$WORLD_WRITABLE world-writable files detected")
fi

# Report
echo -e "\n=== Security Sweep Results ==="
if [ "$SECURITY_PASSED" = true ]; then
    echo "✅ ALL SECURITY CHECKS PASSED"
    echo "Safe to commit changes."
else
    echo "❌ SECURITY ISSUES DETECTED"
    echo ""
    printf '%s\n' "${ISSUES[@]}"
    echo ""
    echo "⛔ BLOCKING COMMITS until resolved"
    exit 1
fi
```

### Manual Security Checklist
```markdown
## Manual Security Review

Before ending the day, verify:

- [ ] No hardcoded credentials in new code
- [ ] API keys are from environment variables
- [ ] Database connection strings are secured
- [ ] No sensitive data in logs
- [ ] Authentication checks on new endpoints
- [ ] Input validation on user inputs
- [ ] SQL injection prevention (parameterized queries)
- [ ] XSS prevention (output encoding)
- [ ] CORS settings appropriate
- [ ] File upload restrictions in place
```

## Context Preservation

### PROJECT_CONTEXT.md Generation
```python
#!/usr/bin/env python3
# generate-context.py

import json
import subprocess
from datetime import datetime
from pathlib import Path

def generate_project_context():
    """Generate comprehensive project context."""
    
    context = {
        'last_updated': datetime.now().isoformat(),
        'session_duration': get_session_duration(),
        'completed_tasks': get_completed_tasks(),
        'in_progress': get_in_progress_tasks(),
        'blockers': get_blockers(),
        'decisions': get_decisions_made(),
        'tomorrow_priorities': analyze_next_priorities(),
        'metrics': gather_session_metrics()
    }
    
    # Generate markdown
    md_content = f"""## Project Context
Last Updated: {context['last_updated']}
Session Duration: {context['session_duration']}

### Completed This Session
{format_task_list(context['completed_tasks'])}

### In Progress
{format_progress_table(context['in_progress'])}

### Blockers
{format_blockers(context['blockers'])}

### Decisions Made
{format_decisions(context['decisions'])}

### Tomorrow's Priorities
{format_priorities(context['tomorrow_priorities'])}

### Session Metrics
- Commits: {context['metrics']['commits']}
- Files Changed: {context['metrics']['files_changed']}
- Tests Added: {context['metrics']['tests_added']}
- Coverage: {context['metrics']['coverage']}%
"""
    
    # Save context
    with open('PROJECT_CONTEXT.md', 'w') as f:
        f.write(md_content)
        
    # Also save JSON for programmatic access
    Path('.claude').mkdir(exist_ok=True)
    with open('.claude/context.json', 'w') as f:
        json.dump(context, f, indent=2)
        
def get_completed_tasks():
    """Extract completed tasks from various sources."""
    tasks = []
    
    # From git commits
    commits = subprocess.check_output(
        ['git', 'log', '--oneline', '--since=1 day ago'],  # no shell, so no inner quotes
        text=True
    ).strip().split('\n')
    
    for commit in commits:
        if commit:
            tasks.append({
                'source': 'git',
                'description': commit.split(' ', 1)[1],
                'completed': True
            })
    
    # From TODO file if exists
    if Path('TASK.md').exists():
        # Parse completed tasks
        pass
        
    # From todo system
    if Path('.claude/todos.json').exists():
        with open('.claude/todos.json') as f:
            todos = json.load(f)
            tasks.extend([
                {'source': 'todo', 'description': t['content'], 'completed': True}
                for t in todos if t.get('status') == 'completed'
            ])
    
    return tasks

def format_progress_table(tasks):
    """Format in-progress tasks as markdown table."""
    if not tasks:
        return "_No tasks currently in progress_"
        
    table = "| Task | Branch | Status | Blockers |\n"
    table += "|------|---------|---------|----------|\n"
    
    for task in tasks:
        table += f"| {task['name']} | {task['branch']} | {task['progress']}% | {task['blocker'] or 'None'} |\n"
        
    return table
```

### Smart Context Compression
```python
def compress_context(full_context, max_tokens=2000):
    """Compress context to fit token limits while preserving key info."""
    
    # Priority order for context elements
    priorities = [
        ('blockers', 1.0),          # Always include blockers
        ('in_progress', 0.9),       # Current work is critical
        ('decisions', 0.8),         # Important for continuity
        ('tomorrow_priorities', 0.7), # Next session planning
        ('completed_tasks', 0.5),   # Can be summarized
        ('metrics', 0.3),           # Nice to have
    ]
    
    compressed = {}
    current_tokens = 0
    
    for key, priority in priorities:
        if key not in full_context:
            continue
            
        content = full_context[key]
        tokens = estimate_tokens(content)
        
        if current_tokens + tokens <= max_tokens:
            compressed[key] = content
            current_tokens += tokens
        elif priority >= 0.7:  # High priority - try to summarize
            summary = summarize_content(content, max_tokens - current_tokens)
            compressed[key] = summary
            current_tokens += estimate_tokens(summary)
            
    return compressed
```
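`estimate_tokens` and `summarize_content` are assumed helpers. A common rough heuristic for the former (≈4 characters per token for English text) keeps the compressor dependency-free:

```python
import json

def estimate_tokens(content):
    """Rough token estimate: ~4 characters per token for English text."""
    text = content if isinstance(content, str) else json.dumps(content)
    return max(1, len(text) // 4)
```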

### Context Backup Strategy
```bash
#!/bin/bash
# backup-context.sh

BACKUP_DIR=~/.claude/context-backups
PROJECT_NAME=$(basename $(pwd))
DATE=$(date +%Y%m%d_%H%M%S)

# Create backup directory
mkdir -p "$BACKUP_DIR/$PROJECT_NAME"

# Backup all context files
tar -czf "$BACKUP_DIR/$PROJECT_NAME/context_$DATE.tar.gz" \
    PROJECT_CONTEXT.md \
    CLAUDE.md \
    TASK.md \
    PLANNING.md \
    .claude/context.json \
    .claude/todos.json 2>/dev/null

# Keep only last 7 days of backups
find "$BACKUP_DIR/$PROJECT_NAME" -name "context_*.tar.gz" -mtime +7 -delete

echo "Context backed up to: $BACKUP_DIR/$PROJECT_NAME/context_$DATE.tar.gz"
```

## Cost Tracking

### Daily Cost Aggregation
```python
#!/usr/bin/env python3
# track-daily-costs.py

import json
from datetime import datetime
from pathlib import Path
from collections import defaultdict

class CostTracker:
    """Track and aggregate daily costs."""
    
    def __init__(self):
        self.costs_dir = Path('costs')
        self.costs_dir.mkdir(exist_ok=True)
        
        self.today = datetime.now().strftime('%Y-%m-%d')
        self.daily_file = self.costs_dir / f'daily-{self.today}.json'
        self.mtd_file = self.costs_dir / 'mtd.json'
        self.feature_log = self.costs_dir / 'by-feature.jsonl'

    def load_daily_costs(self):
        """Load today's cost file, or start fresh if it doesn't exist."""
        if self.daily_file.exists():
            with open(self.daily_file) as f:
                return json.load(f)
        return {}
        
    def track_session_costs(self, session_data):
        """Track costs for current session."""
        
        # Calculate costs from session data
        costs = self.calculate_costs(session_data)
        
        # Update daily file
        daily_costs = self.load_daily_costs()
        for category, amount in costs.items():
            daily_costs[category] = daily_costs.get(category, 0) + amount
            
        daily_costs['last_updated'] = datetime.now().isoformat()
        daily_costs['sessions'] = daily_costs.get('sessions', 0) + 1
        
        # Save daily costs
        with open(self.daily_file, 'w') as f:
            json.dump(daily_costs, f, indent=2)
            
        # Update MTD
        self.update_mtd(costs)
        
        # Log feature costs if applicable
        if session_data.get('feature_name'):
            self.log_feature_cost(session_data['feature_name'], costs)
            
        return daily_costs
        
    def calculate_costs(self, session_data):
        """Calculate costs from session metrics."""
        
        costs = defaultdict(float)
        
        # LLM costs
        llm_usage = session_data.get('llm_usage', {})
        for model, tokens in llm_usage.items():
            rate = self.get_model_rate(model)
            costs['llm'] += (tokens / 1000) * rate
            costs[f'llm_{model}'] = (tokens / 1000) * rate
            
        # Compute costs
        compute_hours = session_data.get('compute_hours', 0)
        if compute_hours > 0:
            costs['compute'] = compute_hours * 0.50  # RunPod estimate
            
        # Storage costs (monthly, so prorate)
        storage_gb = session_data.get('storage_gb', 0)
        if storage_gb > 0:
            costs['storage'] = (storage_gb * 0.02) / 30  # Daily portion
            
        # API costs
        api_calls = session_data.get('api_calls', {})
        for api, calls in api_calls.items():
            rate = self.get_api_rate(api)
            costs['api'] += calls * rate
            
        # Per-model keys ('llm_<model>') duplicate the 'llm' bucket,
        # so exclude them from the total to avoid double counting
        costs['total'] = sum(v for k, v in costs.items() if not k.startswith('llm_'))
        
        return dict(costs)
        
    def get_model_rate(self, model):
        """Get cost per 1K tokens for model."""
        rates = {
            'claude-sonnet': 0.003,
            'claude-opus': 0.015,
            'claude-haiku': 0.00025,
            'deepseek-v3': 0.00014,
            'qwen-72b': 0.0002,
            'gpt-4': 0.03,
            'gpt-3.5': 0.001,
        }
        return rates.get(model, 0.001)  # Default rate
        
    def update_mtd(self, session_costs):
        """Update month-to-date totals."""
        
        # Load existing MTD
        if self.mtd_file.exists():
            with open(self.mtd_file) as f:
                mtd = json.load(f)
        else:
            mtd = {
                'month': datetime.now().strftime('%Y-%m'),
                'budget': 100.0,
                'categories': {}
            }
            
        # Update totals
        for category, amount in session_costs.items():
            if category != 'total':
                mtd['categories'][category] = mtd['categories'].get(category, 0) + amount
                
        mtd['total'] = sum(
            v for k, v in mtd['categories'].items() if not k.startswith('llm_')
        )
        mtd['remaining'] = mtd['budget'] - mtd['total']
        mtd['percent_used'] = (mtd['total'] / mtd['budget']) * 100
        mtd['last_updated'] = datetime.now().isoformat()
        
        # Save MTD
        with open(self.mtd_file, 'w') as f:
            json.dump(mtd, f, indent=2)
            
        # Alert if approaching budget
        if mtd['percent_used'] > 80:
            print(f"⚠️  BUDGET ALERT: {mtd['percent_used']:.1f}% of monthly budget used!")
        if mtd['remaining'] < 10:
            print(f"🚨 CRITICAL: Only ${mtd['remaining']:.2f} remaining in budget!")
            
        return mtd
```
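A usage sketch (the session metrics are illustrative; `log_feature_cost` and `get_api_rate` are assumed to exist alongside the methods shown):

```python
tracker = CostTracker()
daily = tracker.track_session_costs({
    'llm_usage': {'claude-sonnet': 52_000, 'deepseek-v3': 310_000},
    'compute_hours': 1.5,
    'storage_gb': 12,
})
print(f"Spent today: ${daily['total']:.2f} across {daily['sessions']} session(s)")
```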

### Cost Analysis & Reporting
```python
def generate_cost_report():
    """Generate detailed cost analysis report."""
    
    report = f"""# Cost Analysis Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## Today's Costs
{generate_daily_summary()}

## Month-to-Date
{generate_mtd_summary()}

## Cost by Category
{generate_category_breakdown()}

## Cost by Feature
{generate_feature_costs()}

## Optimization Opportunities
{analyze_cost_optimization()}

## Projections
{generate_cost_projections()}
"""
    
    with open('costs/cost-report.md', 'w') as f:
        f.write(report)
        
def analyze_cost_optimization():
    """Identify cost optimization opportunities."""
    
    opportunities = []
    
    # Analyze LLM usage
    llm_costs = analyze_llm_usage()
    if llm_costs['claude_percent'] > 70:
        opportunities.append({
            'category': 'LLM Usage',
            'finding': f"Claude usage is {llm_costs['claude_percent']}% of LLM costs",
            'recommendation': "Consider DeepSeek V3 for bulk processing (95% cheaper)",
            'potential_savings': llm_costs['potential_savings']
        })
        
    # Analyze compute patterns
    compute_analysis = analyze_compute_usage()
    if compute_analysis['idle_percent'] > 20:
        opportunities.append({
            'category': 'Compute',
            'finding': f"{compute_analysis['idle_percent']}% of compute time is idle",
            'recommendation': "Implement auto-shutdown for idle resources",
            'potential_savings': compute_analysis['idle_cost']
        })
        
    return format_opportunities(opportunities)
```

## Worktree Management

### Cleanup Procedures
```bash
#!/bin/bash
# worktree-cleanup.sh

echo "=== Worktree Cleanup ==="

PROJECT=$(basename $(pwd))
REGISTRY=~/.claude/worktree-registry.json

# List current worktrees
echo "Current worktrees:"
git worktree list

# Check each worktree
git worktree list --porcelain | while read -r line; do
    if [[ $line == worktree* ]]; then
        WORKTREE_PATH=$(echo $line | cut -d' ' -f2)
        BRANCH=$(git -C "$WORKTREE_PATH" branch --show-current 2>/dev/null)
        
        if [ -z "$BRANCH" ]; then
            echo "⚠️  Detached worktree: $WORKTREE_PATH"
            continue
        fi
        
        # Check if branch is merged (-w avoids substring false positives)
        MERGED=$(git branch --merged main | grep -cw "$BRANCH")
        
        if [ "$MERGED" -gt 0 ]; then
            echo "✅ Branch $BRANCH is merged and can be cleaned"
            
            # Remove worktree
            git worktree remove "$WORKTREE_PATH"
            
            # Update registry
            jq --arg path "$WORKTREE_PATH" \
               'del(.worktrees[] | select(.path == $path))' \
               "$REGISTRY" > "$REGISTRY.tmp" && mv "$REGISTRY.tmp" "$REGISTRY"
               
            echo "   Removed worktree: $WORKTREE_PATH"
        else
            echo "⏳ Branch $BRANCH is not merged yet"
            
            # Age in whole days since last commit (%cr yields strings like
            # "2 weeks ago", so parse the raw timestamp instead)
            LAST_TS=$(git -C "$WORKTREE_PATH" log -1 --format=%ct)
            AGE_DAYS=$(( ($(date +%s) - LAST_TS) / 86400 ))
            if [ "$AGE_DAYS" -gt 7 ]; then
                echo "   ⚠️  Worktree is $AGE_DAYS days old - consider reviewing"
            fi
        fi
    fi
done

# Rebuild registry with this project's entries only (--arg avoids quoting bugs)
echo -e "\nCleaning registry..."
jq --arg project "$PROJECT" '.worktrees |= map(select(.project == $project))' "$REGISTRY" > "$REGISTRY.tmp" && mv "$REGISTRY.tmp" "$REGISTRY"

# Port cleanup
echo -e "\nReleasing ports..."
USED_PORTS=$(jq -r '.worktrees[].ports[]' "$REGISTRY" 2>/dev/null | sort -n | uniq)
echo "Ports still in use: ${USED_PORTS:-none}"
```

### Stale Worktree Detection
```python
def detect_stale_worktrees(threshold_days=7):
    """Identify stale worktrees that need attention."""
    
    stale = []
    
    worktrees = get_worktree_info()
    
    for wt in worktrees:
        # Check last commit date
        last_commit = get_last_commit_date(wt['path'])
        age_days = (datetime.now() - last_commit).days
        
        if age_days > threshold_days:
            # Additional checks
            entry = {
                'path': wt['path'],
                'branch': wt['branch'],
                'age_days': age_days,
                'uncommitted': check_uncommitted_changes(wt['path']),
                'behind_main': check_if_behind_main(wt['branch']),
                'unpushed': check_unpushed_commits(wt['branch']),
            }
            # recommend_action reads the computed flags, so pass the entry
            entry['recommendation'] = recommend_action(entry, age_days)
            stale.append(entry)
            
    return stale

def recommend_action(worktree, age_days):
    """Recommend action for stale worktree."""
    
    if worktree['uncommitted']:
        return "Commit or stash changes"
    elif worktree['unpushed']:
        return "Push commits to remote"
    elif age_days > 14:
        return "Consider archiving or removing"
    elif worktree['behind_main']:
        return "Rebase on latest main"
    else:
        return "Review and merge if ready"
```

## Learning Capture

### Session Learnings Extraction
```python
def capture_session_learnings():
    """Extract and document learnings from session."""
    
    learnings = {
        'patterns': extract_new_patterns(),
        'mistakes': extract_mistakes_made(),
        'tools': extract_useful_tools(),
        'optimizations': extract_optimizations(),
        'decisions': extract_architectural_decisions()
    }
    
    # Update CLAUDE.md with learnings
    update_claude_md(learnings)
    
    # Update skill-specific knowledge
    update_skill_knowledge(learnings)
    
    # Create learning entry
    learning_entry = {
        'date': datetime.now().isoformat(),
        'project': get_project_name(),
        'session_duration': get_session_duration(),
        'key_learnings': format_key_learnings(learnings),
        'applicable_to': identify_applicable_contexts(learnings)
    }
    
    # Append to learning log ('~' is not expanded by open(), so do it explicitly)
    log_path = Path('~/.claude/learnings.jsonl').expanduser()
    with open(log_path, 'a') as f:
        f.write(json.dumps(learning_entry) + '\n')
        
def extract_new_patterns():
    """Identify new coding patterns discovered."""
    
    patterns = []
    
    # From git diff
    diff_analysis = analyze_git_diff()
    for pattern in diff_analysis['new_patterns']:
        patterns.append({
            'type': pattern['type'],
            'description': pattern['description'],
            'example': pattern['code_snippet'],
            'context': pattern['use_case']
        })
        
    return patterns
```

### CLAUDE.md Updates
```python
def update_claude_md(learnings):
    """Update project CLAUDE.md with new learnings."""
    
    # Read existing CLAUDE.md
    claude_md = Path('CLAUDE.md')
    if not claude_md.exists():
        content = "# Project Learnings\n\n"
    else:
        content = claude_md.read_text()
        
    # Add new learnings section
    new_section = f"""
## Session Learnings - {datetime.now().strftime('%Y-%m-%d')}

### New Patterns Discovered
{format_patterns(learnings['patterns'])}

### Mistakes to Avoid
{format_mistakes(learnings['mistakes'])}

### Useful Tools/Commands
{format_tools(learnings['tools'])}

### Performance Optimizations
{format_optimizations(learnings['optimizations'])}

### Architectural Decisions
{format_decisions(learnings['decisions'])}
"""
    
    # Append to file
    content += new_section
    claude_md.write_text(content)
```

## Integration & Automation

### Git Hooks Integration
```bash
#!/bin/bash
# .git/hooks/pre-commit

# Run end-day security sweep
./scripts/end-day-security-sweep.sh

if [ $? -ne 0 ]; then
    echo "❌ Security checks failed. Commit blocked."
    echo "Run './scripts/end-day-security-sweep.sh' for details"
    exit 1
fi

# Generate context if needed
if [ ! -f "PROJECT_CONTEXT.md" ] || [ $(find "PROJECT_CONTEXT.md" -mmin +60) ]; then
    echo "Generating project context..."
    python3 scripts/generate-context.py
    git add PROJECT_CONTEXT.md
fi

echo "✅ Pre-commit checks passed"
```

### Automated End Day Script
```bash
#!/bin/bash
# end-day.sh - Main end day automation

echo "=== Running End Day Protocol ==="
echo "Started: $(date)"

# 1. Security sweep
echo -e "\n[1/6] Security Sweep"
./scripts/end-day-security-sweep.sh || exit 1

# 2. Generate context
echo -e "\n[2/6] Generating Context"
python3 scripts/generate-context.py

# 3. Track costs
echo -e "\n[3/6] Tracking Costs"
python3 scripts/track-daily-costs.py

# 4. Clean worktrees
echo -e "\n[4/6] Cleaning Worktrees"
./scripts/worktree-cleanup.sh

# 5. Capture learnings
echo -e "\n[5/6] Capturing Learnings"
python3 scripts/capture-learnings.py

# 6. Backup context
echo -e "\n[6/6] Backing Up Context"
./scripts/backup-context.sh

# Generate summary (get_session_duration / get_today_cost are assumed helpers)
cat << EOF

=== End Day Summary ===
Time: $(date)
Duration: $(get_session_duration)

Security: ✅ Passed
Context: ✅ Saved
Costs: ✅ Tracked (Today: \$$(get_today_cost))
Worktrees: ✅ Cleaned
Learnings: ✅ Captured
Backup: ✅ Complete

Ready to commit and end session.
EOF
```

### Slack/Discord Notification
```python
import requests

def send_end_day_summary(webhook_url):
    """Send end day summary to team channel."""
    
    summary = generate_end_day_summary()
    
    payload = {
        'text': f"End Day Summary - {get_project_name()}",
        'attachments': [{
            'color': 'good' if summary['all_passed'] else 'danger',
            'fields': [
                {
                    'title': 'Security',
                    'value': '✅ Passed' if summary['security_passed'] else '❌ Issues found',
                    'short': True
                },
                {
                    'title': 'Tests',
                    'value': f"{summary['tests_passing']}/{summary['tests_total']} passing",
                    'short': True
                },
                {
                    'title': 'Coverage',
                    'value': f"{summary['coverage']}%",
                    'short': True
                },
                {
                    'title': 'Cost Today',
                    'value': f"${summary['cost_today']:.2f}",
                    'short': True
                },
                {
                    'title': 'Completed Tasks',
                    'value': '\n'.join(f"• {task}" for task in summary['completed_tasks']),
                    'short': False
                }
            ],
            'footer': f"Session duration: {summary['duration']}"
        }]
    }
    
    requests.post(webhook_url, json=payload)
```
```

### reference/cost-tracking.md

```markdown
# Cost Tracking & Optimization

Comprehensive cost tracking system with model routing, budget management, and optimization strategies.

## Cost Model Reference

### LLM Pricing (per 1K tokens)
```python
LLM_COSTS = {
    # Anthropic Claude
    'claude-3-opus': {'input': 0.015, 'output': 0.075},
    'claude-3.5-sonnet': {'input': 0.003, 'output': 0.015},
    'claude-3-haiku': {'input': 0.00025, 'output': 0.00125},
    
    # DeepSeek (95% cheaper than Claude)
    'deepseek-chat': {'input': 0.00014, 'output': 0.00028},
    'deepseek-coder': {'input': 0.00014, 'output': 0.00028},
    'deepseek-v3': {'input': 0.00014, 'output': 0.00028},
    
    # Groq (ultra-fast inference)
    'llama-3.1-70b': {'input': 0.00059, 'output': 0.00079},
    'llama-3.1-8b': {'input': 0.00005, 'output': 0.00008},
    'mixtral-8x7b': {'input': 0.00024, 'output': 0.00024},
    
    # OpenAI
    'gpt-4-turbo': {'input': 0.01, 'output': 0.03},
    'gpt-3.5-turbo': {'input': 0.0005, 'output': 0.0015},
    
    # Local (free)
    'ollama-llama3': {'input': 0.0, 'output': 0.0},
    'ollama-codellama': {'input': 0.0, 'output': 0.0},
    'ollama-mistral': {'input': 0.0, 'output': 0.0},
}

# Embeddings (per 1K tokens)
EMBEDDING_COSTS = {
    'voyage-3': 0.00002,
    'voyage-3-lite': 0.00002,
    'jina-embeddings-v3': 0.00002,
    'nomic-embed-text': 0.0,  # Local
    'text-embedding-3-small': 0.00002,
}

# Infrastructure
INFRA_COSTS = {
    'runpod': {
        'a100-80gb': 2.29,  # per hour
        'a100-40gb': 1.89,
        'a6000': 0.79,
        'rtx-4090': 0.44,
        'rtx-3090': 0.24,
    },
    'supabase': {
        'database': 0.0,     # Free tier: 500MB
        'storage': 0.02,     # per GB after 1GB free
        'bandwidth': 0.09,   # per GB after 2GB free
        'functions': 0.0,    # Free tier: 500K invocations
    },
    'vercel': {
        'bandwidth': 0.15,   # per GB after 100GB
        'functions': 0.0,    # Free tier: 100K GB-hours
        'storage': 0.0,      # Included
    },
}
```

## Intelligent Model Routing

### Task-Based Routing Logic
```python
class ModelRouter:
    """Intelligently route requests to optimal models based on task."""
    
    def __init__(self, budget_remaining=100, urgency='normal'):
        self.budget = budget_remaining
        self.urgency = urgency
        self.usage_history = []
        
    def select_model(self, task_type, complexity='medium', quality_required='high'):
        """Select optimal model for task."""
        
        # Emergency budget mode
        if self.budget < 5:
            return self.emergency_mode_selection(task_type)
            
        # Task-specific routing
        routes = {
            'complex_reasoning': self._route_reasoning,
            'code_generation': self._route_code_gen,
            'bulk_processing': self._route_bulk,
            'chat_conversation': self._route_chat,
            'summarization': self._route_summary,
            'embeddings': self._route_embeddings,
            'data_extraction': self._route_extraction,
        }
        
        # Remaining _route_* handlers (chat, summary, embeddings,
        # extraction, default) follow the same pattern and are omitted here
        router = routes.get(task_type, self._route_default)
        return router(complexity, quality_required)
        
    def _route_reasoning(self, complexity, quality):
        """Route complex reasoning tasks."""
        if quality == 'critical' or complexity == 'high':
            return 'claude-3.5-sonnet'  # Best reasoning
        elif self.budget > 20:
            return 'claude-3-haiku'     # Good balance
        else:
            return 'deepseek-v3'        # 95% cheaper
            
    def _route_code_gen(self, complexity, quality):
        """Route code generation tasks."""
        if complexity == 'high' or quality == 'critical':
            return 'claude-3.5-sonnet'  # Most accurate
        elif complexity == 'medium':
            return 'deepseek-coder'     # Specialized, cheap
        else:
            return 'ollama-codellama'   # Free for simple
            
    def _route_bulk(self, complexity, quality):
        """Route bulk processing tasks."""
        # Always use cheapest for bulk
        if self.urgency == 'high':
            return 'groq-llama-3.1-8b' # Ultra fast
        else:
            return 'deepseek-v3'        # Cheapest cloud
            
    def emergency_mode_selection(self, task_type):
        """Emergency mode when budget is critical."""
        print("⚠️ BUDGET CRITICAL: Using free/cheapest models only")
        
        emergency_models = {
            'complex_reasoning': 'ollama-llama3',
            'code_generation': 'ollama-codellama',
            'bulk_processing': 'ollama-mistral',
            'chat_conversation': 'ollama-llama3',
            'summarization': 'ollama-mistral',
            'embeddings': 'nomic-embed-text',
        }
        
        return emergency_models.get(task_type, 'ollama-mistral')
```
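A usage sketch (the budget figure is illustrative):

```python
router = ModelRouter(budget_remaining=42.0)
model = router.select_model(
    'code_generation', complexity='medium', quality_required='high'
)
print(model)  # 'deepseek-coder' under the routing rules above
```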

### Quality vs Cost Matrix
```python
QUALITY_COST_MATRIX = {
    # (quality_required, budget_sensitivity) -> model
    ('critical', 'low'): 'claude-3-opus',
    ('critical', 'medium'): 'claude-3.5-sonnet',
    ('critical', 'high'): 'claude-3-haiku',
    
    ('high', 'low'): 'claude-3.5-sonnet',
    ('high', 'medium'): 'claude-3-haiku',
    ('high', 'high'): 'deepseek-v3',
    
    ('medium', 'low'): 'claude-3-haiku',
    ('medium', 'medium'): 'deepseek-v3',
    ('medium', 'high'): 'groq-mixtral-8x7b',
    
    ('low', 'low'): 'deepseek-v3',
    ('low', 'medium'): 'groq-llama-3.1-8b',
    ('low', 'high'): 'ollama-llama3',
}
```
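A small lookup wrapper over the matrix, with an assumed safe fallback for pairs not listed:

```python
def pick_model(quality_required, budget_sensitivity):
    """Resolve a model from the matrix; 'deepseek-v3' is an assumed default."""
    return QUALITY_COST_MATRIX.get(
        (quality_required, budget_sensitivity), 'deepseek-v3'
    )

pick_model('high', 'high')      # 'deepseek-v3'
pick_model('critical', 'low')   # 'claude-3-opus'
```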

## Cost Tracking Implementation

### Real-time Cost Monitor
```python
class CostMonitor:
    """Real-time cost tracking with alerts."""
    
    def __init__(self, daily_budget=5.0, monthly_budget=100.0):
        self.daily_budget = daily_budget
        self.monthly_budget = monthly_budget
        self.costs_today = 0
        self.costs_mtd = self.load_mtd_costs()
        self.last_alert = None
        
    def track_llm_call(self, model, input_tokens, output_tokens):
        """Track individual LLM call cost."""
        
        # Calculate cost
        model_costs = LLM_COSTS.get(model, {'input': 0.001, 'output': 0.001})
        cost = (
            (input_tokens / 1000) * model_costs['input'] +
            (output_tokens / 1000) * model_costs['output']
        )
        
        # Update totals
        self.costs_today += cost
        self.costs_mtd += cost
        
        # Log call
        self.log_api_call({
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'input_tokens': input_tokens,
            'output_tokens': output_tokens,
            'cost': cost,
            'total_today': self.costs_today,
            'total_mtd': self.costs_mtd,
        })
        
        # Check alerts
        self.check_budget_alerts()
        
        return cost
        
    def check_budget_alerts(self):
        """Check and trigger budget alerts."""
        
        alerts = []
        
        # Daily budget alerts
        daily_percent = (self.costs_today / self.daily_budget) * 100
        # Check the hard limit first: with the >80% branch first, an
        # overspent day would only ever fire the softer warning
        if daily_percent > 100:
            alerts.append({
                'level': 'critical',
                'message': f'Daily budget EXCEEDED (${self.costs_today:.2f}/${self.daily_budget})',
                'action': 'Switch to emergency mode - free models only'
            })
        elif daily_percent > 80 and self.last_alert != 'daily_80':
            alerts.append({
                'level': 'warning',
                'message': f'Daily budget 80% used (${self.costs_today:.2f}/${self.daily_budget})',
                'action': 'Consider switching to cheaper models'
            })
            self.last_alert = 'daily_80'
            
        # Monthly budget alerts
        monthly_percent = (self.costs_mtd / self.monthly_budget) * 100
        days_in_month = 30
        current_day = datetime.now().day
        expected_percent = (current_day / days_in_month) * 100
        
        if monthly_percent > expected_percent * 1.5:
            alerts.append({
                'level': 'warning',
                'message': f'Burning budget too fast ({monthly_percent:.1f}% used on day {current_day})',
                'action': 'Review usage patterns and optimize'
            })
            
        # Trigger alerts
        for alert in alerts:
            self.trigger_alert(alert)
            
    def trigger_alert(self, alert):
        """Trigger budget alert."""
        if alert['level'] == 'critical':
            print(f"🚨 {alert['message']}")
            print(f"   ACTION: {alert['action']}")
        else:
            print(f"⚠️  {alert['message']}")
            print(f"   Suggestion: {alert['action']}")
```
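To check the arithmetic: a `claude-3.5-sonnet` call with 1,200 input and 800 output tokens costs (1.2 × $0.003) + (0.8 × $0.015) = $0.0156. A usage sketch (the `load_mtd_costs` and `log_api_call` helpers are assumed):

```python
monitor = CostMonitor(daily_budget=5.0, monthly_budget=100.0)
cost = monitor.track_llm_call('claude-3.5-sonnet',
                              input_tokens=1200, output_tokens=800)
print(f"Call cost: ${cost:.4f}")  # $0.0156
```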

### Cost Analytics Dashboard
```python
def generate_cost_dashboard():
    """Generate cost analytics dashboard."""
    
    # Load cost data
    costs = load_all_cost_data()
    
    dashboard = f"""# Cost Analytics Dashboard
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## Summary
- **Today**: ${costs['today']:.2f} ({costs['today_percent']:.1f}% of daily budget)
- **This Week**: ${costs['week']:.2f}
- **MTD**: ${costs['mtd']:.2f} ({costs['mtd_percent']:.1f}% of monthly budget)
- **Projected Month**: ${costs['projected']:.2f}

## Cost by Model
{generate_model_breakdown(costs['by_model'])}

## Cost by Task Type
{generate_task_breakdown(costs['by_task'])}

## Top Expensive Operations
{generate_expensive_operations(costs['operations'])}

## Optimization Opportunities
{generate_optimization_suggestions(costs)}

## Trends
{generate_cost_trends(costs['daily_history'])}
"""
    
    return dashboard

def generate_model_breakdown(model_costs):
    """Generate model cost breakdown."""
    
    # Values are {'cost': ..., 'calls': ...} dicts, so sum the cost field
    total = sum(data['cost'] for data in model_costs.values())
    
    table = "| Model | Cost | % of Total | Calls | Avg/Call |\n"
    table += "|-------|------|-----------|--------|----------|\n"
    
    for model, data in sorted(model_costs.items(), key=lambda x: x[1]['cost'], reverse=True):
        percent = (data['cost'] / total) * 100
        avg_cost = data['cost'] / max(data['calls'], 1)
        
        table += f"| {model} | ${data['cost']:.2f} | {percent:.1f}% | {data['calls']} | ${avg_cost:.4f} |\n"
        
    return table

def generate_optimization_suggestions(costs):
    """Generate specific optimization suggestions."""
    
    suggestions = []
    
    # Analyze model usage
    expensive_model_percent = costs['by_model'].get('claude-3.5-sonnet', {}).get('percent', 0)
    if expensive_model_percent > 50:
        savings = expensive_model_percent * 0.95 * costs['mtd'] / 100
        suggestions.append({
            'priority': 'HIGH',
            'suggestion': 'Reduce Claude Sonnet usage',
            'details': f'Currently {expensive_model_percent:.1f}% of costs',
            'action': 'Use DeepSeek V3 for bulk tasks',
            'savings': f'Up to ${savings:.2f}/month'
        })
        
    # Analyze task patterns
    bulk_tasks = costs['by_task'].get('bulk_processing', {})
    if bulk_tasks.get('avg_cost', 0) > 0.01:
        suggestions.append({
            'priority': 'MEDIUM',
            'suggestion': 'Optimize bulk processing',
            'details': 'High cost per bulk operation',
            'action': 'Batch operations and use cheaper models',
            'savings': 'Est. 70% reduction'
        })
        
    # Time-based analysis
    peak_hour_costs = analyze_peak_hours(costs)
    if peak_hour_costs['concentration'] > 0.5:
        suggestions.append({
            'priority': 'LOW',
            'suggestion': 'Spread processing load',
            'details': f"{peak_hour_costs['concentration']*100:.0f}% of costs in {peak_hour_costs['hours']} hours",
            'action': 'Use scheduled/batch processing',
            'savings': 'Better rate negotiation possible'
        })
        
    return format_suggestions(suggestions)
```

## Budget Management

### Budget Configuration
```json
{
  "budgets": {
    "daily": {
      "soft_limit": 5.00,
      "hard_limit": 7.50,
      "alert_threshold": 0.8
    },
    "weekly": {
      "soft_limit": 25.00,
      "hard_limit": 35.00,
      "alert_threshold": 0.8
    },
    "monthly": {
      "soft_limit": 100.00,
      "hard_limit": 150.00,
      "alert_threshold": 0.8
    }
  },
  "cost_controls": {
    "auto_switch_models": true,
    "block_on_limit": false,
    "emergency_mode_threshold": 0.95
  },
  "notifications": {
    "email": "[email protected]",
    "slack_webhook": "https://hooks.slack.com/...",
    "alert_frequency": "once_per_threshold"
  }
}
```

### Budget Enforcement
```python
class BudgetEnforcer:
    """Enforce budget limits and controls."""
    
    def __init__(self, config_path='costs/budget-config.json'):
        self.config = self.load_config(config_path)
        self.current_costs = self.load_current_costs()
        
    def check_request_allowed(self, estimated_cost, priority='normal'):
        """Check if request should be allowed based on budget."""
        
        # Always allow critical requests
        if priority == 'critical':
            return True, None
            
        # Check daily limit
        daily_total = self.current_costs['today'] + estimated_cost
        if daily_total > self.config['budgets']['daily']['hard_limit']:
            return False, "Daily hard limit exceeded"
            
        # Check monthly limit
        monthly_total = self.current_costs['mtd'] + estimated_cost
        if monthly_total > self.config['budgets']['monthly']['hard_limit']:
            return False, "Monthly hard limit exceeded"
            
        # Warning but allow
        if daily_total > self.config['budgets']['daily']['soft_limit']:
            return True, "Warning: Exceeding daily soft limit"
            
        return True, None
        
    def get_model_override(self, requested_model):
        """Get model override based on budget status."""
        
        if not self.config['cost_controls']['auto_switch_models']:
            return requested_model
            
        # Calculate budget usage
        daily_usage = self.current_costs['today'] / self.config['budgets']['daily']['soft_limit']
        monthly_usage = self.current_costs['mtd'] / self.config['budgets']['monthly']['soft_limit']
        
        # Emergency mode
        if max(daily_usage, monthly_usage) > self.config['cost_controls']['emergency_mode_threshold']:
            return self.get_emergency_model(requested_model)
            
        # Progressive degradation
        if max(daily_usage, monthly_usage) > 0.8:
            return self.get_cheaper_alternative(requested_model)
            
        return requested_model
```
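A usage sketch (the cost estimate is illustrative):

```python
enforcer = BudgetEnforcer()
allowed, note = enforcer.check_request_allowed(estimated_cost=0.25)
if not allowed:
    raise RuntimeError(f"Request blocked: {note}")
if note:
    print(f"⚠️  {note}")
model = enforcer.get_model_override('claude-3.5-sonnet')
```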

## Cost Optimization Strategies

### 1. Caching Strategy
```python
import hashlib
import json
import time
from pathlib import Path

class CostOptimizedCache:
    """Cache expensive operations to reduce costs."""
    
    def __init__(self, cache_dir='~/.claude/cost-cache'):
        self.cache_dir = Path(cache_dir).expanduser()
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_stats = {'hits': 0, 'misses': 0, 'savings': 0}
        
    def get_or_compute(self, key, compute_func, model_cost, ttl=86400):
        """Get from cache or compute with cost tracking."""
        
        cache_file = self.cache_dir / f"{hashlib.md5(key.encode()).hexdigest()}.json"
        
        # Check cache
        if cache_file.exists():
            cache_data = json.loads(cache_file.read_text())
            if time.time() - cache_data['timestamp'] < ttl:
                self.cache_stats['hits'] += 1
                self.cache_stats['savings'] += model_cost
                return cache_data['result']
                
        # Compute and cache
        self.cache_stats['misses'] += 1
        result = compute_func()
        
        cache_data = {
            'key': key,
            'result': result,
            'timestamp': time.time(),
            'model_cost': model_cost
        }
        
        cache_file.write_text(json.dumps(cache_data))
        
        return result
        
    def report_savings(self):
        """Report cache effectiveness."""
        hit_rate = self.cache_stats['hits'] / max(
            self.cache_stats['hits'] + self.cache_stats['misses'], 1
        )
        
        return {
            'hit_rate': f"{hit_rate*100:.1f}%",
            'total_saves': f"${self.cache_stats['savings']:.2f}",
            'api_calls_saved': self.cache_stats['hits']
        }
```
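A usage sketch; `expensive_summary` stands in for whatever LLM call is being cached:

```python
cache = CostOptimizedCache()

def expensive_summary():
    # placeholder for a real LLM call
    return "summary text"

result = cache.get_or_compute(
    key='summarize:release-notes-v2',
    compute_func=expensive_summary,
    model_cost=0.02,          # what the call would have cost
    ttl=6 * 3600,             # re-compute after 6 hours
)
print(cache.report_savings())
```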

### 2. Batch Processing
```python
from collections import defaultdict

def batch_optimize_requests(requests, max_batch_size=20):
    """Optimize multiple requests through batching."""
    
    # Group by model and similar prompts
    grouped = defaultdict(list)
    
    for req in requests:
        key = (req['model'], req['task_type'])
        grouped[key].append(req)
        
    optimized = []
    
    for (model, task_type), group in grouped.items():
        # Batch similar requests
        batches = [group[i:i+max_batch_size] for i in range(0, len(group), max_batch_size)]
        
        for batch in batches:
            if len(batch) > 1:
                # Combine into single request
                combined = combine_requests(batch)
                combined['cost_savings'] = calculate_batch_savings(batch, combined)
                optimized.append(combined)
            else:
                optimized.extend(batch)
                
    return optimized
```

### 3. Progressive Model Degradation
```python
class ProgressiveModelDegradation:
    """Gradually switch to cheaper models as budget depletes."""
    
    DEGRADATION_PATH = {
        'claude-3-opus': ['claude-3.5-sonnet', 'claude-3-haiku', 'deepseek-v3', 'ollama-llama3'],
        'claude-3.5-sonnet': ['claude-3-haiku', 'deepseek-v3', 'groq-mixtral', 'ollama-llama3'],
        'claude-3-haiku': ['deepseek-v3', 'groq-llama-70b', 'groq-llama-8b', 'ollama-llama3'],
        'gpt-4-turbo': ['gpt-3.5-turbo', 'deepseek-v3', 'groq-mixtral', 'ollama-mistral'],
    }
    
    def get_degraded_model(self, original_model, budget_percent_remaining):
        """Get appropriate model based on remaining budget."""
        
        if original_model not in self.DEGRADATION_PATH:
            return original_model
            
        path = self.DEGRADATION_PATH[original_model]
        
        if budget_percent_remaining > 50:
            return original_model
        elif budget_percent_remaining > 30:
            return path[0] if len(path) > 0 else original_model
        elif budget_percent_remaining > 15:
            return path[1] if len(path) > 1 else path[0]
        elif budget_percent_remaining > 5:
            return path[2] if len(path) > 2 else path[1]
        else:
            return path[3] if len(path) > 3 else path[2]
```
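
A quick sanity check of the thresholds above, using models from the `claude-3.5-sonnet` degradation path:

```python
degrader = ProgressiveModelDegradation()

# 60% of budget left: keep the requested model
assert degrader.get_degraded_model('claude-3.5-sonnet', 60) == 'claude-3.5-sonnet'
# 25% left: second step down the path
assert degrader.get_degraded_model('claude-3.5-sonnet', 25) == 'deepseek-v3'
# 3% left: cheapest fallback (local model)
assert degrader.get_degraded_model('claude-3.5-sonnet', 3) == 'ollama-llama3'
```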

### 4. Smart Token Optimization
```python
class TokenOptimizer:
    """Optimize token usage to reduce costs."""
    
    def optimize_prompt(self, prompt, target_reduction=0.2):
        """Reduce prompt tokens while preserving meaning."""
        
        original_tokens = self.count_tokens(prompt)
        
        # Optimization strategies
        optimized = prompt
        
        # 1. Remove redundant whitespace
        optimized = ' '.join(optimized.split())
        
        # 2. Compress system messages
        optimized = self.compress_system_message(optimized)
        
        # 3. Use abbreviations for common terms
        optimized = self.apply_abbreviations(optimized)
        
        # 4. Remove unnecessary examples if over token limit
        if self.count_tokens(optimized) > original_tokens * (1 - target_reduction):
            optimized = self.remove_examples(optimized)
            
        # 5. Summarize context if still too long
        if self.count_tokens(optimized) > original_tokens * (1 - target_reduction):
            optimized = self.summarize_context(optimized)
            
        new_tokens = self.count_tokens(optimized)
        reduction = (original_tokens - new_tokens) / original_tokens
        
        return {
            'optimized_prompt': optimized,
            'original_tokens': original_tokens,
            'new_tokens': new_tokens,
            'reduction': f"{reduction*100:.1f}%",
            'cost_savings': self.calculate_savings(original_tokens - new_tokens)
        }
```
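
`count_tokens` and the compression helpers are used above but not shown. A rough standalone sketch of the token counter, using a ~4-characters-per-token heuristic (an approximation -- swap in the model's real tokenizer when exact counts matter):

```python
def count_tokens(text):
    """Rough estimate: ~4 characters per token for English text (assumption).

    Use the model's actual tokenizer (e.g. tiktoken for OpenAI models)
    when exact counts matter.
    """
    return max(len(text) // 4, 1)
```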

## Reporting & Analytics

### Daily Cost Report
```python
from datetime import datetime
from pathlib import Path

def generate_daily_report():
    """Generate comprehensive daily cost report."""
    
    report_date = datetime.now().strftime('%Y-%m-%d')
    
    report = f"""# Daily Cost Report - {report_date}

## Executive Summary
{generate_executive_summary()}

## Cost Breakdown by Hour
{generate_hourly_breakdown()}

## Model Usage Analysis
{generate_model_analysis()}

## Task Type Distribution
{generate_task_distribution()}

## Cost Anomalies
{generate_anomaly_detection()}

## Optimization Achievements
{generate_optimization_report()}

## Tomorrow's Budget Plan
{generate_budget_plan()}

## Recommendations
{generate_daily_recommendations()}
"""
    
    # Save report
    report_path = f"costs/reports/daily-{report_date}.md"
    Path(report_path).parent.mkdir(parents=True, exist_ok=True)
    Path(report_path).write_text(report)
    
    # Send notifications if needed
    if should_send_notification():
        send_cost_report(report)
        
    return report
```

### ROI Analysis
```python
def calculate_feature_roi(feature_name):
    """Calculate ROI for specific features."""
    
    # Get feature costs
    feature_costs = get_feature_costs(feature_name)
    
    # Get feature metrics
    metrics = get_feature_metrics(feature_name)
    
    # Calculate ROI
    roi_analysis = {
        'feature': feature_name,
        'total_cost': feature_costs['total'],
        'development_cost': feature_costs['development'],
        'operational_cost': feature_costs['operational'],
        'users_impacted': metrics['users'],
        'revenue_impact': metrics.get('revenue_impact', 0),
        'cost_per_user': feature_costs['total'] / max(metrics['users'], 1),
        'roi_percentage': ((metrics.get('revenue_impact', 0) - feature_costs['total']) / max(feature_costs['total'], 0.01)) * 100  # guard against zero cost
    }
    
    # Cost breakdown
    roi_analysis['cost_breakdown'] = {
        'llm_costs': feature_costs['llm'],
        'infrastructure': feature_costs['infrastructure'],
        'development_hours': feature_costs['dev_hours'],
        'testing_costs': feature_costs['testing']
    }
    
    # Recommendations
    if roi_analysis['cost_per_user'] > 0.10:
        roi_analysis['recommendation'] = "Consider optimization - high cost per user"
    elif roi_analysis['roi_percentage'] < 0:
        roi_analysis['recommendation'] = "Review feature value - negative ROI"
    else:
        roi_analysis['recommendation'] = "Healthy ROI - maintain current approach"
        
    return roi_analysis
```
```

### reference/rollback-recovery.md

```markdown
# Rollback & Recovery Procedures

Comprehensive guide for safely rolling back changes, recovering from failures, and maintaining system stability.

## When to Rollback

### Automatic Rollback Triggers
```python
ROLLBACK_TRIGGERS = {
    'test_failures': {
        'condition': lambda metrics: metrics['test_pass_rate'] < 0.95,
        'severity': 'HIGH',
        'action': 'immediate_rollback'
    },
    'security_issues': {
        'condition': lambda scan: scan['critical_vulns'] > 0,
        'severity': 'CRITICAL',
        'action': 'immediate_rollback'
    },
    'performance_degradation': {
        'condition': lambda perf: perf['response_time'] > perf['baseline'] * 1.5,
        'severity': 'HIGH',
        'action': 'gradual_rollback'
    },
    'error_rate_spike': {
        'condition': lambda errors: errors['rate'] > errors['baseline'] * 3,
        'severity': 'HIGH',
        'action': 'immediate_rollback'
    },
    'memory_leak': {
        'condition': lambda mem: mem['growth_rate'] > 0.1,  # 10% per hour
        'severity': 'MEDIUM',
        'action': 'scheduled_rollback'
    },
}
```
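
A small evaluator sketch showing one way these triggers could be consumed; the `metrics_by_trigger` dict supplies each lambda's expected metrics:

```python
def evaluate_rollback_triggers(metrics_by_trigger):
    """Check each trigger against its metrics; return fired actions (sketch)."""
    fired = []
    for name, trigger in ROLLBACK_TRIGGERS.items():
        metrics = metrics_by_trigger.get(name)
        if metrics is not None and trigger['condition'](metrics):
            fired.append({
                'trigger': name,
                'severity': trigger['severity'],
                'action': trigger['action'],
            })
    # Handle the most severe triggers first
    order = {'CRITICAL': 0, 'HIGH': 1, 'MEDIUM': 2}
    return sorted(fired, key=lambda f: order[f['severity']])

# Example: a failing test run fires an immediate rollback
actions = evaluate_rollback_triggers({
    'test_failures': {'test_pass_rate': 0.90},
})
```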

### Manual Rollback Decisions
```markdown
## Rollback Decision Matrix

| Symptom | Investigation | Rollback? | Alternative |
|---------|---------------|-----------|-------------|
| Tests failing after "fix" | Check if fix addressed root cause | Yes | Debug properly |
| New security warnings | Verify if real vulnerabilities | Yes | Patch immediately |
| Performance slower | Profile and measure impact | Maybe | Optimize first |
| Unexpected behavior | Compare with requirements | Maybe | Feature flag off |
| User complaints | Quantify impact and severity | Maybe | Hotfix if minor |
| Data corruption risk | Assess data integrity | Yes | Immediate action |
```

## Rollback Strategies

### 1. Git-Based Rollback

#### Simple Revert
```bash
#!/bin/bash
# simple-rollback.sh

# Find the last known good commit
echo "Recent commits:"
git log --oneline -10

read -p "Enter the bad commit hash to revert: " COMMIT_HASH

# Create a revert commit that undoes that change
git revert $COMMIT_HASH --no-edit

# Verify the revert
echo "Changes reverted:"
git diff HEAD~1
```

#### Selective File Rollback
```bash
#!/bin/bash
# selective-rollback.sh

# Rollback specific files only
FILES_TO_ROLLBACK=(
    "src/api/auth.py"
    "src/models/user.py"
    "tests/test_auth.py"
)

# Find last known good state
GOOD_COMMIT=$(git log --format="%H %s" | grep -m1 "stable:" | cut -d' ' -f1)

# Rollback each file
for file in "${FILES_TO_ROLLBACK[@]}"; do
    echo "Rolling back $file to $GOOD_COMMIT"
    git checkout $GOOD_COMMIT -- $file
done

# Show what changed
git status
git diff --staged
```

#### Branch-Based Rollback
```bash
#!/bin/bash
# branch-rollback.sh

# Create a rollback branch
ROLLBACK_BRANCH="rollback/$(date +%Y%m%d_%H%M%S)"
git checkout -b $ROLLBACK_BRANCH

# Reset to known good state
LAST_GOOD_TAG=$(git tag -l "stable-*" | sort -V | tail -1)
git reset --hard $LAST_GOOD_TAG

# Cherry-pick safe commits if any
read -p "Any commits to keep? (comma-separated hashes): " KEEP_COMMITS
if [ ! -z "$KEEP_COMMITS" ]; then
    IFS=',' read -ra COMMITS <<< "$KEEP_COMMITS"
    for commit in "${COMMITS[@]}"; do
        git cherry-pick $commit || echo "Failed to cherry-pick $commit"
    done
fi
```

### 2. Database Rollback

#### Migration Rollback
```python
#!/usr/bin/env python3
# db-rollback.py

import subprocess
import json
from datetime import datetime

class DatabaseRollback:
    """Handle database migration rollbacks safely."""
    
    def __init__(self, connection_string):
        self.conn = connection_string
        self.backup_before_rollback = True
        
    def rollback_migration(self, target_version=None):
        """Rollback database migrations."""
        
        # 1. Create backup first
        backup_name = None
        if self.backup_before_rollback:
            backup_name = f"rollback_backup_{datetime.now():%Y%m%d_%H%M%S}"
            self.create_backup(backup_name)
            
        # 2. Check current version
        current = self.get_current_version()
        print(f"Current version: {current}")
        
        # 3. Determine target
        if not target_version:
            target_version = self.get_previous_version()
            
        print(f"Rolling back to: {target_version}")
        
        # 4. Execute rollback
        try:
            # Using Alembic
            subprocess.run([
                "alembic", "downgrade", target_version
            ], check=True)
            
            # OR using Django
            # subprocess.run([
            #     "python", "manage.py", "migrate", "app_name", target_version
            # ], check=True)
            
            print("Rollback successful")
            
        except subprocess.CalledProcessError as e:
            print(f"Rollback failed: {e}")
            if backup_name:
                self.restore_from_backup(backup_name)
            raise
            
    def create_backup(self, backup_name):
        """Create database backup before rollback."""
        print(f"Creating backup: {backup_name}")
        
        # PostgreSQL example
        subprocess.run([
            "pg_dump",
            self.conn,
            "-f", f"backups/{backup_name}.sql"
        ], check=True)
        
    def get_migration_history(self):
        """Get migration history."""
        # Implementation depends on migration tool
        pass
```

#### Data Rollback Strategy
```sql
-- Create audit table for data rollback
CREATE TABLE IF NOT EXISTS data_audit (
    id SERIAL PRIMARY KEY,
    table_name VARCHAR(255),
    operation VARCHAR(10),
    row_data JSONB,
    changed_by VARCHAR(255),
    changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Trigger to capture changes
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO data_audit (table_name, operation, row_data, changed_by)
    VALUES (
        TG_TABLE_NAME,
        TG_OP,
        CASE
            WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
            ELSE row_to_json(NEW)
        END,
        current_user
    );
    -- Return value is ignored for AFTER triggers; NEW is NULL on DELETE
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Apply to critical tables
CREATE TRIGGER user_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();
```
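
With the audit table in place, a deleted row can be rebuilt from its captured JSON. A minimal Python sketch, assuming psycopg2 and PostgreSQL's `jsonb_populate_record`; the DSN and table allowlisting are left to the caller:

```python
import psycopg2

def restore_deleted_row(dsn, table_name, audit_id):
    """Re-insert a row captured by the audit trigger (sketch).

    jsonb_populate_record maps the audited JSON back onto the table's
    row type. table_name is interpolated into the SQL, so it must come
    from a trusted allowlist, never from user input.
    """
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(
                f"""
                INSERT INTO {table_name}
                SELECT (jsonb_populate_record(NULL::{table_name}, row_data)).*
                FROM data_audit
                WHERE id = %s AND table_name = %s AND operation = 'DELETE'
                """,
                (audit_id, table_name),
            )
```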

### 3. Application Rollback

#### Container-Based Rollback
```bash
#!/bin/bash
# container-rollback.sh

SERVICE_NAME=$1
ROLLBACK_VERSION=${2:-"previous"}

echo "Rolling back $SERVICE_NAME to $ROLLBACK_VERSION"

# Kubernetes rollback
if command -v kubectl &> /dev/null; then
    kubectl rollout history deployment/$SERVICE_NAME
    
    if [ "$ROLLBACK_VERSION" = "previous" ]; then
        kubectl rollout undo deployment/$SERVICE_NAME
    else
        kubectl rollout undo deployment/$SERVICE_NAME --to-revision=$ROLLBACK_VERSION
    fi
    
    kubectl rollout status deployment/$SERVICE_NAME
    
# Docker Swarm rollback
elif command -v docker &> /dev/null; then
    docker service rollback $SERVICE_NAME
    docker service ps $SERVICE_NAME
fi

# Verify rollback
./scripts/smoke-test.sh $SERVICE_NAME
```

#### Feature Flag Rollback
```python
class FeatureFlagRollback:
    """Rollback using feature flags."""
    
    def __init__(self, flag_service):
        self.flags = flag_service
        
    def emergency_disable(self, feature_name, reason):
        """Emergency disable a feature."""
        
        # 1. Disable feature
        self.flags.disable(feature_name)
        
        # 2. Clear caches
        self.clear_feature_caches(feature_name)
        
        # 3. Log the action
        self.log_rollback({
            'feature': feature_name,
            'action': 'emergency_disable',
            'reason': reason,
            'timestamp': datetime.now(),
            'disabled_by': get_current_user()
        })
        
        # 4. Notify team
        self.notify_team(
            f"Feature {feature_name} disabled: {reason}"
        )
        
        # 5. Create incident
        incident_id = self.create_incident({
            'title': f"Feature {feature_name} rolled back",
            'severity': 'P2',
            'reason': reason
        })
        
        return incident_id
        
    def gradual_rollback(self, feature_name, target_percentage=0):
        """Gradually reduce feature exposure."""
        
        current = self.flags.get_rollout_percentage(feature_name)
        
        while current > target_percentage:
            # Reduce by 10% every 5 minutes
            new_percentage = max(current - 10, target_percentage)
            
            self.flags.set_rollout_percentage(
                feature_name,
                new_percentage
            )
            
            # Monitor metrics
            metrics = self.monitor_metrics(5 * 60)  # 5 minutes
            
            if metrics['error_rate'] > metrics['baseline'] * 2:
                # Accelerate rollback
                self.flags.set_rollout_percentage(feature_name, 0)
                break
                
            current = new_percentage
```
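
A usage sketch, assuming a `flag_service` object that implements the `disable` / `get_rollout_percentage` / `set_rollout_percentage` interface used above (the feature names and reason are illustrative):

```python
# flag_service: any client exposing disable()/get_rollout_percentage()/
# set_rollout_percentage(); hypothetical here.
rollback = FeatureFlagRollback(flag_service)

# Hard kill: error spike traced to the new checkout flow
incident_id = rollback.emergency_disable(
    "new_checkout_flow",
    reason="5xx rate tripled after the 14:00 deploy",
)

# Soft landing: wind a risky experiment down to 0% exposure
rollback.gradual_rollback("experimental_ranker", target_percentage=0)
```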

### 4. Infrastructure Rollback

#### Terraform State Rollback
```bash
#!/bin/bash
# terraform-rollback.sh

# List resources currently tracked in state
terraform state list

# Create backup
terraform state pull > terraform.tfstate.backup.$(date +%Y%m%d_%H%M%S)

# Rollback specific resources
RESOURCES_TO_ROLLBACK=(
    "aws_instance.web"
    "aws_db_instance.main"
    "aws_security_group.api"
)

for resource in "${RESOURCES_TO_ROLLBACK[@]}"; do
    echo "Rolling back $resource"
    
    # Re-import the previous resource (PREVIOUS_RESOURCE_ID must be set beforehand)
    terraform import $resource $PREVIOUS_RESOURCE_ID
    
    # Or remove and recreate
    # terraform state rm $resource
    # terraform apply -target=$resource
done
```

## Recovery Workflows

### 1. Incident Response Recovery
```python
class IncidentRecovery:
    """Coordinate incident recovery procedures."""
    
    def __init__(self):
        self.playbook = self.load_playbook()
        self.team = self.get_on_call_team()
        
    def initiate_recovery(self, incident_type, severity):
        """Start recovery procedure."""
        
        incident = {
            'id': generate_incident_id(),
            'type': incident_type,
            'severity': severity,
            'started': datetime.now(),
            'status': 'active',
            'commander': self.assign_incident_commander(),
        }
        
        # Execute playbook
        playbook_steps = self.playbook.get_steps(incident_type)
        
        for step in playbook_steps:
            result = self.execute_step(step, incident)
            
            if not result['success']:
                self.escalate(incident, step, result['error'])
                
        return incident
        
    def execute_step(self, step, incident):
        """Execute recovery step."""
        
        print(f"Executing: {step['name']}")
        
        try:
            if step['type'] == 'rollback':
                return self.perform_rollback(step['target'])
            elif step['type'] == 'restart':
                return self.restart_service(step['service'])
            elif step['type'] == 'scale':
                return self.scale_service(step['service'], step['replicas'])
            elif step['type'] == 'failover':
                return self.initiate_failover(step['from'], step['to'])
            else:
                return {
                    'success': False,
                    'error': f"Unknown step type: {step['type']}",
                    'step': step['name']
                }
                
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'step': step['name']
            }
```

### 2. Data Recovery
```python
class DataRecovery:
    """Handle data recovery scenarios."""
    
    def __init__(self, backup_service):
        self.backup = backup_service
        
    def recover_from_corruption(self, table_name, corruption_time):
        """Recover from data corruption."""
        
        recovery_plan = {
            'table': table_name,
            'corruption_detected': datetime.now(),
            'corruption_occurred': corruption_time,
            'steps': []
        }
        
        # 1. Isolate corrupted data
        temp_table = f"{table_name}_corrupted_{datetime.now():%Y%m%d_%H%M%S}"
        recovery_plan['steps'].append({
            'action': 'isolate',
            'sql': f"CREATE TABLE {temp_table} AS SELECT * FROM {table_name}"
        })
        
        # 2. Find last good backup
        backup = self.backup.find_backup_before(corruption_time)
        recovery_plan['backup_used'] = backup['id']
        
        # 3. Restore from backup
        recovery_plan['steps'].append({
            'action': 'restore',
            'source': backup['path'],
            'target': table_name
        })
        
        # 4. Replay transactions after backup
        transactions = self.get_transactions_after(backup['timestamp'])
        
        for txn in transactions:
            if txn['timestamp'] < corruption_time:
                recovery_plan['steps'].append({
                    'action': 'replay_transaction',
                    'transaction': txn
                })
                
        # 5. Verify data integrity
        recovery_plan['steps'].append({
            'action': 'verify',
            'checks': [
                'row_count',
                'constraint_validation',
                'referential_integrity'
            ]
        })
        
        return recovery_plan
```

### 3. Service Recovery
```bash
#!/bin/bash
# service-recovery.sh

SERVICE=$1
if [ -z "$SERVICE" ]; then
    echo "Usage: $0 <service-name>"
    exit 1
fi

MAX_ATTEMPTS=3
ATTEMPT=0
RECOVERED=false

echo "=== Service Recovery: $SERVICE ==="

while [ $ATTEMPT -lt $MAX_ATTEMPTS ]; do
    ATTEMPT=$((ATTEMPT + 1))
    echo "Attempt $ATTEMPT of $MAX_ATTEMPTS"
    
    # 1. Check service status
    if systemctl is-active --quiet $SERVICE; then
        echo "Service is running"
        RECOVERED=true
        break
    fi
    
    # 2. Try to start service
    echo "Starting service..."
    systemctl start $SERVICE
    
    # 3. Wait for service to stabilize
    sleep 10
    
    # 4. Health check
    if curl -f http://localhost:8080/health > /dev/null 2>&1; then
        echo "Service is healthy"
        RECOVERED=true
        break
    else
        echo "Health check failed"
        
        # 5. Check logs
        echo "Recent errors:"
        journalctl -u $SERVICE -n 20 --no-pager | grep -i error
        
        # 6. Try recovery actions
        case $ATTEMPT in
            1)
                echo "Clearing cache..."
                rm -rf /var/cache/$SERVICE/*
                ;;
            2)
                echo "Resetting configuration..."
                cp /etc/$SERVICE/config.default /etc/$SERVICE/config
                ;;
            3)
                echo "Final attempt - full restart..."
                systemctl stop $SERVICE
                sleep 5
                systemctl start $SERVICE
                ;;
        esac
    fi
done

if [ "$RECOVERED" = true ]; then
    echo "Service recovered successfully"
else
    echo "FAILED to recover service after $MAX_ATTEMPTS attempts"
    exit 1
fi
```

## Rollback Verification

### Automated Verification Suite
```python
class RollbackVerification:
    """Verify rollback success."""
    
    def __init__(self):
        self.checks = []
        
    def verify_rollback(self, rollback_id):
        """Run comprehensive rollback verification."""
        
        results = {
            'rollback_id': rollback_id,
            'timestamp': datetime.now(),
            'checks': {},
            'overall_status': 'pending'
        }
        
        # 1. Version verification
        results['checks']['version'] = self.verify_version()
        
        # 2. Functionality tests
        results['checks']['smoke_tests'] = self.run_smoke_tests()
        
        # 3. Data integrity
        results['checks']['data_integrity'] = self.verify_data_integrity()
        
        # 4. Performance baseline
        results['checks']['performance'] = self.verify_performance()
        
        # 5. Security scan
        results['checks']['security'] = self.run_security_scan()
        
        # 6. Integration tests
        results['checks']['integrations'] = self.test_integrations()
        
        # Determine overall status
        failed_checks = [
            check for check, result in results['checks'].items()
            if not result['passed']
        ]
        
        if not failed_checks:
            results['overall_status'] = 'success'
        elif len(failed_checks) <= 2:
            results['overall_status'] = 'partial_success'
        else:
            results['overall_status'] = 'failed'
            
        return results
        
    def run_smoke_tests(self):
        """Run critical path smoke tests."""
        
        tests = [
            ('user_login', self.test_user_login),
            ('api_health', self.test_api_health),
            ('database_connection', self.test_db_connection),
            ('critical_endpoints', self.test_critical_endpoints),
        ]
        
        results = []
        for test_name, test_func in tests:
            try:
                test_func()
                results.append({'test': test_name, 'status': 'passed'})
            except Exception as e:
                results.append({
                    'test': test_name,
                    'status': 'failed',
                    'error': str(e)
                })
                
        return {
            'passed': all(r['status'] == 'passed' for r in results),
            'details': results
        }
```

## Recovery Playbooks

### Database Corruption Playbook
```markdown
# Database Corruption Recovery Playbook

## Immediate Actions (0-5 minutes)
1. **ALERT TEAM** - Page on-call DBA
2. **ISOLATE** - Prevent further writes
   ```sql
   ALTER DATABASE mydb SET default_transaction_read_only = on;
   ```
3. **SNAPSHOT** - Create immediate backup
   ```bash
   pg_dump corrupted_db > emergency_backup_$(date +%s).sql
   ```

## Assessment (5-15 minutes)
1. Identify corruption scope
2. Determine last known good state
3. Estimate data loss window
4. Choose recovery strategy

## Recovery Strategies
### Option A: Point-in-Time Recovery
- Best when: Corruption time is known
- Data loss: Minimal
- Time: 30-60 minutes

### Option B: Backup Restore
- Best when: Recent backup available
- Data loss: Since last backup
- Time: 15-30 minutes

### Option C: Replica Promotion
- Best when: Healthy replica exists
- Data loss: Replication lag only
- Time: 5-10 minutes

## Validation
1. Run integrity checks
2. Verify row counts
3. Test application functionality
4. Monitor for recurring issues
```

### Service Outage Playbook
```python
SERVICE_PLAYBOOK = {
    'detection': {
        'alerts': ['pager_duty', 'slack', 'email'],
        'escalation_time': 5,  # minutes
    },
    'triage': {
        'steps': [
            'check_service_status',
            'review_recent_changes',
            'examine_logs',
            'check_dependencies',
            'assess_impact'
        ]
    },
    'mitigation': {
        'quick_wins': [
            'restart_service',
            'clear_cache',
            'increase_resources',
            'enable_fallback'
        ],
        'rollback_triggers': [
            'recent_deployment',
            'config_change',
            'dependency_update'
        ]
    },
    'recovery': {
        'verification': [
            'health_checks',
            'synthetic_monitoring',
            'real_user_monitoring',
            'error_rate_baseline'
        ]
    },
    'postmortem': {
        'timeline': 'within_48_hours',
        'participants': ['oncall', 'service_owner', 'sre'],
        'deliverables': ['root_cause', 'action_items', 'runbook_updates']
    }
}
```

## Prevention Strategies

### 1. Rollback Readiness
```yaml
# rollback-readiness.yaml
rollback_requirements:
  version_control:
    - semantic_versioning: true
    - tagged_releases: true
    - rollback_scripts: true
    
  database:
    - reversible_migrations: true
    - backup_frequency: hourly
    - backup_retention: 30_days
    
  application:
    - feature_flags: true
    - backward_compatibility: 2_versions
    - health_checks: comprehensive
    
  infrastructure:
    - immutable_deployments: true
    - blue_green_capable: true
    - state_management: versioned
```

### 2. Automated Rollback Testing
```python
def test_rollback_capability():
    """Test rollback procedures in staging."""
    
    # Deploy new version
    deploy_version("v2.0.0")
    
    # Verify deployment
    assert health_check() == "healthy"
    
    # Simulate rollback scenarios
    scenarios = [
        'immediate_rollback',
        'rollback_under_load',
        'partial_rollback',
        'data_migration_rollback'
    ]
    
    for scenario in scenarios:
        # Execute rollback
        rollback_result = execute_rollback_scenario(scenario)
        
        # Verify rollback
        assert rollback_result['success']
        assert version_check() == "v1.9.0"
        assert data_integrity_check() == "passed"
        
        # Restore to new version for next test
        deploy_version("v2.0.0")
```

### 3. Rollback Metrics
```python
ROLLBACK_METRICS = {
    'mttr': {  # Mean Time To Rollback
        'target': 300,  # 5 minutes
        'measurement': 'from_decision_to_stable'
    },
    'rollback_success_rate': {
        'target': 0.99,  # 99%
        'measurement': 'successful_rollbacks / total_rollbacks'
    },
    'automated_rollback_percentage': {
        'target': 0.80,  # 80%
        'measurement': 'automated_rollbacks / total_rollbacks'
    },
    'data_loss_incidents': {
        'target': 0,
        'measurement': 'count_per_quarter'
    }
}
```
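
A sketch of how these targets might be checked against recorded rollback events (the event dict fields are assumptions):

```python
def check_rollback_metrics(events):
    """Compare recorded rollback events against ROLLBACK_METRICS targets (sketch).

    Each event is assumed to look like:
    {'duration_seconds': 240, 'success': True, 'automated': True, 'data_loss': False}
    """
    total = max(len(events), 1)
    observed = {
        'mttr': sum(e['duration_seconds'] for e in events) / total,
        'rollback_success_rate': sum(e['success'] for e in events) / total,
        'automated_rollback_percentage': sum(e['automated'] for e in events) / total,
        'data_loss_incidents': sum(e['data_loss'] for e in events),
    }
    report = {}
    for name, value in observed.items():
        target = ROLLBACK_METRICS[name]['target']
        # MTTR and data-loss counts must stay at or below target; rates at or above
        lower_is_better = name in ('mttr', 'data_loss_incidents')
        report[name] = {
            'observed': value,
            'target': target,
            'ok': value <= target if lower_is_better else value >= target,
        }
    return report
```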
```

### reference/agent-routing.md

```markdown
# Agent Routing Guide

Complete reference for 70+ specialized agents with intelligent routing logic and optimal selection strategies.

## Agent Categories Overview

### Core Development Agents
- **Debugging & Error Handling** (8 agents)
- **Code Review & Quality** (10 agents)
- **Testing & TDD** (6 agents)
- **Documentation** (7 agents)

### Specialized Development
- **Language Experts** (8 agents)
- **Frontend & Mobile** (2 agents)
- **Backend & Architecture** (6 agents)
- **Infrastructure & DevOps** (15 agents)

### Advanced Capabilities
- **AI & LLM Development** (2 agents)
- **Performance & Optimization** (6 agents)
- **Security & Compliance** (3 agents)
- **Refactoring & Modernization** (2 agents)

## Intelligent Agent Selection

### Selection Algorithm
```python
class AgentSelector:
    """Intelligent agent selection based on task analysis."""
    
    def __init__(self):
        self.agent_catalog = self.load_agent_catalog()
        self.usage_history = self.load_usage_history()
        self.performance_metrics = self.load_performance_metrics()
        
    def select_optimal_agent(self, task_description, context=None):
        """Select the most appropriate agent for a task."""
        
        # 1. Extract task features
        features = self.extract_task_features(task_description)
        
        # 2. Find candidate agents (fall back to general-purpose if none match)
        candidates = self.find_candidate_agents(features)
        if not candidates:
            return 'general-purpose'
        
        # 3. Score candidates
        scored_candidates = []
        for agent in candidates:
            score = self.score_agent(agent, features, context)
            scored_candidates.append((agent, score))
            
        # 4. Select best agent
        best_agent = max(scored_candidates, key=lambda x: x[1])
        
        # 5. Log selection
        self.log_selection(task_description, best_agent[0], best_agent[1])
        
        return best_agent[0]
        
    def extract_task_features(self, description):
        """Extract features from task description."""
        features = {
            'keywords': self.extract_keywords(description),
            'task_type': self.classify_task_type(description),
            'complexity': self.estimate_complexity(description),
            'domain': self.identify_domain(description),
            'urgency': self.detect_urgency(description),
        }
        return features
        
    def score_agent(self, agent, features, context):
        """Score agent fitness for task."""
        score = 0
        
        # Keyword match (40 points)
        keyword_matches = len(
            set(features['keywords']) & set(agent['keywords'])
        )
        score += min(keyword_matches * 10, 40)
        
        # Task type match (30 points)
        if features['task_type'] in agent['specialties']:
            score += 30
        elif features['task_type'] in agent['capabilities']:
            score += 15
            
        # Past performance (20 points)
        performance = self.get_agent_performance(agent['id'])
        score += performance * 20
        
        # Context relevance (10 points)
        if context and self.is_context_relevant(agent, context):
            score += 10
            
        return score
```

### Task Type Classification
```python
TASK_PATTERNS = {
    'debugging': [
        'error', 'bug', 'fix', 'broken', 'failing', 'crash',
        'exception', 'stack trace', 'not working', 'issue'
    ],
    'code_review': [
        'review', 'check', 'audit', 'quality', 'feedback',
        'improvement', 'refactor', 'clean', 'best practice'
    ],
    'testing': [
        'test', 'unittest', 'integration', 'e2e', 'coverage',
        'pytest', 'jest', 'mock', 'assertion', 'tdd'
    ],
    'performance': [
        'slow', 'performance', 'optimize', 'speed', 'latency',
        'memory', 'cpu', 'bottleneck', 'profile', 'benchmark'
    ],
    'security': [
        'security', 'vulnerability', 'exploit', 'auth', 'encryption',
        'xss', 'sql injection', 'csrf', 'penetration', 'audit'
    ],
    'deployment': [
        'deploy', 'ci/cd', 'pipeline', 'release', 'production',
        'kubernetes', 'docker', 'terraform', 'aws', 'cloud'
    ],
    'architecture': [
        'architecture', 'design', 'pattern', 'structure', 'scalability',
        'microservices', 'monolith', 'api', 'schema', 'diagram'
    ],
}
```
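
`classify_task_type`, referenced by the selector above, can be driven directly off these patterns; a minimal sketch (the `'general'` default is an assumption):

```python
def classify_task_type(description, default='general'):
    """Pick the task type whose keywords best match the description (sketch)."""
    text = description.lower()
    scores = {
        task_type: sum(keyword in text for keyword in keywords)
        for task_type, keywords in TASK_PATTERNS.items()
    }
    best_type, best_score = max(scores.items(), key=lambda item: item[1])
    return best_type if best_score > 0 else default

# "Fix the failing login test" -> 'debugging' (matches 'fix' and 'failing')
```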

## Complete Agent Catalog

### Debugging & Error Handling Agents

#### debugging-toolkit:debugger
**Specialties:** General debugging, test failures, runtime errors
**Best for:** Any error investigation, systematic debugging
**Keywords:** error, bug, debug, trace, investigate, fix
```bash
Task debugging-toolkit:debugger "Debug authentication error in login flow"
```

#### error-debugging:error-detective
**Specialties:** Log analysis, error pattern detection
**Best for:** Finding error patterns across distributed systems
**Keywords:** logs, patterns, correlation, search, analyze
```bash
Task error-debugging:error-detective "Search logs for connection timeout patterns"
```

#### distributed-debugging:devops-troubleshooter
**Specialties:** Production incidents, distributed systems
**Best for:** Complex multi-service debugging, incident response
**Keywords:** incident, production, distributed, microservices, outage
```bash
Task distributed-debugging:devops-troubleshooter "Investigate service mesh communication failure"
```

### Code Review & Quality Agents

#### code-documentation:code-reviewer
**Specialties:** Elite code review, security vulnerabilities
**Best for:** Critical code paths, security-sensitive changes
**Keywords:** review, security, vulnerability, quality, audit
```bash
Task code-documentation:code-reviewer "Review authentication middleware for vulnerabilities"
```

#### comprehensive-review:architect-review
**Specialties:** Architecture review, design patterns
**Best for:** System design reviews, major refactoring
**Keywords:** architecture, design, patterns, structure, scalability
```bash
Task comprehensive-review:architect-review "Review microservices communication architecture"
```

#### git-pr-workflows:code-reviewer
**Specialties:** Pull request reviews, git workflows
**Best for:** Standard PR reviews, merge conflict resolution
**Keywords:** pr, pull request, git, merge, branch
```bash
Task git-pr-workflows:code-reviewer "Review feature branch PR #123"
```

### Testing & TDD Agents

#### unit-testing:test-automator
**Specialties:** Test automation, coverage improvement
**Best for:** Writing comprehensive test suites
**Keywords:** test, unit, coverage, pytest, jest, mock
```bash
Task unit-testing:test-automator "Create test suite for payment processing module"
```

#### tdd-workflows:tdd-orchestrator
**Specialties:** Test-driven development workflows
**Best for:** Implementing features using TDD methodology
**Keywords:** tdd, red-green-refactor, test-first
```bash
Task tdd-workflows:tdd-orchestrator "Implement user registration with TDD approach"
```

#### performance-testing-review:test-automator
**Specialties:** Performance testing, load testing
**Best for:** Creating performance test suites
**Keywords:** performance, load, stress, benchmark, latency
```bash
Task performance-testing-review:test-automator "Create load tests for API endpoints"
```

### Documentation Agents

#### documentation-generation:docs-architect
**Specialties:** System documentation, architecture guides
**Best for:** Comprehensive technical documentation
**Keywords:** documentation, architecture, guide, manual, readme
```bash
Task documentation-generation:docs-architect "Create system architecture documentation"
```

#### documentation-generation:api-documenter
**Specialties:** API documentation, OpenAPI specs
**Best for:** REST/GraphQL API documentation
**Keywords:** api, openapi, swagger, rest, graphql, endpoints
```bash
Task documentation-generation:api-documenter "Generate OpenAPI spec for user service"
```

#### documentation-generation:tutorial-engineer
**Specialties:** Tutorials, onboarding guides
**Best for:** Step-by-step tutorials, user guides
**Keywords:** tutorial, guide, howto, onboarding, walkthrough
```bash
Task documentation-generation:tutorial-engineer "Create getting started tutorial"
```

### Language-Specific Experts

#### python-development:python-pro
**Specialties:** Modern Python, async, type hints
**Best for:** Python 3.12+ development, async patterns
**Keywords:** python, async, asyncio, type hints, dataclasses
```bash
Task python-development:python-pro "Refactor sync code to use async/await"
```

#### javascript-typescript:typescript-pro
**Specialties:** TypeScript, advanced types, generics
**Best for:** Complex TypeScript systems, type safety
**Keywords:** typescript, types, generics, interfaces, strict
```bash
Task javascript-typescript:typescript-pro "Add strict typing to React components"
```

#### python-development:fastapi-pro
**Specialties:** FastAPI, async APIs, microservices
**Best for:** High-performance Python APIs
**Keywords:** fastapi, api, async, pydantic, microservices
```bash
Task python-development:fastapi-pro "Build async REST API with FastAPI"
```

### Infrastructure & DevOps

#### deployment-strategies:deployment-engineer
**Specialties:** CI/CD pipelines, deployment automation
**Best for:** Setting up deployment pipelines
**Keywords:** deploy, ci/cd, pipeline, automation, release
```bash
Task deployment-strategies:deployment-engineer "Create GitHub Actions deployment pipeline"
```

#### cicd-automation:kubernetes-architect
**Specialties:** Kubernetes, GitOps, service mesh
**Best for:** K8s deployments, cluster management
**Keywords:** kubernetes, k8s, gitops, helm, cluster
```bash
Task cicd-automation:kubernetes-architect "Design K8s deployment for microservices"
```

#### deployment-strategies:terraform-specialist
**Specialties:** Infrastructure as Code, Terraform
**Best for:** Cloud infrastructure automation
**Keywords:** terraform, iac, infrastructure, aws, cloud
```bash
Task deployment-strategies:terraform-specialist "Create Terraform modules for AWS setup"
```

### Performance & Optimization

#### observability-monitoring:performance-engineer
**Specialties:** APM, performance optimization
**Best for:** Application performance issues
**Keywords:** performance, apm, monitoring, metrics, optimization
```bash
Task observability-monitoring:performance-engineer "Optimize API response times"
```

#### observability-monitoring:database-optimizer
**Specialties:** Database performance tuning
**Best for:** Query optimization, indexing strategies
**Keywords:** database, query, index, performance, sql
```bash
Task observability-monitoring:database-optimizer "Optimize slow database queries"
```

### Security & Compliance

#### full-stack-orchestration:security-auditor
**Specialties:** Security audits, compliance, DevSecOps
**Best for:** Comprehensive security reviews
**Keywords:** security, audit, compliance, vulnerability, pentest
```bash
Task full-stack-orchestration:security-auditor "Perform security audit on API endpoints"
```

#### data-validation-suite:backend-security-coder
**Specialties:** Secure coding, input validation
**Best for:** Implementing security features
**Keywords:** validation, sanitization, security, auth, encryption
```bash
Task data-validation-suite:backend-security-coder "Implement secure user input handling"
```

### AI & LLM Development

#### llm-application-dev:ai-engineer
**Specialties:** LLM applications, RAG systems
**Best for:** Building AI-powered features
**Keywords:** llm, ai, rag, embeddings, vector, gpt
```bash
Task llm-application-dev:ai-engineer "Build RAG system for documentation search"
```

#### llm-application-dev:prompt-engineer
**Specialties:** Prompt optimization, LLM tuning
**Best for:** Improving LLM performance
**Keywords:** prompt, optimization, llm, tuning, few-shot
```bash
Task llm-application-dev:prompt-engineer "Optimize prompts for code generation"
```

## Agent Routing Patterns

### Pattern 1: Cascading Expertise
```python
def cascade_agents(task):
    """Use multiple agents in sequence for complex tasks."""
    
    # Start with general agent
    initial_analysis = Task(
        "error-debugging:debugger",
        f"Initial analysis: {task}"
    )
    
    # Route to specialist based on findings
    if "performance" in initial_analysis:
        return Task(
            "observability-monitoring:performance-engineer",
            f"Deep dive: {task}"
        )
    elif "security" in initial_analysis:
        return Task(
            "full-stack-orchestration:security-auditor",
            f"Security analysis: {task}"
        )
    # ... continue routing
```

### Pattern 2: Parallel Expertise
```python
def parallel_review(code_change):
    """Use multiple agents in parallel for comprehensive review."""
    
    reviews = []
    
    # 'async' is a reserved keyword in Python, so the parallel flag is
    # spelled 'parallel=True' here.
    # Security review
    reviews.append(Task(
        "code-documentation:code-reviewer",
        f"Security review: {code_change}",
        parallel=True
    ))
    
    # Performance review
    reviews.append(Task(
        "performance-testing-review:performance-engineer",
        f"Performance impact: {code_change}",
        parallel=True
    ))
    
    # Architecture review
    reviews.append(Task(
        "comprehensive-review:architect-review",
        f"Architecture compliance: {code_change}",
        parallel=True
    ))
    
    # await_all: helper that blocks until every parallel review completes
    return await_all(reviews)
```

### Pattern 3: Domain-Specific Routing
```python
DOMAIN_AGENTS = {
    'auth': 'data-validation-suite:backend-security-coder',
    'api': 'python-development:fastapi-pro',
    'database': 'observability-monitoring:database-optimizer',
    'frontend': 'frontend-mobile-development:frontend-developer',
    'deployment': 'deployment-strategies:deployment-engineer',
    'testing': 'unit-testing:test-automator',
}

def route_by_domain(task, file_path):
    """Route based on code domain."""
    
    domain = identify_domain(file_path)
    agent = DOMAIN_AGENTS.get(domain, 'general-purpose')
    
    return Task(agent, task)
```
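
`identify_domain` is left undefined; a minimal path-based sketch (the directory and filename conventions are assumptions):

```python
def identify_domain(file_path, default='general'):
    """Map a file path onto a DOMAIN_AGENTS key (sketch)."""
    path = file_path.lower()
    rules = [
        ('auth', ('auth', 'login', 'session')),
        ('api', ('/api/', 'routes', 'endpoints')),
        ('database', ('models', 'migrations', 'schema')),
        ('frontend', ('components', '.tsx', '.jsx', '.vue')),
        ('deployment', ('.github/workflows', 'dockerfile', 'terraform')),
        ('testing', ('test_', '/tests/', '.spec.')),
    ]
    for domain, markers in rules:
        if any(marker in path for marker in markers):
            return domain
    return default
```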

## Agent Performance Tracking

### Metrics Collection
```python
class AgentMetrics:
    """Track agent performance metrics."""
    
    def __init__(self):
        self.metrics_db = "~/.claude/agent-metrics.db"
        
    def track_usage(self, agent, task, outcome):
        """Track agent usage and outcomes."""
        
        metrics = {
            'agent': agent,
            'task_type': classify_task(task),
            'timestamp': datetime.now().isoformat(),
            'duration': outcome.get('duration'),
            'success': outcome.get('success'),
            'user_satisfaction': outcome.get('satisfaction'),
            'tokens_used': outcome.get('tokens'),
            'cost': outcome.get('cost'),
        }
        
        self.save_metrics(metrics)
        
    def get_agent_stats(self, agent):
        """Get performance stats for agent."""
        
        stats = self.query_metrics(agent)
        n = max(len(stats), 1)  # guard against an empty usage history
        
        return {
            'usage_count': len(stats),
            'success_rate': sum(s['success'] for s in stats) / n,
            'avg_duration': sum(s['duration'] for s in stats) / n,
            'avg_satisfaction': sum(s['satisfaction'] for s in stats) / n,
            'total_cost': sum(s['cost'] for s in stats),
            'common_tasks': self.get_common_tasks(stats),
        }
```

### Agent Recommendation Engine
```python
class AgentRecommender:
    """Recommend optimal agents based on history."""
    
    def __init__(self):
        self.metrics = AgentMetrics()
        self.ml_model = self.load_recommendation_model()
        
    def recommend_agents(self, task_description, top_k=3):
        """Recommend top K agents for task."""
        
        # Feature extraction
        features = self.extract_features(task_description)
        
        # Get predictions from ML model
        predictions = self.ml_model.predict(features)
        
        # Combine with rule-based recommendations
        rule_based = self.get_rule_based_recommendations(task_description)
        
        # Weighted combination
        final_scores = self.combine_recommendations(
            predictions, rule_based, weights=[0.7, 0.3]
        )
        
        # Return top K
        return sorted(final_scores.items(), key=lambda x: x[1], reverse=True)[:top_k]
```

## Quick Decision Trees

### Debugging Decision Tree
```
Error/Bug Detected?
├── Production incident? → distributed-debugging:devops-troubleshooter
├── Test failure? → unit-testing:debugger
├── Performance issue? → observability-monitoring:performance-engineer
├── Security concern? → full-stack-orchestration:security-auditor
└── General debugging → debugging-toolkit:debugger
```
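
The debugging tree translates naturally into a first-match routing table; a sketch using the agent names from the tree:

```python
DEBUG_ROUTES = [
    # (predicate over the task description, agent) -- first match wins
    (lambda t: 'production' in t or 'incident' in t or 'outage' in t,
     'distributed-debugging:devops-troubleshooter'),
    (lambda t: 'test' in t and 'fail' in t,
     'unit-testing:debugger'),
    (lambda t: 'slow' in t or 'performance' in t or 'latency' in t,
     'observability-monitoring:performance-engineer'),
    (lambda t: 'security' in t or 'vulnerability' in t,
     'full-stack-orchestration:security-auditor'),
]

def route_debug_task(task):
    """Route a debugging task per the decision tree above (sketch)."""
    text = task.lower()
    for predicate, agent in DEBUG_ROUTES:
        if predicate(text):
            return agent
    return 'debugging-toolkit:debugger'  # general fallback
```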

### Development Decision Tree
```
New Development Task?
├── API Development?
│   ├── Python → python-development:fastapi-pro
│   ├── Node.js → javascript-typescript:javascript-pro
│   └── GraphQL → backend-development:graphql-architect
├── Frontend Development?
│   ├── React/Next.js → frontend-mobile-development:frontend-developer
│   └── Mobile → frontend-mobile-development:mobile-developer
├── Database Work?
│   ├── Schema Design → database-design:schema-design
│   └── Optimization → observability-monitoring:database-optimizer
└── Infrastructure?
    ├── Kubernetes → cicd-automation:kubernetes-architect
    ├── Terraform → deployment-strategies:terraform-specialist
    └── CI/CD → deployment-strategies:deployment-engineer
```

## Agent Chaining Examples

### Example 1: Full Feature Development
```python
# 1. Architecture design
architect = Task("feature-dev:code-architect", "Design user authentication system")

# 2. Implementation
backend = Task("python-development:fastapi-pro", "Implement auth API based on design")
frontend = Task("frontend-mobile-development:frontend-developer", "Build auth UI components")

# 3. Testing
tests = Task("unit-testing:test-automator", "Create comprehensive test suite")

# 4. Security review
security = Task("full-stack-orchestration:security-auditor", "Audit auth implementation")

# 5. Documentation
docs = Task("documentation-generation:api-documenter", "Document auth API endpoints")
```

### Example 2: Production Issue Resolution
```python
# 1. Initial investigation
investigate = Task("distributed-debugging:devops-troubleshooter", "Investigate service outage")

# 2. Root cause analysis
if "database" in investigate.findings:
    analyze = Task("observability-monitoring:database-optimizer", "Analyze DB performance")
elif "memory" in investigate.findings:
    analyze = Task("observability-monitoring:performance-engineer", "Profile memory usage")
else:
    analyze = Task("debugging-toolkit:debugger", "General root cause analysis")

# 3. Fix implementation
fix = Task("error-debugging:debugger", f"Fix root cause: {analyze.root_cause}")

# 4. Preventive measures
prevent = Task("incident-response:incident-responder", "Create runbook for future incidents")
```

## Best Practices

### 1. Agent Selection
- Start with specialized agents over general-purpose
- Use domain experts for domain-specific tasks
- Chain agents for complex workflows
- Parallelize independent agent tasks

### 2. Cost Optimization
- Use lighter agents for simple tasks
- Cache agent responses when possible
- Batch similar requests to same agent
- Monitor agent token usage

### 3. Quality Assurance
- Always use review agents for critical changes
- Chain testing agents after implementation
- Use security agents for sensitive features
- Document agent decisions and rationale

### 4. Performance
- Track agent response times
- Identify slow agents for optimization
- Use async calls for parallel execution
- Set appropriate timeouts per agent type
```
