
migration-architect

Migration Architect

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 5,825.

Hot score: 99.

Updated: March 20, 2026.

Overall rating: C (4.0).

Composite score: 4.0.

Best-practice grade: C (62.3).

Install command

npx @skill-hub/cli install alirezarezvani-claude-skills-migration-architect

Repository

alirezarezvani/claude-skills

Skill path: engineering/migration-architect

Migration Architect

Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: alirezarezvani.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install migration-architect into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/alirezarezvani/claude-skills before adding migration-architect to shared team environments
  • Use migration-architect for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: "migration-architect"
description: "Migration Architect"
---

# Migration Architect

**Tier:** POWERFUL  
**Category:** Engineering - Migration Strategy  
**Purpose:** Zero-downtime migration planning, compatibility validation, and rollback strategy generation

## Overview

The Migration Architect skill provides tools and methods for planning, executing, and validating complex system migrations with minimal business impact. It combines established migration patterns with automated planning scripts to support transitions between systems, databases, and infrastructure.

## Core Capabilities

### 1. Migration Strategy Planning
- **Phased Migration Planning:** Break complex migrations into manageable phases with clear validation gates
- **Risk Assessment:** Identify potential failure points and mitigation strategies before execution
- **Timeline Estimation:** Generate realistic timelines based on migration complexity and resource constraints
- **Stakeholder Communication:** Create communication templates and progress dashboards

### 2. Compatibility Analysis
- **Schema Evolution:** Analyze database schema changes for backward compatibility issues
- **API Versioning:** Detect breaking changes in REST/GraphQL APIs and microservice interfaces
- **Data Type Validation:** Identify data format mismatches and conversion requirements
- **Constraint Analysis:** Validate referential integrity and business rule changes

### 3. Rollback Strategy Generation
- **Automated Rollback Plans:** Generate comprehensive rollback procedures for each migration phase
- **Data Recovery Scripts:** Create point-in-time data restoration procedures
- **Service Rollback:** Plan service version rollbacks with traffic management
- **Validation Checkpoints:** Define success criteria and rollback triggers

## Migration Patterns

### Database Migrations

#### Schema Evolution Patterns
1. **Expand-Contract Pattern**
   - **Expand:** Add new columns/tables alongside existing schema
   - **Dual Write:** Application writes to both old and new schema
   - **Migration:** Backfill historical data to new schema
   - **Contract:** Remove old columns/tables after validation

2. **Parallel Schema Pattern**
   - Run new schema in parallel with existing schema
   - Use feature flags to route traffic between schemas
   - Validate data consistency between parallel systems
   - Cutover when confidence is high

3. **Event Sourcing Migration**
   - Capture all changes as events during migration window
   - Apply events to new schema for consistency
   - Enable replay capability for rollback scenarios
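The four expand-contract steps can be sketched end to end with an in-memory SQLite database. This is a minimal illustration, not part of the skill's scripts: the `users` table, the `full_name` and `display_name` columns, and the `create_user` helper are all hypothetical. The contract step itself is left out; the sketch stops at the validation gate that decides whether dropping the old column would be safe.

```python
import sqlite3

# Hypothetical schema: users.full_name is being replaced by users.display_name.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, full_name TEXT)")
conn.executemany(
    "INSERT INTO users (full_name) VALUES (?)",
    [("Ada Lovelace",), ("Alan Turing",)],
)

# Expand: add the new column alongside the old one.
conn.execute("ALTER TABLE users ADD COLUMN display_name TEXT")


def create_user(connection, name):
    """Dual write: new rows populate both the old and the new column."""
    connection.execute(
        "INSERT INTO users (full_name, display_name) VALUES (?, ?)",
        (name, name),
    )


create_user(conn, "Grace Hopper")

# Migration: backfill rows written before the dual-write code shipped.
conn.execute("UPDATE users SET display_name = full_name WHERE display_name IS NULL")

# Validation gate: the contract step is only safe when no row disagrees.
mismatches = conn.execute(
    "SELECT COUNT(*) FROM users WHERE display_name IS NOT full_name"
).fetchone()[0]
safe_to_contract = mismatches == 0
print(f"mismatched rows: {mismatches}, safe to contract: {safe_to_contract}")
```

Keeping the backfill as a single idempotent `UPDATE ... WHERE display_name IS NULL` means it can be re-run after an interruption without touching rows that dual writes have already populated.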

#### Data Migration Strategies
1. **Bulk Data Migration**
   - **Snapshot Approach:** Full data copy during maintenance window
   - **Incremental Sync:** Continuous data synchronization with change tracking
   - **Stream Processing:** Real-time data transformation pipelines

2. **Dual-Write Pattern**
   - Write to both source and target systems during migration
   - Implement compensation patterns for write failures
   - Use distributed transactions where consistency is critical

3. **Change Data Capture (CDC)**
   - Stream database changes to target system
   - Maintain eventual consistency during migration
   - Enable zero-downtime migrations for large datasets
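The CDC approach above depends on the target applying each change exactly once, even when the stream is replayed after a failure. The sketch below shows one way to get that property with a monotonically increasing sequence number. `ChangeEvent` and `TargetReplica` are hypothetical names for illustration and do not correspond to any particular CDC product.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ChangeEvent:
    sequence: int                        # position in the change stream
    op: str                              # "insert", "update", or "delete"
    key: int                             # primary key of the affected row
    row: Optional[Dict[str, Any]] = None


class TargetReplica:
    """In-memory stand-in for the target system."""

    def __init__(self) -> None:
        self.rows: Dict[int, Dict[str, Any]] = {}
        self.last_applied = 0

    def apply(self, event: ChangeEvent) -> bool:
        # Skip events at or below the high-water mark so replays are harmless.
        if event.sequence <= self.last_applied:
            return False
        if event.op == "delete":
            self.rows.pop(event.key, None)
        else:
            self.rows[event.key] = dict(event.row or {})
        self.last_applied = event.sequence
        return True


stream = [
    ChangeEvent(1, "insert", 10, {"name": "a"}),
    ChangeEvent(2, "update", 10, {"name": "b"}),
    ChangeEvent(3, "insert", 11, {"name": "c"}),
    ChangeEvent(4, "delete", 11),
]
replica = TargetReplica()
for event in stream + stream:  # the second pass simulates a replay
    replica.apply(event)
print(replica.rows)
```

The same high-water mark doubles as a resume point: after a crash, the consumer can restart from `last_applied` instead of re-reading the whole stream.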

### Service Migrations

#### Strangler Fig Pattern
1. **Intercept Requests:** Route traffic through proxy/gateway
2. **Gradually Replace:** Implement new service functionality incrementally
3. **Legacy Retirement:** Remove old service components as new ones prove stable
4. **Monitoring:** Track performance and error rates throughout transition

```mermaid
graph TD
    A[Client Requests] --> B[API Gateway]
    B --> C{Route Decision}
    C -->|Legacy Path| D[Legacy Service]
    C -->|New Path| E[New Service]
    D --> F[Legacy Database]
    E --> G[New Database]
```
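The route decision in the diagram can be reduced to a lookup of which path prefixes have already been moved to the new service. The following is a minimal sketch under that assumption; `StranglerRouter` and its methods are hypothetical, and a real gateway would express the same rule in its own routing configuration.

```python
from typing import Callable, Set

Handler = Callable[[str], str]


class StranglerRouter:
    """Sends a request to the new service only if its path prefix has been migrated."""

    def __init__(self, legacy: Handler, new: Handler) -> None:
        self.legacy = legacy
        self.new = new
        self.migrated_prefixes: Set[str] = set()

    def migrate(self, prefix: str) -> None:
        self.migrated_prefixes.add(prefix)

    def rollback(self, prefix: str) -> None:
        # Removing the prefix sends traffic back to the legacy service.
        self.migrated_prefixes.discard(prefix)

    def handle(self, path: str) -> str:
        use_new = any(path.startswith(p) for p in self.migrated_prefixes)
        return (self.new if use_new else self.legacy)(path)


router = StranglerRouter(lambda p: f"legacy:{p}", lambda p: f"new:{p}")
router.migrate("/orders")
print(router.handle("/orders/42"))   # new:/orders/42
print(router.handle("/invoices/7"))  # legacy:/invoices/7
```

Because the routing table is the only state, rolling back a slice of functionality is a single `rollback(prefix)` call rather than a redeploy.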

#### Parallel Run Pattern
1. **Dual Execution:** Run both old and new services simultaneously
2. **Shadow Traffic:** Mirror production traffic to the new system while the old system continues to serve responses
3. **Result Comparison:** Compare outputs to validate correctness
4. **Gradual Cutover:** Shift traffic percentage based on confidence
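A result-comparison harness for the parallel run might look like the sketch below. It assumes both services can be called as plain functions and that their outputs are directly comparable; `parallel_run` and the two lambda services are hypothetical. The caller always receives the legacy result, so a faulty candidate cannot affect users.

```python
from typing import Any, Callable, List, Tuple

Mismatch = Tuple[Any, Any, Any]  # (request, legacy result, candidate result or error)


def parallel_run(
    request: Any,
    legacy: Callable[[Any], Any],
    candidate: Callable[[Any], Any],
    mismatches: List[Mismatch],
) -> Any:
    """Serve the legacy result and record any disagreement from the candidate."""
    expected = legacy(request)
    try:
        actual = candidate(request)
    except Exception as exc:  # a candidate crash must never reach the caller
        mismatches.append((request, expected, f"error: {exc!r}"))
        return expected
    if actual != expected:
        mismatches.append((request, expected, actual))
    return expected


# Hypothetical services: the candidate has a bug for large inputs.
legacy_service = lambda n: n * 2
candidate_service = lambda n: n + n if n < 100 else n * 3

found: List[Mismatch] = []
results = [parallel_run(n, legacy_service, candidate_service, found) for n in (1, 50, 100)]
print(results)  # [2, 100, 200]
print(found)    # [(100, 200, 300)]
```

The mismatch rate over a representative traffic sample is one possible input for deciding how much traffic to shift in the gradual cutover step.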

#### Canary Deployment Pattern
1. **Limited Rollout:** Deploy new service to small percentage of users
2. **Monitoring:** Track key metrics (latency, errors, business KPIs)
3. **Gradual Increase:** Increase traffic percentage as confidence grows
4. **Full Rollout:** Complete migration once validation passes
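The monitoring and gradual-increase steps can be combined into a single decision function. The sketch below is illustrative only: `next_canary_percentage` is a hypothetical helper, and the thresholds (1% errors, 500 ms p95 latency, 10-point steps) are placeholder values that would need to come from the service's own baseline.

```python
def next_canary_percentage(
    current_pct: int,
    error_rate: float,
    p95_latency_ms: float,
    *,
    max_error_rate: float = 0.01,       # placeholder threshold
    max_p95_latency_ms: float = 500.0,  # placeholder threshold
    step: int = 10,
) -> int:
    """Return the next traffic share for the new service; 0 means roll back."""
    healthy = error_rate <= max_error_rate and p95_latency_ms <= max_p95_latency_ms
    if not healthy:
        return 0
    return min(current_pct + step, 100)


print(next_canary_percentage(5, error_rate=0.002, p95_latency_ms=180.0))   # 15
print(next_canary_percentage(95, error_rate=0.0, p95_latency_ms=100.0))    # 100
print(next_canary_percentage(50, error_rate=0.05, p95_latency_ms=100.0))   # 0
```

Dropping straight to 0 on any breach is the conservative choice; a gentler policy could hold at the current percentage and only roll back after repeated breaches.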

### Infrastructure Migrations

#### Cloud-to-Cloud Migration
1. **Assessment Phase**
   - Inventory existing resources and dependencies
   - Map services to target cloud equivalents
   - Identify vendor-specific features requiring refactoring

2. **Pilot Migration**
   - Migrate non-critical workloads first
   - Validate performance and cost models
   - Refine migration procedures

3. **Production Migration**
   - Use infrastructure as code for consistency
   - Implement cross-cloud networking during transition
   - Maintain disaster recovery capabilities

#### On-Premises to Cloud Migration
1. **Lift and Shift**
   - Minimal changes to existing applications
   - Quick migration with optimization later
   - Use cloud migration tools and services

2. **Re-architecture**
   - Redesign applications for cloud-native patterns
   - Adopt microservices, containers, and serverless
   - Implement cloud security and scaling practices

3. **Hybrid Approach**
   - Keep sensitive data on-premises
   - Migrate compute workloads to cloud
   - Implement secure connectivity between environments

## Feature Flags for Migrations

### Progressive Feature Rollout
```python
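# Example feature flag implementation
import hashlib


class MigrationFeatureFlag:
    def __init__(self, flag_name, rollout_percentage=0):
        self.flag_name = flag_name
        self.rollout_percentage = rollout_percentage

    def is_enabled_for_user(self, user_id):
        # Use a stable digest: the built-in hash() is salted per process for
        # strings, so a user could flip between old and new paths on restart.
        digest = hashlib.sha256(f"{self.flag_name}:{user_id}".encode("utf-8")).digest()
        bucket = int.from_bytes(digest[:8], "big") % 100
        return bucket < self.rollout_percentage

    def gradual_rollout(self, target_percentage, step_size=10):
        # Generator: yields after each increase so the caller can check
        # health metrics before continuing.
        while self.rollout_percentage < target_percentage:
            self.rollout_percentage = min(
                self.rollout_percentage + step_size,
                target_percentage
            )
            yield self.rollout_percentage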
```

### Circuit Breaker Pattern
Implement automatic fallback to legacy systems when new systems show degraded performance:

```python
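import time


class MigrationCircuitBreaker:
    def __init__(self, new_service, legacy_service, failure_threshold=5, timeout=60):
        self.new_service = new_service        # assumed to expose process(request)
        self.legacy_service = legacy_service  # assumed to expose process(request)
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # seconds to wait before probing the new service
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def should_attempt_reset(self):
        return time.monotonic() - self.last_failure_time >= self.timeout

    def on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # A failed HALF_OPEN probe reopens the circuit immediately.
        if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

    def fallback_to_legacy(self, request):
        return self.legacy_service.process(request)

    def call_new_service(self, request):
        if self.state == 'OPEN':
            if self.should_attempt_reset():
                self.state = 'HALF_OPEN'
            else:
                return self.fallback_to_legacy(request)

        try:
            response = self.new_service.process(request)
            self.on_success()
            return response
        except Exception:
            self.on_failure()
            return self.fallback_to_legacy(request)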
```

## Data Validation and Reconciliation

### Validation Strategies
1. **Row Count Validation**
   - Compare record counts between source and target
   - Account for soft deletes and filtered records
   - Implement threshold-based alerting

2. **Checksums and Hashing**
   - Generate checksums for critical data subsets
   - Compare hash values to detect data drift
   - Use sampling for large datasets

3. **Business Logic Validation**
   - Run critical business queries on both systems
   - Compare aggregate results (sums, counts, averages)
   - Validate derived data and calculations
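Row counts and checksums can be computed in one pass and compared between systems. The sketch below hashes a canonical form of each row and then hashes the sorted row digests, so the result does not depend on row order. `row_digest` and `table_checksum` are hypothetical helpers, and rows are assumed to be plain dictionaries whose values have a stable `repr`.

```python
import hashlib
from typing import Any, Dict, Iterable, Tuple


def row_digest(row: Dict[str, Any]) -> str:
    """Hash one row using a canonical, column-order-independent encoding."""
    canonical = "|".join(f"{column}={row[column]!r}" for column in sorted(row))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def table_checksum(rows: Iterable[Dict[str, Any]]) -> Tuple[int, str]:
    """Return (row count, checksum); the checksum ignores row order."""
    digests = sorted(row_digest(row) for row in rows)
    combined = hashlib.sha256("".join(digests).encode("utf-8")).hexdigest()
    return len(digests), combined


source_rows = [{"id": 1, "total": 10}, {"id": 2, "total": 25}]
target_rows = [{"total": 25, "id": 2}, {"id": 1, "total": 10}]  # same data, new order
drifted_rows = [{"id": 1, "total": 10}, {"id": 2, "total": 26}]

print(table_checksum(source_rows) == table_checksum(target_rows))   # True
print(table_checksum(source_rows) == table_checksum(drifted_rows))  # False
```

For large tables, the same functions can be applied to a sampled or partitioned subset (for example, one key range at a time) so that a mismatch also narrows down where the drift is.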

### Reconciliation Patterns
1. **Delta Detection**
   ```sql
   -- Example delta query for reconciliation
   SELECT 'missing_in_target' AS issue_type, s.id AS record_id
   FROM source_table s
   WHERE NOT EXISTS (
       SELECT 1 FROM target_table t
       WHERE t.id = s.id
   )
   UNION ALL
   SELECT 'extra_in_target' AS issue_type, t.id AS record_id
   FROM target_table t
   WHERE NOT EXISTS (
       SELECT 1 FROM source_table s
       WHERE s.id = t.id
   );
   ```

2. **Automated Correction**
   - Implement data repair scripts for common issues
   - Use idempotent operations for safe re-execution
   - Log all correction actions for audit trails
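An automated correction step that is idempotent and leaves an audit trail could be sketched as follows. Source and target are modeled as dictionaries keyed by record id, with the source treated as authoritative; `reconcile` is a hypothetical helper, not one of the skill's scripts.

```python
from typing import Any, Dict, List, Tuple

Action = Tuple[str, Any]  # (operation, record key)


def reconcile(source: Dict[Any, Any], target: Dict[Any, Any]) -> List[Action]:
    """Make target match source and return the list of corrections applied."""
    actions: List[Action] = []
    for key in sorted(source.keys() - target.keys()):
        target[key] = source[key]
        actions.append(("insert", key))
    for key in sorted(target.keys() - source.keys()):
        del target[key]
        actions.append(("delete", key))
    for key in sorted(source.keys() & target.keys()):
        if target[key] != source[key]:
            target[key] = source[key]
            actions.append(("update", key))
    return actions


source = {1: "a", 2: "b", 3: "c"}
target = {2: "x", 3: "c", 4: "d"}

first_pass = reconcile(source, target)
second_pass = reconcile(source, target)
print(first_pass)   # [('insert', 1), ('delete', 4), ('update', 2)]
print(second_pass)  # []
```

The empty second pass is the idempotency check: re-running the repair after a partial failure applies only what is still missing, and the returned action list can be written to the audit log as is.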

## Rollback Strategies

### Database Rollback
1. **Schema Rollback**
   - Maintain schema version control
   - Use backward-compatible migrations when possible
   - Keep rollback scripts for each migration step

2. **Data Rollback**
   - Point-in-time recovery using database backups
   - Transaction log replay for precise rollback points
   - Maintain data snapshots at migration checkpoints
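Keeping a rollback script for each migration step can be as simple as storing every forward statement next to its inverse. The sketch below uses SQLite's `PRAGMA user_version` as the schema version counter; the `MIGRATIONS` list, the `orders` table, and the `migrate_to` helper are hypothetical examples.

```python
import sqlite3

# Each entry: (version, forward statement, rollback statement).
MIGRATIONS = [
    (1, "CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL)", "DROP TABLE orders"),
    (2, "CREATE INDEX idx_orders_total ON orders (total)", "DROP INDEX idx_orders_total"),
]


def current_version(conn: sqlite3.Connection) -> int:
    return conn.execute("PRAGMA user_version").fetchone()[0]


def migrate_to(conn: sqlite3.Connection, target: int) -> None:
    """Apply forward or rollback statements until the schema is at `target`."""
    version = current_version(conn)
    if target >= version:
        for number, forward, _ in MIGRATIONS:
            if version < number <= target:
                conn.execute(forward)
                conn.execute(f"PRAGMA user_version = {number}")
    else:
        # Roll back in reverse order so dependent objects are removed first.
        for number, _, rollback in reversed(MIGRATIONS):
            if target < number <= version:
                conn.execute(rollback)
                conn.execute(f"PRAGMA user_version = {number - 1}")


conn = sqlite3.connect(":memory:")
migrate_to(conn, 2)
print(current_version(conn))  # 2
migrate_to(conn, 0)
print(current_version(conn))  # 0
```

Running the full up-and-down cycle in a test environment is a cheap way to confirm that every rollback script actually works before the migration reaches production.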

### Service Rollback
1. **Blue-Green Deployment**
   - Keep previous service version running during migration
   - Switch traffic back to blue environment if issues arise
   - Maintain parallel infrastructure during migration window

2. **Rolling Rollback**
   - Gradually shift traffic back to previous version
   - Monitor system health during rollback process
   - Implement automated rollback triggers

### Infrastructure Rollback
1. **Infrastructure as Code**
   - Version control all infrastructure definitions
   - Maintain rollback Terraform/CloudFormation templates
   - Test rollback procedures in staging environments

2. **Data Persistence**
   - Preserve data in original location during migration
   - Implement data sync back to original systems
   - Maintain backup strategies across both environments

## Risk Assessment Framework

### Risk Categories
1. **Technical Risks**
   - Data loss or corruption
   - Service downtime or degraded performance
   - Integration failures with dependent systems
   - Scalability issues under production load

2. **Business Risks**
   - Revenue impact from service disruption
   - Customer experience degradation
   - Compliance and regulatory concerns
   - Brand reputation impact

3. **Operational Risks**
   - Team knowledge gaps
   - Insufficient testing coverage
   - Inadequate monitoring and alerting
   - Communication breakdowns

### Risk Mitigation Strategies
1. **Technical Mitigations**
   - Comprehensive testing (unit, integration, load, chaos)
   - Gradual rollout with automated rollback triggers
   - Data validation and reconciliation processes
   - Performance monitoring and alerting

2. **Business Mitigations**
   - Stakeholder communication plans
   - Business continuity procedures
   - Customer notification strategies
   - Revenue protection measures

3. **Operational Mitigations**
   - Team training and documentation
   - Runbook creation and testing
   - On-call rotation planning
   - Post-migration review processes

## Migration Runbooks

### Pre-Migration Checklist
- [ ] Migration plan reviewed and approved
- [ ] Rollback procedures tested and validated
- [ ] Monitoring and alerting configured
- [ ] Team roles and responsibilities defined
- [ ] Stakeholder communication plan activated
- [ ] Backup and recovery procedures verified
- [ ] Test environment validation complete
- [ ] Performance benchmarks established
- [ ] Security review completed
- [ ] Compliance requirements verified

### During Migration
- [ ] Execute migration phases in planned order
- [ ] Monitor key performance indicators continuously
- [ ] Validate data consistency at each checkpoint
- [ ] Communicate progress to stakeholders
- [ ] Document any deviations from plan
- [ ] Execute rollback if success criteria not met
- [ ] Coordinate with dependent teams
- [ ] Maintain detailed execution logs

### Post-Migration
- [ ] Validate all success criteria met
- [ ] Perform comprehensive system health checks
- [ ] Execute data reconciliation procedures
- [ ] Monitor system performance over 72 hours
- [ ] Update documentation and runbooks
- [ ] Decommission legacy systems (if applicable)
- [ ] Conduct post-migration retrospective
- [ ] Archive migration artifacts
- [ ] Update disaster recovery procedures

## Communication Templates

### Executive Summary Template
```
Migration Status: [IN_PROGRESS | COMPLETED | ROLLED_BACK]
Start Time: [YYYY-MM-DD HH:MM UTC]
Current Phase: [X of Y]
Overall Progress: [X%]

Key Metrics:
- System Availability: [X.XX%]
- Data Migration Progress: [X.XX%]
- Performance Impact: [+/-X%]
- Issues Encountered: [X]

Next Steps:
1. [Action item 1]
2. [Action item 2]

Risk Assessment: [LOW | MEDIUM | HIGH]
Rollback Status: [AVAILABLE | NOT_AVAILABLE]
```

### Technical Team Update Template
```
Phase: [Phase Name] - [Status]
Duration: [Started] - [Expected End]

Completed Tasks:
✓ [Task 1]
✓ [Task 2]

In Progress:
🔄 [Task 3] - [X% complete]

Upcoming:
⏳ [Task 4] - [Expected start time]

Issues:
⚠️ [Issue description] - [Severity] - [ETA resolution]

Metrics:
- Migration Rate: [X records/minute]
- Error Rate: [X.XX%]
- System Load: [CPU/Memory/Disk]
```

## Success Metrics

### Technical Metrics
- **Migration Completion Rate:** Percentage of data/services successfully migrated
- **Downtime Duration:** Total system unavailability during migration
- **Data Consistency Score:** Percentage of data validation checks passing
- **Performance Delta:** Performance change compared to baseline
- **Error Rate:** Percentage of failed operations during migration
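Most of the technical metrics above are simple ratios, which makes them easy to compute consistently from raw counters. The sketch below shows one possible set of definitions; the function names, parameter names, and the choice of latency as the performance measure are assumptions, and downtime duration is omitted because it is a measured interval rather than a ratio.

```python
from typing import Dict


def percentage(part: float, whole: float) -> float:
    """Return part/whole as a percentage rounded to two decimals (0.0 if whole is 0)."""
    return 0.0 if whole == 0 else round(100.0 * part / whole, 2)


def technical_metrics(
    *,
    records_total: int,
    records_migrated: int,
    checks_total: int,
    checks_passed: int,
    operations_total: int,
    operations_failed: int,
    baseline_latency_ms: float,
    current_latency_ms: float,
) -> Dict[str, float]:
    return {
        "migration_completion_rate": percentage(records_migrated, records_total),
        "data_consistency_score": percentage(checks_passed, checks_total),
        "error_rate": percentage(operations_failed, operations_total),
        # Positive values mean the new system is slower than the baseline.
        "performance_delta": percentage(
            current_latency_ms - baseline_latency_ms, baseline_latency_ms
        ),
    }


metrics = technical_metrics(
    records_total=200_000,
    records_migrated=150_000,
    checks_total=40,
    checks_passed=39,
    operations_total=150_000,
    operations_failed=300,
    baseline_latency_ms=120.0,
    current_latency_ms=132.0,
)
print(metrics)
```

With the sample counters shown, the completion rate is 75.0, the consistency score 97.5, the error rate 0.2, and the performance delta 10.0 (all in percent).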

### Business Metrics
- **Customer Impact Score:** Measure of customer experience degradation
- **Revenue Protection:** Percentage of revenue maintained during migration
- **Time to Value:** Duration from migration start to business value realization
- **Stakeholder Satisfaction:** Post-migration stakeholder feedback scores

### Operational Metrics
- **Plan Adherence:** Percentage of migration executed according to plan
- **Issue Resolution Time:** Average time to resolve migration issues
- **Team Efficiency:** Resource utilization and productivity metrics
- **Knowledge Transfer Score:** Team readiness for post-migration operations

## Tools and Technologies

### Migration Planning Tools
- **migration_planner.py:** Automated migration plan generation
- **compatibility_checker.py:** Schema and API compatibility analysis
- **rollback_generator.py:** Comprehensive rollback procedure generation

### Validation Tools
- Database comparison utilities (schema and data)
- API contract testing frameworks
- Performance benchmarking tools
- Data quality validation pipelines

### Monitoring and Alerting
- Real-time migration progress dashboards
- Automated rollback trigger systems
- Business metric monitoring
- Stakeholder notification systems

## Best Practices

### Planning Phase
1. **Start with Risk Assessment:** Identify all potential failure modes before planning
2. **Design for Rollback:** Every migration step should have a tested rollback procedure
3. **Validate in Staging:** Execute full migration process in production-like environment
4. **Plan for Gradual Rollout:** Use feature flags and traffic routing for controlled migration

### Execution Phase
1. **Monitor Continuously:** Track both technical and business metrics throughout
2. **Communicate Proactively:** Keep all stakeholders informed of progress and issues
3. **Document Everything:** Maintain detailed logs for post-migration analysis
4. **Stay Flexible:** Be prepared to adjust timeline based on real-world performance

### Validation Phase
1. **Automate Validation:** Use automated tools for data consistency and performance checks
2. **Business Logic Testing:** Validate critical business processes end-to-end
3. **Load Testing:** Verify system performance under expected production load
4. **Security Validation:** Ensure security controls function properly in new environment

## Integration with Development Lifecycle

### CI/CD Integration
```yaml
# Example migration pipeline stage
migration_validation:
  stage: test
  script:
    - python scripts/compatibility_checker.py --before=old_schema.json --after=new_schema.json
    - python scripts/migration_planner.py --config=migration_config.json --validate
  artifacts:
    paths:
      - compatibility_report.json
      - migration_plan.json
```

### Infrastructure as Code
```terraform
# Example Terraform for blue-green infrastructure
resource "aws_instance" "blue_environment" {
  # Keep blue running through execution so traffic can be switched back
  count = contains(["preparation", "execution"], var.migration_phase) ? var.instance_count : 0
  # Blue environment configuration
}

resource "aws_instance" "green_environment" {
  # Green comes up at execution and stays once blue is retired
  count = var.migration_phase == "preparation" ? 0 : var.instance_count
  # Green environment configuration
}
```

The Migration Architect skill provides a framework for planning, executing, and validating complex system migrations while limiting business impact and technical risk. It combines automated scripts, established patterns, and runbook procedures so that each migration phase has defined validation checkpoints and a rollback path.

---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/compatibility_checker.py

```python
#!/usr/bin/env python3
"""
Compatibility Checker - Analyze schema and API compatibility between versions

This tool analyzes schema and API changes between versions, identifies backward
compatibility issues (breaking changes, data type mismatches, missing fields,
constraint violations), and generates migration script suggestions.

Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""

import json
import argparse
import sys
import re
import datetime
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, asdict
from enum import Enum


class ChangeType(Enum):
    """Types of changes detected"""
    BREAKING = "breaking"
    POTENTIALLY_BREAKING = "potentially_breaking"
    NON_BREAKING = "non_breaking"
    ADDITIVE = "additive"


class CompatibilityLevel(Enum):
    """Compatibility assessment levels"""
    FULLY_COMPATIBLE = "fully_compatible"
    BACKWARD_COMPATIBLE = "backward_compatible"
    POTENTIALLY_INCOMPATIBLE = "potentially_incompatible"
    BREAKING_CHANGES = "breaking_changes"


@dataclass
class CompatibilityIssue:
    """Individual compatibility issue"""
    type: str
    severity: str
    description: str
    field_path: str
    old_value: Any
    new_value: Any
    impact: str
    suggested_migration: str
    affected_operations: List[str]


@dataclass
class MigrationScript:
    """Migration script suggestion"""
    script_type: str  # sql, api, config
    description: str
    script_content: str
    rollback_script: str
    dependencies: List[str]
    validation_query: str


@dataclass
class CompatibilityReport:
    """Complete compatibility analysis report"""
    schema_before: str
    schema_after: str
    analysis_date: str
    overall_compatibility: str
    breaking_changes_count: int
    potentially_breaking_count: int
    non_breaking_changes_count: int
    additive_changes_count: int
    issues: List[CompatibilityIssue]
    migration_scripts: List[MigrationScript]
    risk_assessment: Dict[str, Any]
    recommendations: List[str]


class SchemaCompatibilityChecker:
    """Main schema compatibility checker class"""
    
    def __init__(self):
        self.type_compatibility_matrix = self._build_type_compatibility_matrix()
        self.constraint_implications = self._build_constraint_implications()
    
    def _build_type_compatibility_matrix(self) -> Dict[str, Dict[str, str]]:
        """Build data type compatibility matrix"""
        return {
            # SQL data types compatibility
            "varchar": {
                "text": "compatible",
                "char": "potentially_breaking",  # length might be different
                "nvarchar": "compatible",
                "int": "breaking",
                "bigint": "breaking",
                "decimal": "breaking",
                "datetime": "breaking",
                "boolean": "breaking"
            },
            "int": {
                "bigint": "compatible",
                "smallint": "potentially_breaking",  # range reduction
                "decimal": "compatible",
                "float": "potentially_breaking",  # precision loss
                "varchar": "breaking",
                "boolean": "breaking"
            },
            "bigint": {
                "int": "potentially_breaking",  # range reduction
                "decimal": "compatible",
                "varchar": "breaking",
                "boolean": "breaking"
            },
            "decimal": {
                "float": "potentially_breaking",  # precision loss
                "int": "potentially_breaking",  # precision loss
                "bigint": "potentially_breaking",  # precision loss
                "varchar": "breaking",
                "boolean": "breaking"
            },
            "datetime": {
                "timestamp": "compatible",
                "date": "potentially_breaking",  # time component lost
                "varchar": "breaking",
                "int": "breaking"
            },
            # "boolean" is both a SQL type and a JSON/API type. A dict literal keeps
            # only the last duplicate key, so both sets of targets are merged here.
            "boolean": {
                "tinyint": "compatible",
                "varchar": "breaking",
                "int": "breaking",
                "string": "breaking",
                "number": "breaking",
                "array": "breaking",
                "object": "breaking",
                "null": "potentially_breaking"
            },
            # JSON/API field types
            "string": {
                "number": "breaking",
                "boolean": "breaking",
                "array": "breaking",
                "object": "breaking",
                "null": "potentially_breaking"
            },
            "number": {
                "string": "breaking",
                "boolean": "breaking",
                "array": "breaking",
                "object": "breaking",
                "null": "potentially_breaking"
            },
            "array": {
                "string": "breaking",
                "number": "breaking",
                "boolean": "breaking",
                "object": "breaking",
                "null": "potentially_breaking"
            },
            "object": {
                "string": "breaking",
                "number": "breaking",
                "boolean": "breaking",
                "array": "breaking",
                "null": "potentially_breaking"
            }
        }
    
    def _build_constraint_implications(self) -> Dict[str, Dict[str, str]]:
        """Build constraint change implications"""
        return {
            "required": {
                "added": "breaking",  # Previously optional field now required
                "removed": "non_breaking"  # Previously required field now optional
            },
            "not_null": {
                "added": "breaking",  # Previously nullable now NOT NULL
                "removed": "non_breaking"  # Previously NOT NULL now nullable
            },
            "unique": {
                "added": "potentially_breaking",  # May fail if duplicates exist
                "removed": "non_breaking"  # No longer enforcing uniqueness
            },
            "primary_key": {
                "added": "breaking",  # Major structural change
                "removed": "breaking",  # Major structural change
                "modified": "breaking"  # Primary key change is always breaking
            },
            "foreign_key": {
                "added": "potentially_breaking",  # May fail if referential integrity violated
                "removed": "potentially_breaking",  # May allow orphaned records
                "modified": "breaking"  # Reference change is breaking
            },
            "check": {
                "added": "potentially_breaking",  # May fail if existing data violates check
                "removed": "non_breaking",  # No longer enforcing check
                "modified": "potentially_breaking"  # Different validation rules
            },
            "index": {
                "added": "non_breaking",  # Performance improvement
                "removed": "non_breaking",  # Performance impact only
                "modified": "non_breaking"  # Performance impact only
            }
        }
    
    def analyze_database_schema(self, before_schema: Dict[str, Any], 
                              after_schema: Dict[str, Any]) -> CompatibilityReport:
        """Analyze database schema compatibility"""
        issues = []
        migration_scripts = []
        
        before_tables = before_schema.get("tables", {})
        after_tables = after_schema.get("tables", {})
        
        # Check for removed tables
        for table_name in before_tables:
            if table_name not in after_tables:
                issues.append(CompatibilityIssue(
                    type="table_removed",
                    severity="breaking",
                    description=f"Table '{table_name}' has been removed",
                    field_path=f"tables.{table_name}",
                    old_value=before_tables[table_name],
                    new_value=None,
                    impact="All operations on this table will fail",
                    suggested_migration=f"CREATE VIEW {table_name} AS SELECT * FROM replacement_table;",
                    affected_operations=["SELECT", "INSERT", "UPDATE", "DELETE"]
                ))
        
        # Check for added tables
        for table_name in after_tables:
            if table_name not in before_tables:
                migration_scripts.append(MigrationScript(
                    script_type="sql",
                    description=f"Create new table {table_name}",
                    script_content=self._generate_create_table_sql(table_name, after_tables[table_name]),
                    rollback_script=f"DROP TABLE IF EXISTS {table_name};",
                    dependencies=[],
                    validation_query=f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';"
                ))
        
        # Check for modified tables
        for table_name in set(before_tables.keys()) & set(after_tables.keys()):
            table_issues, table_scripts = self._analyze_table_changes(
                table_name, before_tables[table_name], after_tables[table_name]
            )
            issues.extend(table_issues)
            migration_scripts.extend(table_scripts)
        
        return self._build_compatibility_report(
            before_schema, after_schema, issues, migration_scripts
        )
    
    def analyze_api_schema(self, before_schema: Dict[str, Any], 
                          after_schema: Dict[str, Any]) -> CompatibilityReport:
        """Analyze REST API schema compatibility"""
        issues = []
        migration_scripts = []
        
        # Analyze endpoints
        before_paths = before_schema.get("paths", {})
        after_paths = after_schema.get("paths", {})
        
        # Check for removed endpoints
        for path in before_paths:
            if path not in after_paths:
                for method in before_paths[path]:
                    issues.append(CompatibilityIssue(
                        type="endpoint_removed",
                        severity="breaking",
                        description=f"Endpoint {method.upper()} {path} has been removed",
                        field_path=f"paths.{path}.{method}",
                        old_value=before_paths[path][method],
                        new_value=None,
                        impact="Client requests to this endpoint will fail with 404",
                        suggested_migration="Implement redirect to replacement endpoint or maintain backward compatibility stub",
                        affected_operations=[f"{method.upper()} {path}"]
                    ))
        
        # Check for modified endpoints
        for path in set(before_paths.keys()) & set(after_paths.keys()):
            path_issues, path_scripts = self._analyze_endpoint_changes(
                path, before_paths[path], after_paths[path]
            )
            issues.extend(path_issues)
            migration_scripts.extend(path_scripts)
        
        # Analyze data models
        before_components = before_schema.get("components", {}).get("schemas", {})
        after_components = after_schema.get("components", {}).get("schemas", {})
        
        for model_name in set(before_components.keys()) & set(after_components.keys()):
            model_issues, model_scripts = self._analyze_model_changes(
                model_name, before_components[model_name], after_components[model_name]
            )
            issues.extend(model_issues)
            migration_scripts.extend(model_scripts)
        
        return self._build_compatibility_report(
            before_schema, after_schema, issues, migration_scripts
        )
    
    def _analyze_table_changes(self, table_name: str, before_table: Dict[str, Any], 
                             after_table: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
        """Analyze changes to a specific table"""
        issues = []
        scripts = []
        
        before_columns = before_table.get("columns", {})
        after_columns = after_table.get("columns", {})
        
        # Check for removed columns
        for col_name in before_columns:
            if col_name not in after_columns:
                issues.append(CompatibilityIssue(
                    type="column_removed",
                    severity="breaking",
                    description=f"Column '{col_name}' removed from table '{table_name}'",
                    field_path=f"tables.{table_name}.columns.{col_name}",
                    old_value=before_columns[col_name],
                    new_value=None,
                    impact="SELECT statements including this column will fail",
                    suggested_migration=f"ALTER TABLE {table_name} ADD COLUMN {col_name}_deprecated AS computed_value;",
                    affected_operations=["SELECT", "INSERT", "UPDATE"]
                ))
        
        # Check for added columns
        for col_name in after_columns:
            if col_name not in before_columns:
                col_def = after_columns[col_name]
                is_required = col_def.get("nullable", True) is False and col_def.get("default") is None
                
                if is_required:
                    issues.append(CompatibilityIssue(
                        type="required_column_added",
                        severity="breaking",
                        description=f"Required column '{col_name}' added to table '{table_name}'",
                        field_path=f"tables.{table_name}.columns.{col_name}",
                        old_value=None,
                        new_value=col_def,
                        impact="INSERT statements without this column will fail",
                        suggested_migration="Add default value or make column nullable initially",
                        affected_operations=["INSERT"]
                    ))
                
                scripts.append(MigrationScript(
                    script_type="sql",
                    description=f"Add column {col_name} to table {table_name}",
                    script_content=f"ALTER TABLE {table_name} ADD COLUMN {self._generate_column_definition(col_name, col_def)};",
                    rollback_script=f"ALTER TABLE {table_name} DROP COLUMN {col_name};",
                    dependencies=[],
                    validation_query=f"SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{col_name}';"
                ))
        
        # Check for modified columns
        for col_name in set(before_columns.keys()) & set(after_columns.keys()):
            col_issues, col_scripts = self._analyze_column_changes(
                table_name, col_name, before_columns[col_name], after_columns[col_name]
            )
            issues.extend(col_issues)
            scripts.extend(col_scripts)
        
        # Check constraint changes
        before_constraints = before_table.get("constraints", {})
        after_constraints = after_table.get("constraints", {})
        
        constraint_issues, constraint_scripts = self._analyze_constraint_changes(
            table_name, before_constraints, after_constraints
        )
        issues.extend(constraint_issues)
        scripts.extend(constraint_scripts)
        
        return issues, scripts
    
    def _analyze_column_changes(self, table_name: str, col_name: str, 
                              before_col: Dict[str, Any], after_col: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
        """Analyze changes to a specific column"""
        issues = []
        scripts = []
        
        # Check data type changes
        before_type = before_col.get("type", "").lower()
        after_type = after_col.get("type", "").lower()
        
        if before_type != after_type:
            compatibility = self.type_compatibility_matrix.get(before_type, {}).get(after_type, "breaking")
            
            if compatibility == "breaking":
                issues.append(CompatibilityIssue(
                    type="incompatible_type_change",
                    severity="breaking",
                    description=f"Column '{col_name}' type changed from {before_type} to {after_type}",
                    field_path=f"tables.{table_name}.columns.{col_name}.type",
                    old_value=before_type,
                    new_value=after_type,
                    impact="Data conversion may fail or lose precision",
                    suggested_migration="Add conversion logic and validate data integrity",
                    affected_operations=["SELECT", "INSERT", "UPDATE", "WHERE clauses"]
                ))
                
                scripts.append(MigrationScript(
                    script_type="sql",
                    description=f"Convert column {col_name} from {before_type} to {after_type}",
                    script_content=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} TYPE {after_type} USING {col_name}::{after_type};",
                    rollback_script=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} TYPE {before_type} USING {col_name}::{before_type};",
                    dependencies=[f"backup_{table_name}"],
                    validation_query=f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NOT NULL;"
                ))
            
            elif compatibility == "potentially_breaking":
                issues.append(CompatibilityIssue(
                    type="risky_type_change",
                    severity="potentially_breaking",
                    description=f"Column '{col_name}' type changed from {before_type} to {after_type} - may lose data",
                    field_path=f"tables.{table_name}.columns.{col_name}.type",
                    old_value=before_type,
                    new_value=after_type,
                    impact="Potential data loss or precision reduction",
                    suggested_migration="Validate all existing data can be converted safely",
                    affected_operations=["Data integrity"]
                ))
        
        # Check nullability changes
        before_nullable = before_col.get("nullable", True)
        after_nullable = after_col.get("nullable", True)
        
        if before_nullable != after_nullable:
            if before_nullable and not after_nullable:  # null -> not null
                issues.append(CompatibilityIssue(
                    type="nullability_restriction",
                    severity="breaking",
                    description=f"Column '{col_name}' changed from nullable to NOT NULL",
                    field_path=f"tables.{table_name}.columns.{col_name}.nullable",
                    old_value=before_nullable,
                    new_value=after_nullable,
                    impact="Existing NULL values will cause constraint violations",
                    suggested_migration="Update NULL values to valid defaults before applying NOT NULL constraint",
                    affected_operations=["INSERT", "UPDATE"]
                ))
                
                scripts.append(MigrationScript(
                    script_type="sql",
                    description=f"Make column {col_name} NOT NULL",
                    script_content=f"""
                    -- Backfill NULL values first; replace 'DEFAULT_VALUE' with a value valid for the column type
                    UPDATE {table_name} SET {col_name} = 'DEFAULT_VALUE' WHERE {col_name} IS NULL;
                    -- Add NOT NULL constraint
                    ALTER TABLE {table_name} ALTER COLUMN {col_name} SET NOT NULL;
                    """,
                    rollback_script=f"ALTER TABLE {table_name} ALTER COLUMN {col_name} DROP NOT NULL;",
                    dependencies=[],
                    validation_query=f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NULL;"
                ))
        
        # Check length/precision changes
        before_length = before_col.get("length")
        after_length = after_col.get("length")
        
        if before_length and after_length and before_length != after_length:
            if after_length < before_length:
                issues.append(CompatibilityIssue(
                    type="length_reduction",
                    severity="potentially_breaking",
                    description=f"Column '{col_name}' length reduced from {before_length} to {after_length}",
                    field_path=f"tables.{table_name}.columns.{col_name}.length",
                    old_value=before_length,
                    new_value=after_length,
                    impact="Data truncation may occur for values exceeding new length",
                    suggested_migration="Validate no existing data exceeds new length limit",
                    affected_operations=["INSERT", "UPDATE"]
                ))
        
        return issues, scripts
    
    def _analyze_constraint_changes(self, table_name: str, before_constraints: Dict[str, Any], 
                                  after_constraints: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
        """Analyze constraint changes"""
        issues = []
        scripts = []
        
        for constraint_type in ["primary_key", "foreign_key", "unique", "check"]:
            before_constraint = before_constraints.get(constraint_type, [])
            after_constraint = after_constraints.get(constraint_type, [])
            
            # Convert to sets for comparison
            before_set = set(str(c) for c in before_constraint) if isinstance(before_constraint, list) else {str(before_constraint)} if before_constraint else set()
            after_set = set(str(c) for c in after_constraint) if isinstance(after_constraint, list) else {str(after_constraint)} if after_constraint else set()
            
            # Check for removed constraints
            for constraint in before_set - after_set:
                implication = self.constraint_implications.get(constraint_type, {}).get("removed", "non_breaking")
                issues.append(CompatibilityIssue(
                    type=f"{constraint_type}_removed",
                    severity=implication,
                    description=f"{constraint_type.replace('_', ' ').title()} constraint '{constraint}' removed from table '{table_name}'",
                    field_path=f"tables.{table_name}.constraints.{constraint_type}",
                    old_value=constraint,
                    new_value=None,
                    impact=f"No longer enforcing {constraint_type} constraint",
                    suggested_migration="Consider application-level validation for removed constraint",
                    affected_operations=["INSERT", "UPDATE", "DELETE"]
                ))
            
            # Check for added constraints
            for constraint in after_set - before_set:
                implication = self.constraint_implications.get(constraint_type, {}).get("added", "potentially_breaking")
                issues.append(CompatibilityIssue(
                    type=f"{constraint_type}_added",
                    severity=implication,
                    description=f"New {constraint_type.replace('_', ' ')} constraint '{constraint}' added to table '{table_name}'",
                    field_path=f"tables.{table_name}.constraints.{constraint_type}",
                    old_value=None,
                    new_value=constraint,
                    impact=f"New {constraint_type} constraint may reject existing data",
                    suggested_migration="Validate existing data complies with new constraint",
                    affected_operations=["INSERT", "UPDATE"]
                ))
                
                scripts.append(MigrationScript(
                    script_type="sql",
                    description=f"Add {constraint_type} constraint to {table_name}",
                    script_content=f"ALTER TABLE {table_name} ADD CONSTRAINT {constraint_type}_{table_name} {constraint_type.upper().replace('_', ' ')} ({constraint});",
                    rollback_script=f"ALTER TABLE {table_name} DROP CONSTRAINT {constraint_type}_{table_name};",
                    dependencies=[],
                    validation_query=f"SELECT COUNT(*) FROM information_schema.table_constraints WHERE table_name = '{table_name}' AND constraint_type = '{constraint_type.upper().replace('_', ' ')}';"
                ))
        
        return issues, scripts
    
    def _analyze_endpoint_changes(self, path: str, before_endpoint: Dict[str, Any], 
                                after_endpoint: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
        """Analyze changes to an API endpoint"""
        issues = []
        scripts = []
        
        for method in set(before_endpoint.keys()) & set(after_endpoint.keys()):
            before_method = before_endpoint[method]
            after_method = after_endpoint[method]
            
            # Check parameter changes
            before_params = before_method.get("parameters", [])
            after_params = after_method.get("parameters", [])
            
            before_param_names = {p["name"] for p in before_params}
            after_param_names = {p["name"] for p in after_params}
            
            # Check for removed required parameters
            for param_name in before_param_names - after_param_names:
                param = next(p for p in before_params if p["name"] == param_name)
                if param.get("required", False):
                    issues.append(CompatibilityIssue(
                        type="required_parameter_removed",
                        severity="breaking",
                        description=f"Required parameter '{param_name}' removed from {method.upper()} {path}",
                        field_path=f"paths.{path}.{method}.parameters",
                        old_value=param,
                        new_value=None,
                        impact="Client requests with this parameter will fail",
                        suggested_migration="Implement parameter validation with backward compatibility",
                        affected_operations=[f"{method.upper()} {path}"]
                    ))
            
            # Check for added required parameters
            for param_name in after_param_names - before_param_names:
                param = next(p for p in after_params if p["name"] == param_name)
                if param.get("required", False):
                    issues.append(CompatibilityIssue(
                        type="required_parameter_added",
                        severity="breaking",
                        description=f"New required parameter '{param_name}' added to {method.upper()} {path}",
                        field_path=f"paths.{path}.{method}.parameters",
                        old_value=None,
                        new_value=param,
                        impact="Client requests without this parameter will fail",
                        suggested_migration="Provide default value or make parameter optional initially",
                        affected_operations=[f"{method.upper()} {path}"]
                    ))
            
            # Check response schema changes
            before_responses = before_method.get("responses", {})
            after_responses = after_method.get("responses", {})
            
            for status_code in before_responses:
                if status_code in after_responses:
                    before_schema = before_responses[status_code].get("content", {}).get("application/json", {}).get("schema", {})
                    after_schema = after_responses[status_code].get("content", {}).get("application/json", {}).get("schema", {})
                    
                    if before_schema != after_schema:
                        issues.append(CompatibilityIssue(
                            type="response_schema_changed",
                            severity="potentially_breaking",
                            description=f"Response schema changed for {method.upper()} {path} (status {status_code})",
                            field_path=f"paths.{path}.{method}.responses.{status_code}",
                            old_value=before_schema,
                            new_value=after_schema,
                            impact="Client response parsing may fail",
                            suggested_migration="Implement versioned API responses",
                            affected_operations=[f"{method.upper()} {path}"]
                        ))
        
        return issues, scripts
    
    def _analyze_model_changes(self, model_name: str, before_model: Dict[str, Any], 
                             after_model: Dict[str, Any]) -> Tuple[List[CompatibilityIssue], List[MigrationScript]]:
        """Analyze changes to an API data model"""
        issues = []
        scripts = []
        
        before_props = before_model.get("properties", {})
        after_props = after_model.get("properties", {})
        before_required = set(before_model.get("required", []))
        after_required = set(after_model.get("required", []))
        
        # Check for removed properties
        for prop_name in set(before_props.keys()) - set(after_props.keys()):
            issues.append(CompatibilityIssue(
                type="property_removed",
                severity="breaking",
                description=f"Property '{prop_name}' removed from model '{model_name}'",
                field_path=f"components.schemas.{model_name}.properties.{prop_name}",
                old_value=before_props[prop_name],
                new_value=None,
                impact="Client code expecting this property will fail",
                suggested_migration="Use API versioning to maintain backward compatibility",
                affected_operations=["Serialization", "Deserialization"]
            ))
        
        # Check for newly required properties
        for prop_name in after_required - before_required:
            issues.append(CompatibilityIssue(
                type="property_made_required",
                severity="breaking",
                description=f"Property '{prop_name}' is now required in model '{model_name}'",
                field_path=f"components.schemas.{model_name}.required",
                old_value=list(before_required),
                new_value=list(after_required),
                impact="Client requests without this property will fail validation",
                suggested_migration="Provide default values or implement gradual rollout",
                affected_operations=["Request validation"]
            ))
        
        # Check for property type changes
        for prop_name in set(before_props.keys()) & set(after_props.keys()):
            before_type = before_props[prop_name].get("type")
            after_type = after_props[prop_name].get("type")
            
            if before_type != after_type:
                compatibility = self.type_compatibility_matrix.get(before_type, {}).get(after_type, "breaking")
                issues.append(CompatibilityIssue(
                    type="property_type_changed",
                    severity=compatibility,
                    description=f"Property '{prop_name}' type changed from {before_type} to {after_type} in model '{model_name}'",
                    field_path=f"components.schemas.{model_name}.properties.{prop_name}.type",
                    old_value=before_type,
                    new_value=after_type,
                    impact="Client serialization/deserialization may fail",
                    suggested_migration="Implement type coercion or API versioning",
                    affected_operations=["Serialization", "Deserialization"]
                ))
        
        return issues, scripts
    
    def _build_compatibility_report(self, before_schema: Dict[str, Any], after_schema: Dict[str, Any],
                                  issues: List[CompatibilityIssue], migration_scripts: List[MigrationScript]) -> CompatibilityReport:
        """Build the final compatibility report"""
        # Count issues by severity
        breaking_count = sum(1 for issue in issues if issue.severity == "breaking")
        potentially_breaking_count = sum(1 for issue in issues if issue.severity == "potentially_breaking")
        non_breaking_count = sum(1 for issue in issues if issue.severity == "non_breaking")
        additive_count = sum(1 for issue in issues if issue.severity == "additive")
        
        # Determine overall compatibility
        if breaking_count > 0:
            overall_compatibility = "breaking_changes"
        elif potentially_breaking_count > 0:
            overall_compatibility = "potentially_incompatible"
        elif non_breaking_count > 0:
            overall_compatibility = "backward_compatible"
        else:
            overall_compatibility = "fully_compatible"
        
        # Generate risk assessment
        risk_assessment = {
            "overall_risk": "high" if breaking_count > 0 else "medium" if potentially_breaking_count > 0 else "low",
            "deployment_risk": "requires_coordinated_deployment" if breaking_count > 0 else "safe_independent_deployment",
            "rollback_complexity": "high" if breaking_count > 3 else "medium" if breaking_count > 0 else "low",
            "testing_requirements": ["integration_testing", "regression_testing"] + 
                                  (["data_migration_testing"] if any(s.script_type == "sql" for s in migration_scripts) else [])
        }
        
        # Generate recommendations
        recommendations = []
        if breaking_count > 0:
            recommendations.append("Implement API versioning to maintain backward compatibility")
            recommendations.append("Plan for coordinated deployment with all clients")
            recommendations.append("Implement comprehensive rollback procedures")
        
        if potentially_breaking_count > 0:
            recommendations.append("Conduct thorough testing with realistic data volumes")
            recommendations.append("Implement monitoring for migration success metrics")
        
        if migration_scripts:
            recommendations.append("Test all migration scripts in staging environment")
            recommendations.append("Implement migration progress monitoring")
        
        recommendations.append("Create detailed communication plan for stakeholders")
        recommendations.append("Implement feature flags for gradual rollout")
        
        return CompatibilityReport(
            schema_before=json.dumps(before_schema, indent=2)[:500] + "..." if len(json.dumps(before_schema, indent=2)) > 500 else json.dumps(before_schema, indent=2),
            schema_after=json.dumps(after_schema, indent=2)[:500] + "..." if len(json.dumps(after_schema, indent=2)) > 500 else json.dumps(after_schema, indent=2),
            analysis_date=datetime.datetime.now().isoformat(),
            overall_compatibility=overall_compatibility,
            breaking_changes_count=breaking_count,
            potentially_breaking_count=potentially_breaking_count,
            non_breaking_changes_count=non_breaking_count,
            additive_changes_count=additive_count,
            issues=issues,
            migration_scripts=migration_scripts,
            risk_assessment=risk_assessment,
            recommendations=recommendations
        )
    
    def _generate_create_table_sql(self, table_name: str, table_def: Dict[str, Any]) -> str:
        """Generate CREATE TABLE SQL statement"""
        columns = []
        for col_name, col_def in table_def.get("columns", {}).items():
            columns.append(self._generate_column_definition(col_name, col_def))
        
        return f"CREATE TABLE {table_name} (\n  " + ",\n  ".join(columns) + "\n);"
    
    def _generate_column_definition(self, col_name: str, col_def: Dict[str, Any]) -> str:
        """Generate column definition for SQL"""
        col_type = col_def.get("type", "VARCHAR(255)")
        nullable = "" if col_def.get("nullable", True) else " NOT NULL"
        default = f" DEFAULT {col_def.get('default')}" if col_def.get("default") is not None else ""
        
        return f"{col_name} {col_type}{nullable}{default}"
    
    def generate_human_readable_report(self, report: CompatibilityReport) -> str:
        """Generate human-readable compatibility report"""
        output = []
        output.append("=" * 80)
        output.append("COMPATIBILITY ANALYSIS REPORT")
        output.append("=" * 80)
        output.append(f"Analysis Date: {report.analysis_date}")
        output.append(f"Overall Compatibility: {report.overall_compatibility.upper()}")
        output.append("")
        
        # Summary
        output.append("SUMMARY")
        output.append("-" * 40)
        output.append(f"Breaking Changes: {report.breaking_changes_count}")
        output.append(f"Potentially Breaking: {report.potentially_breaking_count}")
        output.append(f"Non-Breaking Changes: {report.non_breaking_changes_count}")
        output.append(f"Additive Changes: {report.additive_changes_count}")
        output.append(f"Total Issues Found: {len(report.issues)}")
        output.append("")
        
        # Risk Assessment
        output.append("RISK ASSESSMENT")
        output.append("-" * 40)
        for key, value in report.risk_assessment.items():
            output.append(f"{key.replace('_', ' ').title()}: {value}")
        output.append("")
        
        # Issues by Severity
        issues_by_severity = {}
        for issue in report.issues:
            if issue.severity not in issues_by_severity:
                issues_by_severity[issue.severity] = []
            issues_by_severity[issue.severity].append(issue)
        
        for severity in ["breaking", "potentially_breaking", "non_breaking"]:
            if severity in issues_by_severity:
                output.append(f"{severity.upper().replace('_', ' ')} ISSUES")
                output.append("-" * 40)
                for issue in issues_by_severity[severity]:
                    output.append(f"• {issue.description}")
                    output.append(f"  Field: {issue.field_path}")
                    output.append(f"  Impact: {issue.impact}")
                    output.append(f"  Migration: {issue.suggested_migration}")
                    if issue.affected_operations:
                        output.append(f"  Affected Operations: {', '.join(issue.affected_operations)}")
                    output.append("")
        
        # Migration Scripts
        if report.migration_scripts:
            output.append("SUGGESTED MIGRATION SCRIPTS")
            output.append("-" * 40)
            for i, script in enumerate(report.migration_scripts, 1):
                output.append(f"{i}. {script.description}")
                output.append(f"   Type: {script.script_type}")
                output.append("   Script:")
                for line in script.script_content.split('\n'):
                    output.append(f"     {line}")
                output.append("")
        
        # Recommendations
        output.append("RECOMMENDATIONS")
        output.append("-" * 40)
        for i, rec in enumerate(report.recommendations, 1):
            output.append(f"{i}. {rec}")
        output.append("")
        
        return "\n".join(output)


def main():
    """Main function with command line interface"""
    parser = argparse.ArgumentParser(description="Analyze schema and API compatibility between versions")
    parser.add_argument("--before", required=True, help="Before schema file (JSON)")
    parser.add_argument("--after", required=True, help="After schema file (JSON)")
    parser.add_argument("--type", choices=["database", "api"], default="database", help="Schema type to analyze")
    parser.add_argument("--output", "-o", help="Output file for compatibility report (JSON)")
    parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both", help="Output format")
    
    args = parser.parse_args()
    
    try:
        # Load schemas
        with open(args.before, 'r') as f:
            before_schema = json.load(f)
        
        with open(args.after, 'r') as f:
            after_schema = json.load(f)
        
        # Analyze compatibility
        checker = SchemaCompatibilityChecker()
        
        if args.type == "database":
            report = checker.analyze_database_schema(before_schema, after_schema)
        else:  # api
            report = checker.analyze_api_schema(before_schema, after_schema)
        
        # Output results
        if args.format in ["json", "both"]:
            report_dict = asdict(report)
            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(report_dict, f, indent=2)
                print(f"Compatibility report saved to {args.output}")
            else:
                print(json.dumps(report_dict, indent=2))
        
        if args.format in ["text", "both"]:
            human_report = checker.generate_human_readable_report(report)
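            if not args.output:
                text_output = None
            elif args.output.endswith('.json'):
                text_output = args.output[:-len('.json')] + '.txt'
            elif args.format == "both":
                # Avoid overwriting the JSON report already written to args.output
                text_output = args.output + '.txt'
            else:
                text_output = args.output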
            if text_output:
                with open(text_output, 'w') as f:
                    f.write(human_report)
                print(f"Human-readable report saved to {text_output}")
            else:
                print("\n" + "="*80)
                print("HUMAN-READABLE COMPATIBILITY REPORT")
                print("="*80)
                print(human_report)
        
        # Exit code reflects compatibility: 2 = breaking, 1 = potentially breaking, 0 = clean.
        # Note that the error handlers below also return 1.
        if report.breaking_changes_count > 0:
            return 2  # Breaking changes found
        elif report.potentially_breaking_count > 0:
            return 1  # Potentially breaking changes found
        else:
            return 0  # No compatibility issues
            
    except FileNotFoundError as e:
        print(f"Error: File not found: {e}", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
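
To make the column-level rules concrete, here is a minimal standalone sketch that reproduces only the removed-column and required-column checks from `_analyze_table_changes` on two small in-memory table definitions. It does not import the script above. The function name `diff_columns` and the sample `users` data are hypothetical, made up for illustration, and the dict layout (`columns` mapping to definitions with `nullable` and `default`) is assumed from the keys the checker reads.

```python
from typing import Any, Dict, List, Tuple


def diff_columns(before: Dict[str, Any], after: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (issue_type, column_name) pairs for removed and newly required columns."""
    before_cols = before.get("columns", {})
    after_cols = after.get("columns", {})
    findings: List[Tuple[str, str]] = []

    # A column present before but missing after breaks statements that reference it.
    for name in before_cols:
        if name not in after_cols:
            findings.append(("column_removed", name))

    # A new NOT NULL column without a default breaks existing INSERT statements.
    for name, col_def in after_cols.items():
        if name in before_cols:
            continue
        required = not col_def.get("nullable", True) and col_def.get("default") is None
        if required:
            findings.append(("required_column_added", name))

    return findings


# Hypothetical sample data, not taken from the skill's assets.
users_before = {"columns": {"id": {"type": "integer", "nullable": False},
                            "nickname": {"type": "varchar", "length": 50}}}
users_after = {"columns": {"id": {"type": "integer", "nullable": False},
                           "email": {"type": "varchar", "nullable": False},
                           "bio": {"type": "text"}}}

findings = diff_columns(users_before, users_after)
for issue_type, column in findings:
    print(f"{issue_type}: {column}")
```

In this sketch `bio` is not reported because it defaults to nullable, which mirrors how the checker treats a new optional column as safe to add. The real script additionally attaches impact text, suggested migrations, and rollback SQL to each finding.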

### scripts/migration_planner.py

```python
#!/usr/bin/env python3
"""
Migration Planner - Generate comprehensive migration plans with risk assessment

This tool analyzes migration specifications and generates detailed, phased migration plans
including pre-migration checklists, validation gates, rollback triggers, timeline estimates,
and risk matrices.

Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""

import json
import argparse
import sys
import datetime
import hashlib
import math
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum


class MigrationType(Enum):
    """Migration type enumeration"""
    DATABASE = "database"
    SERVICE = "service"
    INFRASTRUCTURE = "infrastructure"
    DATA = "data"
    API = "api"


class MigrationComplexity(Enum):
    """Migration complexity levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class RiskLevel(Enum):
    """Risk assessment levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class MigrationConstraint:
    """Migration constraint definition"""
    type: str
    description: str
    impact: str
    mitigation: str


@dataclass
class MigrationPhase:
    """Individual migration phase"""
    name: str
    description: str
    duration_hours: int
    dependencies: List[str]
    validation_criteria: List[str]
    rollback_triggers: List[str]
    tasks: List[str]
    risk_level: str
    resources_required: List[str]


@dataclass
class RiskItem:
    """Individual risk assessment item"""
    category: str
    description: str
    probability: str  # low, medium, high
    impact: str  # low, medium, high, critical
    severity: str  # low, medium, high, critical
    mitigation: str
    owner: str


@dataclass
class MigrationPlan:
    """Complete migration plan structure"""
    migration_id: str
    source_system: str
    target_system: str
    migration_type: str
    complexity: str
    estimated_duration_hours: int
    phases: List[MigrationPhase]
    risks: List[RiskItem]
    success_criteria: List[str]
    rollback_plan: Dict[str, Any]
    stakeholders: List[str]
    created_at: str


class MigrationPlanner:
    """Main migration planner class"""
    
    def __init__(self):
        self.migration_patterns = self._load_migration_patterns()
        self.risk_templates = self._load_risk_templates()
        
    def _load_migration_patterns(self) -> Dict[str, Any]:
        """Load predefined migration patterns"""
        return {
            "database": {
                "schema_change": {
                    "phases": ["preparation", "expand", "migrate", "contract", "cleanup"],
                    "base_duration": 24,
                    "complexity_multiplier": {"low": 1.0, "medium": 1.5, "high": 2.5, "critical": 4.0}
                },
                "data_migration": {
                    "phases": ["assessment", "setup", "bulk_copy", "delta_sync", "validation", "cutover"],
                    "base_duration": 48,
                    "complexity_multiplier": {"low": 1.2, "medium": 2.0, "high": 3.0, "critical": 5.0}
                }
            },
            "service": {
                "strangler_fig": {
                    "phases": ["intercept", "implement", "redirect", "validate", "retire"],
                    "base_duration": 168,  # 1 week
                    "complexity_multiplier": {"low": 0.8, "medium": 1.0, "high": 1.8, "critical": 3.0}
                },
                "parallel_run": {
                    "phases": ["setup", "deploy", "shadow", "compare", "cutover", "cleanup"],
                    "base_duration": 72,
                    "complexity_multiplier": {"low": 1.0, "medium": 1.3, "high": 2.0, "critical": 3.5}
                }
            },
            "infrastructure": {
                "cloud_migration": {
                    "phases": ["assessment", "design", "pilot", "migration", "optimization", "decommission"],
                    "base_duration": 720,  # 30 days
                    "complexity_multiplier": {"low": 0.6, "medium": 1.0, "high": 1.5, "critical": 2.5}
                },
                "on_prem_to_cloud": {
                    "phases": ["discovery", "planning", "pilot", "migration", "validation", "cutover"],
                    "base_duration": 480,  # 20 days
                    "complexity_multiplier": {"low": 0.8, "medium": 1.2, "high": 2.0, "critical": 3.0}
                }
            }
        }
    
    def _load_risk_templates(self) -> Dict[str, List[RiskItem]]:
        """Load risk templates for different migration types"""
        return {
            "database": [
                RiskItem("technical", "Data corruption during migration", "low", "critical", "high",
                        "Implement comprehensive backup and validation procedures", "DBA Team"),
                RiskItem("technical", "Extended downtime due to migration complexity", "medium", "high", "high",
                        "Use blue-green deployment and phased migration approach", "DevOps Team"),
                RiskItem("business", "Business process disruption", "medium", "high", "high",
                        "Communicate timeline and provide alternate workflows", "Business Owner"),
                RiskItem("operational", "Insufficient rollback testing", "high", "critical", "critical",
                        "Execute full rollback procedures in staging environment", "QA Team")
            ],
            "service": [
                RiskItem("technical", "Service compatibility issues", "medium", "high", "high",
                        "Implement comprehensive integration testing", "Development Team"),
                RiskItem("technical", "Performance degradation", "medium", "medium", "medium",
                        "Conduct load testing and performance benchmarking", "DevOps Team"),
                RiskItem("business", "Feature parity gaps", "high", "high", "high",
                        "Document feature mapping and acceptance criteria", "Product Owner"),
                RiskItem("operational", "Monitoring gap during transition", "medium", "medium", "medium",
                        "Set up dual monitoring and alerting systems", "SRE Team")
            ],
            "infrastructure": [
                RiskItem("technical", "Network connectivity issues", "medium", "critical", "high",
                        "Implement redundant network paths and monitoring", "Network Team"),
                RiskItem("technical", "Security configuration drift", "high", "high", "high",
                        "Automated security scanning and compliance checks", "Security Team"),
                RiskItem("business", "Cost overrun during transition", "high", "medium", "medium",
                        "Implement cost monitoring and budget alerts", "Finance Team"),
                RiskItem("operational", "Team knowledge gaps", "high", "medium", "medium",
                        "Provide training and create detailed documentation", "Platform Team")
            ]
        }
    
    def _calculate_complexity(self, spec: Dict[str, Any]) -> str:
        """Calculate migration complexity based on specification"""
        complexity_score = 0
        
        # Data volume complexity
        data_volume = spec.get("constraints", {}).get("data_volume_gb", 0)
        if data_volume > 10000:
            complexity_score += 3
        elif data_volume > 1000:
            complexity_score += 2
        elif data_volume > 100:
            complexity_score += 1
        
        # System dependencies
        dependencies = len(spec.get("constraints", {}).get("dependencies", []))
        if dependencies > 10:
            complexity_score += 3
        elif dependencies > 5:
            complexity_score += 2
        elif dependencies > 2:
            complexity_score += 1
        
        # Downtime constraints
        max_downtime = spec.get("constraints", {}).get("max_downtime_minutes", 480)
        if max_downtime < 60:
            complexity_score += 3
        elif max_downtime < 240:
            complexity_score += 2
        elif max_downtime < 480:
            complexity_score += 1
        
        # Special requirements
        special_reqs = spec.get("constraints", {}).get("special_requirements", [])
        complexity_score += len(special_reqs)
        
        if complexity_score >= 8:
            return "critical"
        elif complexity_score >= 5:
            return "high"
        elif complexity_score >= 3:
            return "medium"
        else:
            return "low"
    
    def _estimate_duration(self, migration_type: str, migration_pattern: str, complexity: str) -> int:
        """Estimate migration duration based on type, pattern, and complexity"""
        pattern_info = self.migration_patterns.get(migration_type, {}).get(migration_pattern, {})
        base_duration = pattern_info.get("base_duration", 48)
        multiplier = pattern_info.get("complexity_multiplier", {}).get(complexity, 1.5)
        
        return int(base_duration * multiplier)
    
    def _generate_phases(self, spec: Dict[str, Any]) -> List[MigrationPhase]:
        """Generate migration phases based on specification"""
        migration_type = spec.get("type")
        migration_pattern = spec.get("pattern", "")
        complexity = self._calculate_complexity(spec)
        
        pattern_info = self.migration_patterns.get(migration_type, {})
        if migration_pattern in pattern_info:
            phase_names = pattern_info[migration_pattern]["phases"]
        else:
            # Default phases based on migration type
            phase_names = {
                "database": ["preparation", "migration", "validation", "cutover"],
                "service": ["preparation", "deployment", "testing", "cutover"],
                "infrastructure": ["assessment", "preparation", "migration", "validation"]
            }.get(migration_type, ["preparation", "execution", "validation", "cleanup"])
        
        phases = []
        total_duration = self._estimate_duration(migration_type, migration_pattern, complexity)
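        # Floor division: leftover hours are dropped, so the summed phase
        # durations can fall slightly below the estimate
        phase_duration = total_duration // len(phase_names)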
        
        for i, phase_name in enumerate(phase_names):
            phase = self._create_phase(phase_name, phase_duration, complexity, i, phase_names)
            phases.append(phase)
        
        return phases
    
    def _create_phase(self, phase_name: str, duration: int, complexity: str, 
                     phase_index: int, all_phases: List[str]) -> MigrationPhase:
        """Create a detailed migration phase"""
        phase_templates = {
            "preparation": {
                "description": "Prepare systems and teams for migration",
                "tasks": [
                    "Backup source system",
                    "Set up monitoring and alerting",
                    "Prepare rollback procedures",
                    "Communicate migration timeline",
                    "Validate prerequisites"
                ],
                "validation_criteria": [
                    "All backups completed successfully",
                    "Monitoring systems operational",
                    "Team members briefed and ready",
                    "Rollback procedures tested"
                ],
                "risk_level": "medium"
            },
            "assessment": {
                "description": "Assess current state and migration requirements",
                "tasks": [
                    "Inventory existing systems and dependencies",
                    "Analyze data volumes and complexity",
                    "Identify integration points",
                    "Document current architecture",
                    "Create migration mapping"
                ],
                "validation_criteria": [
                    "Complete system inventory documented",
                    "Dependencies mapped and validated",
                    "Migration scope clearly defined",
                    "Resource requirements identified"
                ],
                "risk_level": "low"
            },
            "migration": {
                "description": "Execute core migration processes",
                "tasks": [
                    "Begin data/service migration",
                    "Monitor migration progress",
                    "Validate data consistency",
                    "Handle migration errors",
                    "Update configuration"
                ],
                "validation_criteria": [
                    "Migration progress within expected parameters",
                    "Data consistency checks passing",
                    "Error rates within acceptable limits",
                    "Performance metrics stable"
                ],
                "risk_level": "high"
            },
            "validation": {
                "description": "Validate migration success and system health",
                "tasks": [
                    "Execute comprehensive testing",
                    "Validate business processes",
                    "Check system performance",
                    "Verify data integrity",
                    "Confirm security controls"
                ],
                "validation_criteria": [
                    "All critical tests passing",
                    "Performance within acceptable range",
                    "Security controls functioning",
                    "Business processes operational"
                ],
                "risk_level": "medium"
            },
            "cutover": {
                "description": "Switch production traffic to new system",
                "tasks": [
                    "Update DNS/load balancer configuration",
                    "Redirect production traffic",
                    "Monitor system performance",
                    "Validate end-user experience",
                    "Confirm business operations"
                ],
                "validation_criteria": [
                    "Traffic successfully redirected",
                    "System performance stable",
                    "User experience satisfactory",
                    "Business operations normal"
                ],
                "risk_level": "critical"
            }
        }
        
        template = phase_templates.get(phase_name, {
            "description": f"Execute {phase_name} phase",
            "tasks": [f"Complete {phase_name} activities"],
            "validation_criteria": [f"{phase_name.title()} phase completed successfully"],
            "risk_level": "medium"
        })
        
        dependencies = []
        if phase_index > 0:
            dependencies.append(all_phases[phase_index - 1])
        
        rollback_triggers = [
            "Critical system failure",
            "Data corruption detected",
            "Performance degradation > 50%",
            "Business process failure"
        ]
        
        resources_required = [
            "Technical team availability",
            "System access and permissions",
            "Monitoring and alerting systems",
            "Communication channels"
        ]
        
        return MigrationPhase(
            name=phase_name,
            description=template["description"],
            duration_hours=duration,
            dependencies=dependencies,
            validation_criteria=template["validation_criteria"],
            rollback_triggers=rollback_triggers,
            tasks=template["tasks"],
            risk_level=template["risk_level"],
            resources_required=resources_required
        )
    
    def _assess_risks(self, spec: Dict[str, Any]) -> List[RiskItem]:
        """Generate risk assessment for migration"""
        migration_type = spec.get("type")
        base_risks = self.risk_templates.get(migration_type, [])
        
        # Add specification-specific risks
        additional_risks = []
        constraints = spec.get("constraints", {})
        
        if constraints.get("max_downtime_minutes", 480) < 60:
            additional_risks.append(
                RiskItem("business", "Near-zero downtime requirement (under 60 minutes) increases complexity", "high", "medium", "high",
                        "Implement blue-green deployment or rolling update strategy", "DevOps Team")
            )
        
        if constraints.get("data_volume_gb", 0) > 5000:
            additional_risks.append(
                RiskItem("technical", "Large data volumes may cause extended migration time", "high", "medium", "medium",
                        "Implement parallel processing and progress monitoring", "Data Team")
            )
        
        compliance_reqs = constraints.get("compliance_requirements", [])
        if compliance_reqs:
            additional_risks.append(
                RiskItem("compliance", "Regulatory compliance requirements", "medium", "high", "high",
                        "Ensure all compliance checks are integrated into migration process", "Compliance Team")
            )
        
        return base_risks + additional_risks
    
    def _generate_rollback_plan(self, phases: List[MigrationPhase]) -> Dict[str, Any]:
        """Generate comprehensive rollback plan"""
        rollback_phases = []
        
        for phase in reversed(phases):
            rollback_phase = {
                "phase": phase.name,
                "rollback_actions": [
                    f"Revert {phase.name} changes",
                    f"Restore pre-{phase.name} state",
                    f"Validate {phase.name} rollback success"
                ],
                "validation_criteria": [
                    f"System restored to pre-{phase.name} state",
                    f"All {phase.name} changes successfully reverted",
                    "System functionality confirmed"
                ],
                "estimated_time_minutes": phase.duration_hours * 15  # 25% of original phase time
            }
            rollback_phases.append(rollback_phase)
        
        return {
            "rollback_phases": rollback_phases,
            "rollback_triggers": [
                "Critical system failure",
                "Data corruption detected",
                "Migration timeline exceeded by > 50%",
                "Business-critical functionality unavailable",
                "Security breach detected",
                "Stakeholder decision to abort"
            ],
            "rollback_decision_matrix": {
                "low_severity": "Continue with monitoring",
                "medium_severity": "Assess and decide within 15 minutes",
                "high_severity": "Immediate rollback initiation",
                "critical_severity": "Emergency rollback - all hands"
            },
            "rollback_contacts": [
                "Migration Lead",
                "Technical Lead", 
                "Business Owner",
                "On-call Engineer"
            ]
        }
    
    def generate_plan(self, spec: Dict[str, Any]) -> MigrationPlan:
        """Generate complete migration plan from specification"""
        migration_id = hashlib.md5(json.dumps(spec, sort_keys=True).encode()).hexdigest()[:12]
        complexity = self._calculate_complexity(spec)
        phases = self._generate_phases(spec)
        risks = self._assess_risks(spec)
        total_duration = sum(phase.duration_hours for phase in phases)
        rollback_plan = self._generate_rollback_plan(phases)
        
        success_criteria = [
            "All data successfully migrated with 100% integrity",
            "System performance meets or exceeds baseline",
            "All business processes functioning normally",
            "No critical security vulnerabilities introduced",
            "Stakeholder acceptance criteria met",
            "Documentation and runbooks updated"
        ]
        
        stakeholders = [
            "Business Owner",
            "Technical Lead",
            "DevOps Team",
            "QA Team", 
            "Security Team",
            "End Users"
        ]
        
        return MigrationPlan(
            migration_id=migration_id,
            source_system=spec.get("source", "Unknown"),
            target_system=spec.get("target", "Unknown"),
            migration_type=spec.get("type", "Unknown"),
            complexity=complexity,
            estimated_duration_hours=total_duration,
            phases=phases,
            risks=risks,
            success_criteria=success_criteria,
            rollback_plan=rollback_plan,
            stakeholders=stakeholders,
            created_at=datetime.datetime.now().isoformat()
        )
    
    def generate_human_readable_plan(self, plan: MigrationPlan) -> str:
        """Generate human-readable migration plan"""
        output = []
        output.append("=" * 80)
        output.append(f"MIGRATION PLAN: {plan.migration_id}")
        output.append("=" * 80)
        output.append(f"Source System: {plan.source_system}")
        output.append(f"Target System: {plan.target_system}")
        output.append(f"Migration Type: {plan.migration_type.upper()}")
        output.append(f"Complexity Level: {plan.complexity.upper()}")
        output.append(f"Estimated Duration: {plan.estimated_duration_hours} hours ({plan.estimated_duration_hours/24:.1f} days)")
        output.append(f"Created: {plan.created_at}")
        output.append("")
        
        # Phases
        output.append("MIGRATION PHASES")
        output.append("-" * 40)
        for i, phase in enumerate(plan.phases, 1):
            output.append(f"{i}. {phase.name.upper()} ({phase.duration_hours}h)")
            output.append(f"   Description: {phase.description}")
            output.append(f"   Risk Level: {phase.risk_level.upper()}")
            if phase.dependencies:
                output.append(f"   Dependencies: {', '.join(phase.dependencies)}")
            output.append("   Tasks:")
            for task in phase.tasks:
                output.append(f"     • {task}")
            output.append("   Success Criteria:")
            for criteria in phase.validation_criteria:
                output.append(f"     ✓ {criteria}")
            output.append("")
        
        # Risk Assessment
        output.append("RISK ASSESSMENT")
        output.append("-" * 40)
        risk_by_severity = {}
        for risk in plan.risks:
            if risk.severity not in risk_by_severity:
                risk_by_severity[risk.severity] = []
            risk_by_severity[risk.severity].append(risk)
        
        for severity in ["critical", "high", "medium", "low"]:
            if severity in risk_by_severity:
                output.append(f"{severity.upper()} SEVERITY RISKS:")
                for risk in risk_by_severity[severity]:
                    output.append(f"  • {risk.description}")
                    output.append(f"    Category: {risk.category}")
                    output.append(f"    Probability: {risk.probability} | Impact: {risk.impact}")
                    output.append(f"    Mitigation: {risk.mitigation}")
                    output.append(f"    Owner: {risk.owner}")
                    output.append("")
        
        # Rollback Plan
        output.append("ROLLBACK STRATEGY")
        output.append("-" * 40)
        output.append("Rollback Triggers:")
        for trigger in plan.rollback_plan["rollback_triggers"]:
            output.append(f"  • {trigger}")
        output.append("")
        
        output.append("Rollback Phases:")
        for rb_phase in plan.rollback_plan["rollback_phases"]:
            output.append(f"  {rb_phase['phase'].upper()}:")
            for action in rb_phase["rollback_actions"]:
                output.append(f"    - {action}")
            output.append(f"    Estimated Time: {rb_phase['estimated_time_minutes']} minutes")
            output.append("")
        
        # Success Criteria
        output.append("SUCCESS CRITERIA")
        output.append("-" * 40)
        for criteria in plan.success_criteria:
            output.append(f"✓ {criteria}")
        output.append("")
        
        # Stakeholders
        output.append("STAKEHOLDERS")
        output.append("-" * 40)
        for stakeholder in plan.stakeholders:
            output.append(f"• {stakeholder}")
        output.append("")
        
        return "\n".join(output)


def main():
    """Main function with command line interface"""
    parser = argparse.ArgumentParser(description="Generate comprehensive migration plans")
    parser.add_argument("--input", "-i", required=True, help="Input migration specification file (JSON)")
    parser.add_argument("--output", "-o", help="Output file for migration plan (JSON)")
    parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both",
                       help="Output format")
    parser.add_argument("--validate", action="store_true", help="Validate migration specification only")
    
    args = parser.parse_args()
    
    try:
        # Load migration specification
        with open(args.input, 'r', encoding='utf-8') as f:
            spec = json.load(f)
        
        # Validate required fields
        required_fields = ["type", "source", "target"]
        for field in required_fields:
            if field not in spec:
                print(f"Error: Missing required field '{field}' in specification", file=sys.stderr)
                return 1
        
        if args.validate:
            print("Migration specification is valid")
            return 0
        
        # Generate migration plan
        planner = MigrationPlanner()
        plan = planner.generate_plan(spec)
        
        # Output results
        if args.format in ["json", "both"]:
            plan_dict = asdict(plan)
            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(plan_dict, f, indent=2)
                print(f"Migration plan saved to {args.output}")
            else:
                print(json.dumps(plan_dict, indent=2))
        
        if args.format in ["text", "both"]:
            human_plan = planner.generate_human_readable_plan(plan)
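            # Swap a trailing ".json" for ".txt"; with --format both and any other
            # extension, append ".txt" so the text plan does not overwrite the JSON file
            text_output = args.output
            if text_output and text_output.endswith('.json'):
                text_output = text_output[:-5] + '.txt'
            elif text_output and args.format == "both":
                text_output += '.txt'
            if text_output:
                # The text plan contains non-ASCII bullets, so write UTF-8 explicitly
                with open(text_output, 'w', encoding='utf-8') as f: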
                    f.write(human_plan)
                print(f"Human-readable plan saved to {text_output}")
            else:
                print("\n" + "="*80)
                print("HUMAN-READABLE MIGRATION PLAN")
                print("="*80)
                print(human_plan)
        
    except FileNotFoundError:
        print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
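
The duration arithmetic in the planner can be checked by hand. The sketch below restates the relevant steps from `_estimate_duration`, `_generate_phases`, and `_generate_rollback_plan` as a standalone function for the `database` / `schema_change` pattern. The name `trace_durations` and the keys of the returned dictionary are illustrative and not part of the script.

```python
from typing import Dict

# Values copied from the "schema_change" entry in _load_migration_patterns.
SCHEMA_CHANGE = {
    "phases": ["preparation", "expand", "migrate", "contract", "cleanup"],
    "base_duration": 24,
    "complexity_multiplier": {"low": 1.0, "medium": 1.5, "high": 2.5, "critical": 4.0},
}


def trace_durations(pattern: Dict, complexity: str) -> Dict[str, int]:
    """Hypothetical helper that reproduces the planner's duration arithmetic."""
    # _estimate_duration: base hours scaled by the complexity multiplier
    estimate = int(pattern["base_duration"] * pattern["complexity_multiplier"][complexity])
    # _generate_phases: the estimate is split evenly with floor division
    per_phase = estimate // len(pattern["phases"])
    # generate_plan: the reported total is the sum of the phase durations
    plan_total = per_phase * len(pattern["phases"])
    # _generate_rollback_plan: 25% of a phase, expressed in minutes (hours * 15)
    rollback_minutes = per_phase * 15
    return {
        "estimate_hours": estimate,
        "per_phase_hours": per_phase,
        "plan_total_hours": plan_total,
        "rollback_minutes_per_phase": rollback_minutes,
    }


print(trace_durations(SCHEMA_CHANGE, "medium"))
print(trace_durations(SCHEMA_CHANGE, "high"))
```

For medium complexity the estimate is 36 hours, but five phases of 7 hours sum to 35, so the total reported in the plan is one hour below the estimate. For high complexity the 60-hour estimate divides evenly into 12-hour phases.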



---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### README.md

```markdown
# Migration Architect

**Tier:** POWERFUL  
**Category:** Engineering - Migration Strategy  
**Purpose:** Zero-downtime migration planning, compatibility validation, and rollback strategy generation

## Overview

The Migration Architect skill provides comprehensive tools and methodologies for planning, executing, and validating complex system migrations with minimal business impact. This skill combines proven migration patterns with automated planning tools to ensure successful transitions between systems, databases, and infrastructure.

## Components

### Core Scripts

1. **migration_planner.py** - Automated migration plan generation
2. **compatibility_checker.py** - Schema and API compatibility analysis  
3. **rollback_generator.py** - Comprehensive rollback procedure generation

### Reference Documentation

- **migration_patterns_catalog.md** - Detailed catalog of proven migration patterns
- **zero_downtime_techniques.md** - Comprehensive zero-downtime migration techniques
- **data_reconciliation_strategies.md** - Advanced data consistency and reconciliation strategies

### Sample Assets

- **sample_database_migration.json** - Example database migration specification
- **sample_service_migration.json** - Example service migration specification
- **database_schema_before.json** - Sample "before" database schema
- **database_schema_after.json** - Sample "after" database schema

## Quick Start

### 1. Generate a Migration Plan

```bash
python3 scripts/migration_planner.py \
  --input assets/sample_database_migration.json \
  --output migration_plan.json \
  --format both
```

**Input:** Migration specification with source, target, constraints, and requirements  
**Output:** Detailed phased migration plan with risk assessment, timeline, and validation gates

### 2. Check Compatibility

```bash
python3 scripts/compatibility_checker.py \
  --before assets/database_schema_before.json \
  --after assets/database_schema_after.json \
  --type database \
  --output compatibility_report.json \
  --format both
```

**Input:** Before and after schema definitions  
**Output:** Compatibility report with breaking changes, migration scripts, and recommendations

### 3. Generate Rollback Procedures

```bash
python3 scripts/rollback_generator.py \
  --input migration_plan.json \
  --output rollback_runbook.json \
  --format both
```

**Input:** Migration plan from step 1  
**Output:** Comprehensive rollback runbook with procedures, triggers, and communication templates

## Script Details

### Migration Planner (`migration_planner.py`)

Generates comprehensive migration plans with:

- **Phased approach** with dependencies and validation gates
- **Risk assessment** with mitigation strategies
- **Timeline estimation** based on complexity and constraints
- **Rollback triggers** and success criteria
- **Stakeholder list** for communication planning

**Usage:**
```bash
python3 scripts/migration_planner.py [OPTIONS]

Options:
  --input, -i     Input migration specification file (JSON) [required]
  --output, -o    Output file for migration plan (JSON)
  --format, -f    Output format: json, text, both (default: both)
  --validate      Validate migration specification only
```

**Input Format:**
```json
{
  "type": "database|service|infrastructure",
  "pattern": "schema_change|data_migration|strangler_fig|parallel_run|cloud_migration|on_prem_to_cloud",
  "source": "Source system description",
  "target": "Target system description", 
  "constraints": {
    "max_downtime_minutes": 30,
    "data_volume_gb": 2500,
    "dependencies": ["service1", "service2"],
    "compliance_requirements": ["GDPR", "SOX"]
  }
}
```
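
To see how the `constraints` block drives the plan, the following sketch restates the scoring rules from `_calculate_complexity` as a standalone function. `score_complexity` is a hypothetical name. `special_requirements` is an optional key that the planner reads even though the format above does not list it.

```python
from typing import Any, Dict


def score_complexity(spec: Dict[str, Any]) -> str:
    """Illustrative restatement of the planner's complexity scoring."""
    constraints = spec.get("constraints", {})
    score = 0

    # Data volume: more than 10 TB, 1 TB, or 100 GB adds 3, 2, or 1 points
    volume = constraints.get("data_volume_gb", 0)
    score += 3 if volume > 10000 else 2 if volume > 1000 else 1 if volume > 100 else 0

    # Dependencies: more than 10, 5, or 2 adds 3, 2, or 1 points
    deps = len(constraints.get("dependencies", []))
    score += 3 if deps > 10 else 2 if deps > 5 else 1 if deps > 2 else 0

    # Downtime window: under 60, 240, or 480 minutes adds 3, 2, or 1 points
    downtime = constraints.get("max_downtime_minutes", 480)
    score += 3 if downtime < 60 else 2 if downtime < 240 else 1 if downtime < 480 else 0

    # Each special requirement adds one point
    score += len(constraints.get("special_requirements", []))

    if score >= 8:
        return "critical"
    if score >= 5:
        return "high"
    if score >= 3:
        return "medium"
    return "low"


example_spec = {
    "type": "database",
    "pattern": "schema_change",
    "source": "Source system description",
    "target": "Target system description",
    "constraints": {
        "max_downtime_minutes": 30,
        "data_volume_gb": 2500,
        "dependencies": ["service1", "service2"],
        "compliance_requirements": ["GDPR", "SOX"],
    },
}

# 3 (downtime) + 2 (volume) + 0 (two dependencies) = 5
print(score_complexity(example_spec))
```

Note that `compliance_requirements` does not affect the score; in the planner it only adds a compliance entry to the risk list.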

### Compatibility Checker (`compatibility_checker.py`)

Analyzes compatibility between schema versions:

- **Breaking change detection** (removed fields, type changes, constraint additions)
- **Data migration requirements** identification
- **Suggested migration scripts** generation
- **Risk assessment** for each change

**Usage:**
```bash
python3 scripts/compatibility_checker.py [OPTIONS]

Options:
  --before        Before schema file (JSON) [required]
  --after         After schema file (JSON) [required]
  --type          Schema type: database, api (default: database)
  --output, -o    Output file for compatibility report (JSON)
  --format, -f    Output format: json, text, both (default: both)
```

**Exit Codes:**
- `0`: No compatibility issues
- `1`: Potentially breaking changes found
- `2`: Breaking changes found
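
A caller can turn these exit codes into a deployment decision. The sketch below is one possible wrapper, not part of the skill: `interpret_exit_code`, `should_block_deploy`, and `run_checker` are hypothetical names, and the blocking policy is an assumption. Only the three exit codes and the command-line flags come from the documentation above.

```python
import subprocess
import sys
from typing import List

# Exit-code meanings as documented for compatibility_checker.py.
EXIT_MEANINGS = {
    0: "compatible",
    1: "potentially_breaking",
    2: "breaking",
}


def interpret_exit_code(code: int) -> str:
    """Map a checker exit code to a status label; anything else is 'unknown'."""
    return EXIT_MEANINGS.get(code, "unknown")


def should_block_deploy(code: int, allow_potentially_breaking: bool = True) -> bool:
    """Assumed policy: block on breaking or unknown, optionally on potentially breaking."""
    status = interpret_exit_code(code)
    if status == "compatible":
        return False
    if status == "potentially_breaking":
        return not allow_potentially_breaking
    return True


def run_checker(before: str, after: str) -> int:
    """Run the checker with the documented flags and return its exit code."""
    cmd: List[str] = [
        sys.executable, "scripts/compatibility_checker.py",
        "--before", before,
        "--after", after,
        "--type", "database",
    ]
    return subprocess.run(cmd).returncode
```

If the checker parses its arguments with `argparse`, as the planner does, a usage error also exits with status 2, so a status of 2 is worth confirming against the report before treating it as a breaking change.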

### Rollback Generator (`rollback_generator.py`)

Creates comprehensive rollback procedures:

- **Phase-by-phase rollback** steps
- **Automated trigger conditions** for rollback
- **Data recovery procedures** 
- **Communication templates** for different audiences
- **Validation checklists** for rollback success

**Usage:**
```bash
python3 scripts/rollback_generator.py [OPTIONS]

Options:
  --input, -i     Input migration plan file (JSON) [required]
  --output, -o    Output file for rollback runbook (JSON)
  --format, -f    Output format: json, text, both (default: both)
```

## Migration Patterns Supported

### Database Migrations

- **Expand-Contract Pattern** - Zero-downtime schema evolution
- **Parallel Schema Pattern** - Side-by-side schema migration
- **Event Sourcing Migration** - Event-driven data migration
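
As a minimal illustration of the expand-contract pattern, the sketch below renames a column in an in-memory SQLite database. The table and column names are hypothetical, and a production migration would keep a dual-write period between the migrate and contract steps; the example only shows the ordering of the steps.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("Ada",), ("Grace",)])

# Expand: add the new column next to the old one; existing readers keep working.
conn.execute("ALTER TABLE users ADD COLUMN full_name TEXT")

# Migrate: backfill the new column from the old one.
conn.execute("UPDATE users SET full_name = name WHERE full_name IS NULL")

# Contract: once nothing reads `name`, rebuild the table without it.
# A table rebuild is used because older SQLite versions lack DROP COLUMN.
conn.executescript("""
    CREATE TABLE users_new (id INTEGER PRIMARY KEY, full_name TEXT);
    INSERT INTO users_new (id, full_name) SELECT id, full_name FROM users;
    DROP TABLE users;
    ALTER TABLE users_new RENAME TO users;
""")

columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
rows = conn.execute("SELECT id, full_name FROM users ORDER BY id").fetchall()
print(columns)
print(rows)
```

Each step leaves the schema readable by the previous application version, which is what allows a rollback at any point before the contract step.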

### Service Migrations

- **Strangler Fig Pattern** - Gradual legacy system replacement
- **Parallel Run Pattern** - Risk mitigation through dual execution
- **Blue-Green Deployment** - Zero-downtime service updates
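
The parallel run pattern can be sketched as a wrapper that always serves the legacy result and records disagreements from the new implementation. The function names and the pricing example are hypothetical; this is a sketch of the idea, not code shipped with the skill.

```python
from typing import Any, Callable, List, Tuple

Mismatch = Tuple[Any, Any, Any]  # (request, legacy result, candidate result)


def parallel_run(legacy: Callable[[Any], Any], candidate: Callable[[Any], Any],
                 request: Any, mismatches: List[Mismatch]) -> Any:
    """Serve the legacy result and record any disagreement from the candidate."""
    expected = legacy(request)
    try:
        actual = candidate(request)
    except Exception as exc:
        # A failing candidate must never affect the caller; record it instead
        actual = exc
    if actual != expected:
        mismatches.append((request, expected, actual))
    return expected


def legacy_price(quantity: int) -> int:
    return quantity * 10


def new_price(quantity: int) -> int:
    # Hypothetical divergence: the new service applies an unplanned bulk discount
    return quantity * 10 if quantity < 100 else quantity * 9


log: List[Mismatch] = []
results = [parallel_run(legacy_price, new_price, q, log) for q in (1, 50, 200)]
print(results)
print(log)
```

Callers only ever see the legacy answers, while the mismatch log shows where the new service diverges before any traffic is cut over.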

### Infrastructure Migrations

- **Lift and Shift** - Quick cloud migration with minimal changes
- **Hybrid Cloud Migration** - Gradual cloud adoption
- **Multi-Cloud Migration** - Distribution across multiple providers

## Sample Workflow

### 1. Database Schema Migration

```bash
# Generate migration plan
python3 scripts/migration_planner.py \
  --input assets/sample_database_migration.json \
  --output db_migration_plan.json

# Check schema compatibility
python3 scripts/compatibility_checker.py \
  --before assets/database_schema_before.json \
  --after assets/database_schema_after.json \
  --type database \
  --output schema_compatibility.json

# Generate rollback procedures
python3 scripts/rollback_generator.py \
  --input db_migration_plan.json \
  --output db_rollback_runbook.json
```

### 2. Service Migration

```bash
# Generate service migration plan
python3 scripts/migration_planner.py \
  --input assets/sample_service_migration.json \
  --output service_migration_plan.json

# Generate rollback procedures
python3 scripts/rollback_generator.py \
  --input service_migration_plan.json \
  --output service_rollback_runbook.json
```

## Output Examples

### Migration Plan Structure

```json
{
  "migration_id": "abc123def456",
  "source_system": "Legacy User Service",
  "target_system": "New User Service",
  "migration_type": "service",
  "complexity": "medium",
  "estimated_duration_hours": 72,
  "phases": [
    {
      "name": "preparation",
      "description": "Prepare systems and teams for migration",
      "duration_hours": 8,
      "validation_criteria": ["All backups completed successfully"],
      "rollback_triggers": ["Critical system failure"],
      "risk_level": "medium"
    }
  ],
  "risks": [
    {
      "category": "technical",
      "description": "Service compatibility issues",
      "severity": "high",
      "mitigation": "Comprehensive integration testing"
    }
  ]
}
```
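
A generated plan can be consumed by other tooling. The following sketch loads a trimmed copy of the example above and summarizes it; `summarize_plan` and its output keys are hypothetical, and only the field names come from the plan structure.

```python
import json
from collections import defaultdict
from typing import Any, Dict

# Trimmed copy of the example plan above.
PLAN_JSON = """
{
  "migration_id": "abc123def456",
  "phases": [
    {"name": "preparation", "duration_hours": 8, "risk_level": "medium"}
  ],
  "risks": [
    {"category": "technical", "description": "Service compatibility issues", "severity": "high"}
  ]
}
"""


def summarize_plan(plan: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: total phase hours and risk descriptions by severity."""
    risks_by_severity = defaultdict(list)
    for risk in plan.get("risks", []):
        risks_by_severity[risk["severity"]].append(risk["description"])
    return {
        "phase_hours": sum(p["duration_hours"] for p in plan.get("phases", [])),
        "risks_by_severity": dict(risks_by_severity),
    }


summary = summarize_plan(json.loads(PLAN_JSON))
print(summary)
```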

### Compatibility Report Structure

```json
{
  "overall_compatibility": "potentially_incompatible",
  "breaking_changes_count": 2,
  "potentially_breaking_count": 3,
  "issues": [
    {
      "type": "required_column_added", 
      "severity": "breaking",
      "description": "Required column 'email_verified_at' added",
      "suggested_migration": "Add default value initially"
    }
  ],
  "migration_scripts": [
    {
      "script_type": "sql",
      "description": "Add email verification columns",
      "script_content": "ALTER TABLE users ADD COLUMN email_verified_at TIMESTAMP;",
      "rollback_script": "ALTER TABLE users DROP COLUMN email_verified_at;"
    }
  ]
}
```
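
A report like this can drive a simple go/no-go decision. The following is a minimal sketch, assuming the report has been loaded into a dict with the field names above; `should_block_release` and its `max_potentially_breaking` parameter are hypothetical names, not part of `compatibility_checker.py`.

```python
def should_block_release(report: dict, max_potentially_breaking: int = 0) -> bool:
    """Return True when the report suggests the change should not ship as-is."""
    # Any counted breaking change blocks the release outright
    if report.get("breaking_changes_count", 0) > 0:
        return True
    # Also check individual issues, in case the counter is missing or stale
    if any(i.get("severity") == "breaking" for i in report.get("issues", [])):
        return True
    # Tolerate a configurable number of potentially breaking changes
    return report.get("potentially_breaking_count", 0) > max_potentially_breaking


# Sample values taken from the report structure above
sample_report = {
    "overall_compatibility": "potentially_incompatible",
    "breaking_changes_count": 2,
    "potentially_breaking_count": 3,
    "issues": [{"type": "required_column_added", "severity": "breaking"}],
}

print(should_block_release(sample_report))
```

Checking both the counter and the per-issue severity is a defensive choice for this sketch; a stricter or looser policy may suit a given team better.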

## Best Practices

### Planning Phase
1. **Start with risk assessment** - Identify failure modes before planning
2. **Design for rollback** - Every step should have a tested rollback procedure
3. **Validate in staging** - Execute the full migration in a production-like environment
4. **Plan gradual rollout** - Use feature flags and traffic routing
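
The gradual rollout in item 4 is often implemented with deterministic percentage bucketing, so a given user stays on the same side of a flag as the percentage grows. The sketch below shows one common approach, not the skill's own mechanism; `in_rollout` is a hypothetical helper and the user IDs are made up.

```python
import hashlib


def in_rollout(flag_name: str, user_id: str, percentage: int) -> bool:
    """Deterministically place a user in or out of a percentage rollout."""
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a stable bucket in 0..99
    bucket = int.from_bytes(digest[:8], "big") % 100
    return bucket < percentage


# Rough check of the share of users routed to the new path at 25%
share = sum(
    in_rollout("enable_new_user_service", f"user-{i}", 25) for i in range(10_000)
) / 10_000
print(f"share routed at 25%: {share:.3f}")
```

Because the bucket depends only on the flag name and user ID, raising the percentage only ever adds users; nobody who was already on the new path is moved back.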

### Execution Phase
1. **Monitor continuously** - Track technical and business metrics
2. **Communicate proactively** - Keep stakeholders informed
3. **Document everything** - Maintain detailed logs for analysis
4. **Stay flexible** - Be prepared to adjust based on real-world performance

### Validation Phase
1. **Automate validation** - Use automated consistency and performance checks
2. **Test business logic** - Validate critical business processes end-to-end
3. **Load test** - Verify performance under expected production load
4. **Security validation** - Ensure security controls function properly

## Integration

### CI/CD Pipeline Integration

```yaml
# Example GitHub Actions workflow
name: Migration Validation
on: [push, pull_request]

jobs:
  validate-migration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Migration Plan
        run: |
          python3 scripts/migration_planner.py \
            --input migration_spec.json \
            --validate
      - name: Check Compatibility
        run: |
          python3 scripts/compatibility_checker.py \
            --before schema_before.json \
            --after schema_after.json \
            --type database
```

### Monitoring Integration

The tools generate metrics and alerts that can be integrated with:
- **Prometheus** - For metrics collection
- **Grafana** - For visualization and dashboards
- **PagerDuty** - For incident management
- **Slack** - For team notifications

## Advanced Features

### Machine Learning Integration
- Anomaly detection for data consistency issues
- Predictive analysis for migration success probability
- Automated pattern recognition for migration optimization

### Performance Optimization
- Parallel processing for large-scale migrations
- Incremental reconciliation strategies
- Statistical sampling for validation

### Compliance Support
- GDPR compliance tracking
- SOX audit trail generation
- HIPAA security validation

## Troubleshooting

### Common Issues

**"Migration plan validation failed"**
- Check JSON syntax in migration specification
- Ensure all required fields are present
- Verify that constraint values are realistic
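
The three checks above can be run locally before invoking the planner. This is a minimal sketch under stated assumptions: the set of required keys is inferred from the sample specs in `assets/` and may not match what `migration_planner.py` actually enforces, and `check_spec` is a hypothetical helper.

```python
import json

# Assumed required keys, inferred from the sample specs; the real planner may differ
ASSUMED_REQUIRED_KEYS = ("type", "source", "target", "constraints")


def check_spec(text: str) -> list[str]:
    """Return a list of human-readable problems found in a migration spec."""
    try:
        spec = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}"]
    if not isinstance(spec, dict):
        return ["top-level value must be a JSON object"]

    problems = [
        f"missing required field: {key}"
        for key in ASSUMED_REQUIRED_KEYS
        if key not in spec
    ]

    # Basic sanity check on one constraint value
    constraints = spec.get("constraints")
    if isinstance(constraints, dict):
        downtime = constraints.get("max_downtime_minutes")
        if downtime is not None and (
            not isinstance(downtime, (int, float)) or downtime < 0
        ):
            problems.append(
                "constraints.max_downtime_minutes must be a non-negative number"
            )
    return problems


print(check_spec('{"type": "service", "source": "a", "constraints": {"max_downtime_minutes": -5}}'))
```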

**"Compatibility checker reports false positives"**
- Review excluded fields configuration
- Check data type mapping compatibility
- Adjust tolerance settings for numerical comparisons

**"Rollback procedures seem incomplete"**
- Ensure migration plan includes all phases
- Verify database backup locations are specified
- Check that all dependencies are documented

### Getting Help

1. **Review documentation** - Check reference docs for patterns and techniques
2. **Examine sample files** - Use provided assets as templates
3. **Check expected outputs** - Compare your results with sample outputs
4. **Validate inputs** - Ensure input files match expected format

## Contributing

To extend or modify the Migration Architect skill:

1. **Add new patterns** - Extend pattern templates in migration_planner.py
2. **Enhance compatibility checks** - Add new validation rules in compatibility_checker.py
3. **Improve rollback procedures** - Add specialized rollback steps in rollback_generator.py
4. **Update documentation** - Keep reference docs current with new patterns

## License

This skill is part of the claude-skills repository and follows the same license terms.
```

### assets/database_schema_after.json

```json
{
  "schema_version": "2.0",
  "database": "user_management_v2",
  "tables": {
    "users": {
      "columns": {
        "id": {
          "type": "bigint",
          "nullable": false,
          "primary_key": true,
          "auto_increment": true
        },
        "username": {
          "type": "varchar",
          "length": 50,
          "nullable": false,
          "unique": true
        },
        "email": {
          "type": "varchar",
          "length": 320,
          "nullable": false,
          "unique": true
        },
        "password_hash": {
          "type": "varchar",
          "length": 255,
          "nullable": false
        },
        "first_name": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "last_name": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "updated_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        },
        "is_active": {
          "type": "boolean",
          "nullable": false,
          "default": true
        },
        "phone": {
          "type": "varchar",
          "length": 20,
          "nullable": true
        },
        "email_verified_at": {
          "type": "timestamp",
          "nullable": true,
          "comment": "When email was verified"
        },
        "phone_verified_at": {
          "type": "timestamp", 
          "nullable": true,
          "comment": "When phone was verified"
        },
        "two_factor_enabled": {
          "type": "boolean",
          "nullable": false,
          "default": false
        },
        "last_login_at": {
          "type": "timestamp",
          "nullable": true
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [
          "username",
          "email"
        ],
        "foreign_key": [],
        "check": [
          "email LIKE '%@%'",
          "LENGTH(password_hash) >= 60",
          "phone IS NULL OR LENGTH(phone) >= 10"
        ]
      },
      "indexes": [
        {
          "name": "idx_users_email",
          "columns": ["email"],
          "unique": true
        },
        {
          "name": "idx_users_username",
          "columns": ["username"],
          "unique": true
        },
        {
          "name": "idx_users_created_at",
          "columns": ["created_at"]
        },
        {
          "name": "idx_users_email_verified",
          "columns": ["email_verified_at"]
        },
        {
          "name": "idx_users_last_login",
          "columns": ["last_login_at"]
        }
      ]
    },
    "user_profiles": {
      "columns": {
        "id": {
          "type": "bigint",
          "nullable": false,
          "primary_key": true,
          "auto_increment": true
        },
        "user_id": {
          "type": "bigint",
          "nullable": false
        },
        "bio": {
          "type": "text",
          "nullable": true
        },
        "avatar_url": {
          "type": "varchar",
          "length": 500,
          "nullable": true
        },
        "birth_date": {
          "type": "date",
          "nullable": true
        },
        "location": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "website": {
          "type": "varchar",
          "length": 255,
          "nullable": true
        },
        "privacy_level": {
          "type": "varchar",
          "length": 20,
          "nullable": false,
          "default": "public"
        },
        "timezone": {
          "type": "varchar",
          "length": 50,
          "nullable": true,
          "default": "UTC"
        },
        "language": {
          "type": "varchar",
          "length": 10,
          "nullable": false,
          "default": "en"
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "updated_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [],
        "foreign_key": [
          {
            "columns": ["user_id"],
            "references": "users(id)",
            "on_delete": "CASCADE"
          }
        ],
        "check": [
          "privacy_level IN ('public', 'private', 'friends_only')",
          "bio IS NULL OR LENGTH(bio) <= 2000",
          "language IN ('en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh')"
        ]
      },
      "indexes": [
        {
          "name": "idx_user_profiles_user_id",
          "columns": ["user_id"],
          "unique": true
        },
        {
          "name": "idx_user_profiles_privacy",
          "columns": ["privacy_level"]
        },
        {
          "name": "idx_user_profiles_language",
          "columns": ["language"]
        }
      ]
    },
    "user_sessions": {
      "columns": {
        "id": {
          "type": "varchar",
          "length": 128,
          "nullable": false,
          "primary_key": true
        },
        "user_id": {
          "type": "bigint",
          "nullable": false
        },
        "ip_address": {
          "type": "varchar",
          "length": 45,
          "nullable": true
        },
        "user_agent": {
          "type": "text",
          "nullable": true
        },
        "expires_at": {
          "type": "timestamp",
          "nullable": false
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "last_activity": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        },
        "session_type": {
          "type": "varchar",
          "length": 20,
          "nullable": false,
          "default": "web"
        },
        "is_mobile": {
          "type": "boolean",
          "nullable": false,
          "default": false
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [],
        "foreign_key": [
          {
            "columns": ["user_id"],
            "references": "users(id)",
            "on_delete": "CASCADE"
          }
        ],
        "check": [
          "session_type IN ('web', 'mobile', 'api', 'admin')"
        ]
      },
      "indexes": [
        {
          "name": "idx_user_sessions_user_id",
          "columns": ["user_id"]
        },
        {
          "name": "idx_user_sessions_expires",
          "columns": ["expires_at"]
        },
        {
          "name": "idx_user_sessions_type",
          "columns": ["session_type"]
        }
      ]
    },
    "user_preferences": {
      "columns": {
        "id": {
          "type": "bigint",
          "nullable": false,
          "primary_key": true,
          "auto_increment": true
        },
        "user_id": {
          "type": "bigint",
          "nullable": false
        },
        "preference_key": {
          "type": "varchar",
          "length": 100,
          "nullable": false
        },
        "preference_value": {
          "type": "json",
          "nullable": true
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "updated_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [
          ["user_id", "preference_key"]
        ],
        "foreign_key": [
          {
            "columns": ["user_id"],
            "references": "users(id)",
            "on_delete": "CASCADE"
          }
        ],
        "check": []
      },
      "indexes": [
        {
          "name": "idx_user_preferences_user_key",
          "columns": ["user_id", "preference_key"],
          "unique": true
        }
      ]
    }
  },
  "views": {
    "active_users": {
      "definition": "SELECT u.id, u.username, u.email, u.first_name, u.last_name, u.email_verified_at, u.last_login_at FROM users u WHERE u.is_active = true",
      "columns": ["id", "username", "email", "first_name", "last_name", "email_verified_at", "last_login_at"]
    },
    "verified_users": {
      "definition": "SELECT u.id, u.username, u.email FROM users u WHERE u.is_active = true AND u.email_verified_at IS NOT NULL",
      "columns": ["id", "username", "email"]
    }
  },
  "procedures": [
    {
      "name": "cleanup_expired_sessions",
      "parameters": [],
      "definition": "DELETE FROM user_sessions WHERE expires_at < NOW()"
    },
    {
      "name": "get_user_with_profile",
      "parameters": ["user_id BIGINT"],
      "definition": "SELECT u.*, p.bio, p.avatar_url, p.privacy_level FROM users u LEFT JOIN user_profiles p ON u.id = p.user_id WHERE u.id = user_id"
    }
  ]
}
```

### assets/database_schema_before.json

```json
{
  "schema_version": "1.0",
  "database": "user_management",
  "tables": {
    "users": {
      "columns": {
        "id": {
          "type": "bigint",
          "nullable": false,
          "primary_key": true,
          "auto_increment": true
        },
        "username": {
          "type": "varchar",
          "length": 50,
          "nullable": false,
          "unique": true
        },
        "email": {
          "type": "varchar",
          "length": 255,
          "nullable": false,
          "unique": true
        },
        "password_hash": {
          "type": "varchar",
          "length": 255,
          "nullable": false
        },
        "first_name": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "last_name": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "updated_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        },
        "is_active": {
          "type": "boolean",
          "nullable": false,
          "default": true
        },
        "phone": {
          "type": "varchar",
          "length": 20,
          "nullable": true
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [
          "username",
          "email"
        ],
        "foreign_key": [],
        "check": [
          "email LIKE '%@%'",
          "LENGTH(password_hash) >= 60"
        ]
      },
      "indexes": [
        {
          "name": "idx_users_email",
          "columns": ["email"],
          "unique": true
        },
        {
          "name": "idx_users_username", 
          "columns": ["username"],
          "unique": true
        },
        {
          "name": "idx_users_created_at",
          "columns": ["created_at"]
        }
      ]
    },
    "user_profiles": {
      "columns": {
        "id": {
          "type": "bigint",
          "nullable": false,
          "primary_key": true,
          "auto_increment": true
        },
        "user_id": {
          "type": "bigint",
          "nullable": false
        },
        "bio": {
          "type": "varchar",
          "length": 255,
          "nullable": true
        },
        "avatar_url": {
          "type": "varchar",
          "length": 500,
          "nullable": true
        },
        "birth_date": {
          "type": "date",
          "nullable": true
        },
        "location": {
          "type": "varchar",
          "length": 100,
          "nullable": true
        },
        "website": {
          "type": "varchar",
          "length": 255,
          "nullable": true
        },
        "privacy_level": {
          "type": "varchar",
          "length": 20,
          "nullable": false,
          "default": "public"
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "updated_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [],
        "foreign_key": [
          {
            "columns": ["user_id"],
            "references": "users(id)",
            "on_delete": "CASCADE"
          }
        ],
        "check": [
          "privacy_level IN ('public', 'private', 'friends_only')"
        ]
      },
      "indexes": [
        {
          "name": "idx_user_profiles_user_id",
          "columns": ["user_id"],
          "unique": true
        },
        {
          "name": "idx_user_profiles_privacy",
          "columns": ["privacy_level"]
        }
      ]
    },
    "user_sessions": {
      "columns": {
        "id": {
          "type": "varchar",
          "length": 128,
          "nullable": false,
          "primary_key": true
        },
        "user_id": {
          "type": "bigint",
          "nullable": false
        },
        "ip_address": {
          "type": "varchar",
          "length": 45,
          "nullable": true
        },
        "user_agent": {
          "type": "varchar",
          "length": 500,
          "nullable": true
        },
        "expires_at": {
          "type": "timestamp",
          "nullable": false
        },
        "created_at": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP"
        },
        "last_activity": {
          "type": "timestamp",
          "nullable": false,
          "default": "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
        }
      },
      "constraints": {
        "primary_key": ["id"],
        "unique": [],
        "foreign_key": [
          {
            "columns": ["user_id"],
            "references": "users(id)",
            "on_delete": "CASCADE"
          }
        ],
        "check": []
      },
      "indexes": [
        {
          "name": "idx_user_sessions_user_id",
          "columns": ["user_id"]
        },
        {
          "name": "idx_user_sessions_expires",
          "columns": ["expires_at"]
        }
      ]
    }
  },
  "views": {
    "active_users": {
      "definition": "SELECT u.id, u.username, u.email, u.first_name, u.last_name FROM users u WHERE u.is_active = true",
      "columns": ["id", "username", "email", "first_name", "last_name"]
    }
  },
  "procedures": [
    {
      "name": "cleanup_expired_sessions",
      "parameters": [],
      "definition": "DELETE FROM user_sessions WHERE expires_at < NOW()"
    }
  ]
}
```

### assets/sample_database_migration.json

```json
{
  "type": "database",
  "pattern": "schema_change",
  "source": "PostgreSQL 13 Production Database",
  "target": "PostgreSQL 15 Cloud Database",
  "description": "Migrate user management system from on-premises PostgreSQL to cloud with schema updates",
  "constraints": {
    "max_downtime_minutes": 30,
    "data_volume_gb": 2500,
    "dependencies": [
      "user_service_api",
      "authentication_service",
      "notification_service",
      "analytics_pipeline",
      "backup_service"
    ],
    "compliance_requirements": [
      "GDPR",
      "SOX"
    ],
    "special_requirements": [
      "zero_data_loss",
      "referential_integrity",
      "performance_baseline_maintained"
    ]
  },
  "tables_to_migrate": [
    {
      "name": "users",
      "row_count": 1500000,
      "size_mb": 450,
      "critical": true
    },
    {
      "name": "user_profiles",
      "row_count": 1500000,
      "size_mb": 890,
      "critical": true
    },
    {
      "name": "user_sessions",
      "row_count": 25000000,
      "size_mb": 1200,
      "critical": false
    },
    {
      "name": "audit_logs",
      "row_count": 50000000,
      "size_mb": 2800,
      "critical": false
    }
  ],
  "schema_changes": [
    {
      "table": "users",
      "changes": [
        {
          "type": "add_column",
          "column": "email_verified_at",
          "data_type": "timestamp",
          "nullable": true
        },
        {
          "type": "add_column", 
          "column": "phone_verified_at",
          "data_type": "timestamp",
          "nullable": true
        }
      ]
    },
    {
      "table": "user_profiles",
      "changes": [
        {
          "type": "modify_column",
          "column": "bio",
          "old_type": "varchar(255)",
          "new_type": "text"
        },
        {
          "type": "add_constraint",
          "constraint_type": "check",
          "constraint_name": "bio_length_check",
          "definition": "LENGTH(bio) <= 2000"
        }
      ]
    }
  ],
  "performance_requirements": {
    "max_query_response_time_ms": 100,
    "concurrent_connections": 500,
    "transactions_per_second": 1000
  },
  "business_continuity": {
    "critical_business_hours": {
      "start": "08:00",
      "end": "18:00", 
      "timezone": "UTC"
    },
    "preferred_migration_window": {
      "start": "02:00",
      "end": "06:00",
      "timezone": "UTC"
    }
  }
}
```

### assets/sample_service_migration.json

```json
{
  "type": "service",
  "pattern": "strangler_fig",
  "source": "Legacy User Service (Java Spring Boot 2.x)",
  "target": "New User Service (Node.js + TypeScript)",
  "description": "Migrate legacy user management service to modern microservices architecture",
  "constraints": {
    "max_downtime_minutes": 0,
    "data_volume_gb": 50,
    "dependencies": [
      "payment_service",
      "order_service", 
      "notification_service",
      "analytics_service",
      "mobile_app_v1",
      "mobile_app_v2",
      "web_frontend",
      "admin_dashboard"
    ],
    "compliance_requirements": [
      "PCI_DSS",
      "GDPR"
    ],
    "special_requirements": [
      "api_backward_compatibility",
      "session_continuity",
      "rate_limit_preservation"
    ]
  },
  "service_details": {
    "legacy_service": {
      "endpoints": [
        "GET /api/v1/users/{id}",
        "POST /api/v1/users",
        "PUT /api/v1/users/{id}",
        "DELETE /api/v1/users/{id}",
        "GET /api/v1/users/{id}/profile",
        "PUT /api/v1/users/{id}/profile",
        "POST /api/v1/users/{id}/verify-email",
        "POST /api/v1/users/login",
        "POST /api/v1/users/logout"
      ],
      "current_load": {
        "requests_per_second": 850,
        "peak_requests_per_second": 2000,
        "average_response_time_ms": 120,
        "p95_response_time_ms": 300
      },
      "infrastructure": {
        "instances": 4,
        "cpu_cores_per_instance": 4,
        "memory_gb_per_instance": 8,
        "load_balancer": "AWS ELB Classic"
      }
    },
    "new_service": {
      "endpoints": [
        "GET /api/v2/users/{id}",
        "POST /api/v2/users",
        "PUT /api/v2/users/{id}",
        "DELETE /api/v2/users/{id}",
        "GET /api/v2/users/{id}/profile",
        "PUT /api/v2/users/{id}/profile", 
        "POST /api/v2/users/{id}/verify-email",
        "POST /api/v2/users/{id}/verify-phone",
        "POST /api/v2/auth/login",
        "POST /api/v2/auth/logout",
        "POST /api/v2/auth/refresh"
      ],
      "target_performance": {
        "requests_per_second": 1500,
        "peak_requests_per_second": 3000,
        "average_response_time_ms": 80,
        "p95_response_time_ms": 200
      },
      "infrastructure": {
        "container_platform": "Kubernetes",
        "initial_replicas": 3,
        "max_replicas": 10,
        "cpu_request_millicores": 500,
        "cpu_limit_millicores": 1000,
        "memory_request_mb": 512,
        "memory_limit_mb": 1024,
        "load_balancer": "AWS ALB"
      }
    }
  },
  "migration_phases": [
    {
      "phase": "preparation",
      "description": "Deploy new service and configure routing",
      "estimated_duration_hours": 8
    },
    {
      "phase": "intercept",
      "description": "Configure API gateway to route to new service",
      "estimated_duration_hours": 2
    },
    {
      "phase": "gradual_migration",
      "description": "Gradually increase traffic to new service",
      "estimated_duration_hours": 48
    },
    {
      "phase": "validation",
      "description": "Validate new service performance and functionality",
      "estimated_duration_hours": 24
    },
    {
      "phase": "decommission",
      "description": "Remove legacy service after validation",
      "estimated_duration_hours": 4
    }
  ],
  "feature_flags": [
    {
      "name": "enable_new_user_service",
      "description": "Route user service requests to new implementation",
      "initial_percentage": 5,
      "rollout_schedule": [
        {"percentage": 5, "duration_hours": 24},
        {"percentage": 25, "duration_hours": 24},
        {"percentage": 50, "duration_hours": 24},
        {"percentage": 100, "duration_hours": 0}
      ]
    },
    {
      "name": "enable_new_auth_endpoints",
      "description": "Enable new authentication endpoints",
      "initial_percentage": 0,
      "rollout_schedule": [
        {"percentage": 10, "duration_hours": 12},
        {"percentage": 50, "duration_hours": 12},
        {"percentage": 100, "duration_hours": 0}
      ]
    }
  ],
  "monitoring": {
    "critical_metrics": [
      "request_rate",
      "error_rate", 
      "response_time_p95",
      "response_time_p99",
      "cpu_utilization",
      "memory_utilization",
      "database_connection_pool"
    ],
    "alert_thresholds": {
      "error_rate": 0.05,
      "response_time_p95": 250,
      "cpu_utilization": 0.80,
      "memory_utilization": 0.85
    }
  },
  "rollback_triggers": [
    {
      "metric": "error_rate",
      "threshold": 0.10,
      "duration_minutes": 5,
      "action": "automatic_rollback"
    },
    {
      "metric": "response_time_p95",
      "threshold": 500,
      "duration_minutes": 10,
      "action": "alert_team"
    },
    {
      "metric": "cpu_utilization",
      "threshold": 0.95,
      "duration_minutes": 5,
      "action": "scale_up"
    }
  ]
}
```
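
The `rollback_triggers` entries in the sample above pair a metric with a threshold and a duration. One way to read them is sketched below in Python. This is not how the skill's scripts are known to evaluate triggers: the function `fired_actions`, the one-sample-per-minute input format, and the strict greater-than comparison are all assumptions made for illustration.

```python
def fired_actions(triggers, samples_per_minute):
    """Return actions whose metric stayed above its threshold for the full duration.

    samples_per_minute maps a metric name to a list of values, one per minute,
    oldest first (an assumption of this sketch).
    """
    actions = []
    for trigger in triggers:
        window = trigger["duration_minutes"]
        if window <= 0:
            continue  # a zero-length window is not meaningful here
        recent = samples_per_minute.get(trigger["metric"], [])[-window:]
        # Fire only with a full window of samples, all above the threshold
        if len(recent) == window and all(v > trigger["threshold"] for v in recent):
            actions.append(trigger["action"])
    return actions


# Trigger definitions copied from the sample spec above
triggers = [
    {"metric": "error_rate", "threshold": 0.10, "duration_minutes": 5, "action": "automatic_rollback"},
    {"metric": "response_time_p95", "threshold": 500, "duration_minutes": 10, "action": "alert_team"},
    {"metric": "cpu_utilization", "threshold": 0.95, "duration_minutes": 5, "action": "scale_up"},
]

# Hypothetical metric samples
samples = {
    "error_rate": [0.02, 0.12, 0.15, 0.11, 0.20, 0.13],
    "response_time_p95": [600] * 9,  # above threshold, but fewer than 10 samples
    "cpu_utilization": [0.96, 0.97, 0.99, 0.94, 0.98],  # one dip below threshold
}

print(fired_actions(triggers, samples))
```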

### references/data_reconciliation_strategies.md

```markdown
# Data Reconciliation Strategies

## Overview

Data reconciliation is the process of ensuring data consistency and integrity across systems during and after migrations. This document provides comprehensive strategies, tools, and implementation patterns for detecting, measuring, and correcting data discrepancies in migration scenarios.

## Core Principles

### 1. Eventually Consistent
Accept that perfect real-time consistency may not be achievable during migrations, but ensure eventual consistency through reconciliation processes.

### 2. Idempotent Operations
All reconciliation operations must be safe to run multiple times without causing additional issues.

### 3. Audit Trail
Maintain detailed logs of all reconciliation actions for compliance and debugging.

### 4. Non-Destructive
Reconciliation should prefer addition over deletion, and always maintain backups before corrections.
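
These principles can be combined in a single correction routine. The sketch below uses in-memory dicts as stand-ins for the target system and the audit store; `apply_correction` and the sample record are hypothetical and only illustrate the idea.

```python
import copy


def apply_correction(target: dict, audit_log: list, key, source_record: dict) -> bool:
    """Make target[key] equal to source_record; return True if anything changed."""
    if target.get(key) == source_record:
        return False  # already consistent, so a repeated run is a no-op

    # Record the previous value before overwriting it (audit trail, non-destructive)
    audit_log.append({
        "key": key,
        "before": copy.deepcopy(target.get(key)),
        "after": copy.deepcopy(source_record),
    })
    target[key] = copy.deepcopy(source_record)
    return True


target = {42: {"email": "old@example.com"}}
audit_log = []

first = apply_correction(target, audit_log, 42, {"email": "new@example.com"})
second = apply_correction(target, audit_log, 42, {"email": "new@example.com"})
print(first, second, len(audit_log))
```

Running the same correction twice changes the target once and logs once, which is the behavior the idempotency principle asks for.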

## Types of Data Inconsistencies

### 1. Missing Records
Records that exist in the source system but not in the target.

### 2. Extra Records
Records that exist in the target system but not in the source.

### 3. Field Mismatches
Records exist in both systems but with different field values.

### 4. Referential Integrity Violations
Foreign key relationships that are broken during migration.

### 5. Temporal Inconsistencies
Data with incorrect timestamps or ordering.

### 6. Schema Drift
Structural differences between source and target schemas.
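
The first three categories can be detected with plain set operations once both sides are loaded as snapshots keyed by primary key. This is a minimal in-memory sketch, assuming both snapshots fit in memory; `classify_differences` and the sample records are hypothetical.

```python
def classify_differences(source: dict, target: dict, ignore_fields=("updated_at",)):
    """Classify differences between two {primary_key: record_dict} snapshots."""
    missing = sorted(source.keys() - target.keys())  # in source, not in target
    extra = sorted(target.keys() - source.keys())    # in target, not in source

    mismatched = {}
    for key in source.keys() & target.keys():
        # Compare every field present on either side, except ignored ones
        fields = (source[key].keys() | target[key].keys()) - set(ignore_fields)
        diff = sorted(f for f in fields if source[key].get(f) != target[key].get(f))
        if diff:
            mismatched[key] = diff

    return {"missing": missing, "extra": extra, "mismatched": mismatched}


source = {
    1: {"email": "a@example.com", "updated_at": "2024-01-01"},
    2: {"email": "b@example.com"},
    3: {"email": "c@example.com"},
}
target = {
    1: {"email": "a@example.com", "updated_at": "2024-02-01"},
    2: {"email": "B@example.com"},
    4: {"email": "d@example.com"},
}

result = classify_differences(source, target)
print(result)
```

Referential integrity, temporal ordering, and schema drift need checks of their own and are not covered by this comparison.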

## Detection Strategies

### 1. Row Count Validation

#### Simple Count Comparison
```sql
-- Compare total row counts
SELECT 
    'source' as system, 
    COUNT(*) as row_count 
FROM source_table
UNION ALL
SELECT 
    'target' as system, 
    COUNT(*) as row_count 
FROM target_table;
```

#### Filtered Count Comparison
```sql
-- Compare counts with business logic filters
WITH source_counts AS (
    SELECT 
        status,
        created_date::date as date,
        COUNT(*) as count
    FROM source_orders
    WHERE created_date >= '2024-01-01'
    GROUP BY status, created_date::date
),
target_counts AS (
    SELECT 
        status,
        created_date::date as date,
        COUNT(*) as count
    FROM target_orders
    WHERE created_date >= '2024-01-01'
    GROUP BY status, created_date::date
)
SELECT 
    COALESCE(s.status, t.status) as status,
    COALESCE(s.date, t.date) as date,
    COALESCE(s.count, 0) as source_count,
    COALESCE(t.count, 0) as target_count,
    COALESCE(s.count, 0) - COALESCE(t.count, 0) as difference
FROM source_counts s
FULL OUTER JOIN target_counts t 
    ON s.status = t.status AND s.date = t.date
WHERE COALESCE(s.count, 0) != COALESCE(t.count, 0);
```

### 2. Checksum-Based Validation

#### Record-Level Checksums
```python
import hashlib
import json

class RecordChecksum:
    def __init__(self, exclude_fields=None):
        self.exclude_fields = exclude_fields or ['updated_at', 'version']
    
    def calculate_checksum(self, record):
        """Calculate MD5 checksum for a database record"""
        # Remove excluded fields and sort for consistency
        filtered_record = {
            k: v for k, v in record.items() 
            if k not in self.exclude_fields
        }
        
        # Convert to sorted JSON string for consistent hashing
        normalized = json.dumps(filtered_record, sort_keys=True, default=str)
        
        return hashlib.md5(normalized.encode('utf-8')).hexdigest()
    
    def compare_records(self, source_record, target_record):
        """Compare two records using checksums"""
        source_checksum = self.calculate_checksum(source_record)
        target_checksum = self.calculate_checksum(target_record)
        
        return {
            'match': source_checksum == target_checksum,
            'source_checksum': source_checksum,
            'target_checksum': target_checksum
        }

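# Usage example
# fetch_records_from_source() and fetch_records_from_target() are placeholders
# for your own data access code; each is expected to return a dict that maps
# a record ID to a record dict.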
checksum_calculator = RecordChecksum(exclude_fields=['updated_at', 'migration_flag'])

source_records = fetch_records_from_source()
target_records = fetch_records_from_target()

mismatches = []
for source_id, source_record in source_records.items():
    if source_id in target_records:
        comparison = checksum_calculator.compare_records(
            source_record, target_records[source_id]
        )
        if not comparison['match']:
            mismatches.append({
                'record_id': source_id,
                'source_checksum': comparison['source_checksum'],
                'target_checksum': comparison['target_checksum']
            })
```

#### Aggregate Checksums
```sql
-- Calculate aggregate checksums for data validation
WITH source_aggregates AS (
    SELECT 
        DATE_TRUNC('day', created_at) as day,
        status,
        COUNT(*) as record_count,
        SUM(amount) as total_amount,
        MD5(STRING_AGG(CAST(id AS VARCHAR) || ':' || CAST(amount AS VARCHAR), '|' ORDER BY id)) as checksum
    FROM source_transactions
    GROUP BY DATE_TRUNC('day', created_at), status
),
target_aggregates AS (
    SELECT 
        DATE_TRUNC('day', created_at) as day,
        status,
        COUNT(*) as record_count,
        SUM(amount) as total_amount,
        MD5(STRING_AGG(CAST(id AS VARCHAR) || ':' || CAST(amount AS VARCHAR), '|' ORDER BY id)) as checksum
    FROM target_transactions
    GROUP BY DATE_TRUNC('day', created_at), status
)
SELECT 
    COALESCE(s.day, t.day) as day,
    COALESCE(s.status, t.status) as status,
    COALESCE(s.record_count, 0) as source_count,
    COALESCE(t.record_count, 0) as target_count,
    COALESCE(s.total_amount, 0) as source_amount,
    COALESCE(t.total_amount, 0) as target_amount,
    s.checksum as source_checksum,
    t.checksum as target_checksum,
    CASE WHEN s.checksum = t.checksum THEN 'MATCH' ELSE 'MISMATCH' END as comparison_status
FROM source_aggregates s
FULL OUTER JOIN target_aggregates t 
    ON s.day = t.day AND s.status = t.status
WHERE s.checksum != t.checksum OR s.checksum IS NULL OR t.checksum IS NULL;
```
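
When source and target cannot be joined in a single SQL statement, the same grouping can be done client-side. The following is a minimal sketch, assuming each row is a dict with `id`, `created_at` (already truncated to the day), `status`, and `amount`, and that amounts are formatted identically on both sides. The helper names `aggregate_checksums` and `diff_aggregates` are hypothetical.

```python
import hashlib
from collections import defaultdict


def aggregate_checksums(rows):
    """Group rows by (day, status) and checksum the ordered id:amount pairs."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["created_at"], row["status"])].append(row)

    result = {}
    for key, members in groups.items():
        members.sort(key=lambda r: r["id"])  # mirrors ORDER BY id in the SQL
        payload = "|".join(f"{r['id']}:{r['amount']}" for r in members)
        result[key] = {
            "record_count": len(members),
            "checksum": hashlib.md5(payload.encode("utf-8")).hexdigest(),
        }
    return result


def diff_aggregates(source_rows, target_rows):
    """Return the (day, status) groups whose checksums differ or are missing."""
    source = aggregate_checksums(source_rows)
    target = aggregate_checksums(target_rows)
    return sorted(
        key for key in source.keys() | target.keys()
        if source.get(key, {}).get("checksum") != target.get(key, {}).get("checksum")
    )
```

Only the groups returned by `diff_aggregates` need a record-level comparison, which keeps the expensive per-record check limited to the days and statuses that actually drifted.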

### 3. Delta Detection

#### Change Data Capture (CDC) Based
```python
import json

class CDCReconciler:
    def __init__(self, kafka_client, database_client):
        self.kafka = kafka_client
        self.db = database_client
        self.processed_changes = set()
    
    def process_cdc_stream(self, topic_name):
        """Process CDC events and track changes for reconciliation"""
        
        consumer = self.kafka.consumer(topic_name)
        
        for message in consumer:
            change_event = json.loads(message.value)
            
            change_id = f"{change_event['table']}:{change_event['key']}:{change_event['timestamp']}"
            
            if change_id in self.processed_changes:
                continue  # Skip duplicate events
            
            try:
                self.apply_change(change_event)
                self.processed_changes.add(change_id)
                
                # Commit offset only after successful processing
                consumer.commit()
                
            except Exception as e:
                # Log failure and continue - will be caught by reconciliation
                self.log_processing_failure(change_id, str(e))
    
    def apply_change(self, change_event):
        """Apply CDC change to target system"""
        
        table = change_event['table']
        operation = change_event['operation']
        key = change_event['key']
        data = change_event.get('data', {})
        
        if operation == 'INSERT':
            self.db.insert(table, data)
        elif operation == 'UPDATE':
            self.db.update(table, key, data)
        elif operation == 'DELETE':
            self.db.delete(table, key)
    
    def reconcile_missed_changes(self, start_timestamp, end_timestamp):
        """Find and apply changes that may have been missed"""
        
        # Query source database for changes in time window
        source_changes = self.db.get_changes_in_window(
            start_timestamp, end_timestamp
        )
        
        missed_changes = []
        
        for change in source_changes:
            change_id = f"{change['table']}:{change['key']}:{change['timestamp']}"
            
            if change_id not in self.processed_changes:
                missed_changes.append(change)
        
        # Apply missed changes
        for change in missed_changes:
            try:
                self.apply_change(change)
                print(f"Applied missed change: {change['table']}:{change['key']}")
            except Exception as e:
                print(f"Failed to apply missed change: {e}")
```
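
`processed_changes` in `CDCReconciler` is a plain in-memory `set`, so it grows for as long as the consumer runs. One possible mitigation is a bounded structure that forgets the oldest change IDs. The sketch below is illustrative only: `BoundedSeenSet` is a hypothetical helper, and it assumes that `apply_change` is idempotent or that duplicate events arrive close together, because an evicted ID would be treated as new if it were redelivered later.

```python
from collections import OrderedDict


class BoundedSeenSet:
    """Remember the most recent `max_size` change IDs, evicting the oldest."""

    def __init__(self, max_size=100_000):
        self.max_size = max_size
        self._items = OrderedDict()

    def add(self, change_id):
        self._items[change_id] = True
        self._items.move_to_end(change_id)  # re-adding refreshes recency
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # drop the oldest entry

    def __contains__(self, change_id):
        return change_id in self._items

    def __len__(self):
        return len(self._items)
```

Because it supports `add` and `in`, an instance could stand in for the `set()` assigned in `__init__` without changing the rest of the class.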

### 4. Business Logic Validation

#### Critical Business Rules Validation
```python
class BusinessLogicValidator:
    def __init__(self, source_db, target_db):
        self.source_db = source_db
        self.target_db = target_db
    
    def validate_financial_consistency(self):
        """Validate critical financial calculations"""
        
        validation_rules = [
            {
                'name': 'daily_transaction_totals',
                'source_query': """
                    SELECT DATE(created_at) as date, SUM(amount) as total
                    FROM source_transactions
                    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
                    GROUP BY DATE(created_at)
                """,
                'target_query': """
                    SELECT DATE(created_at) as date, SUM(amount) as total
                    FROM target_transactions
                    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
                    GROUP BY DATE(created_at)
                """,
                'tolerance': 0.01  # Allow $0.01 difference for rounding
            },
            {
                'name': 'customer_balance_totals',
                'source_query': """
                    SELECT customer_id, SUM(balance) as total_balance
                    FROM source_accounts
                    GROUP BY customer_id
                    HAVING SUM(balance) > 0
                """,
                'target_query': """
                    SELECT customer_id, SUM(balance) as total_balance
                    FROM target_accounts
                    GROUP BY customer_id
                    HAVING SUM(balance) > 0
                """,
                'tolerance': 0.01
            }
        ]
        
        validation_results = []
        
        for rule in validation_rules:
            source_data = self.source_db.execute_query(rule['source_query'])
            target_data = self.target_db.execute_query(rule['target_query'])
            
            differences = self.compare_financial_data(
                source_data, target_data, rule['tolerance']
            )
            
            validation_results.append({
                'rule_name': rule['name'],
                'differences_found': len(differences),
                'differences': differences[:10],  # First 10 differences
                'status': 'PASS' if len(differences) == 0 else 'FAIL'
            })
        
        return validation_results
    
    def compare_financial_data(self, source_data, target_data, tolerance):
        """Compare financial data with tolerance for rounding differences"""
        
        source_dict = {
            tuple(row[:-1]): row[-1] for row in source_data
        }  # Last column is the amount
        
        target_dict = {
            tuple(row[:-1]): row[-1] for row in target_data
        }
        
        differences = []
        
        # Check for missing records and value differences
        for key, source_value in source_dict.items():
            if key not in target_dict:
                differences.append({
                    'key': key,
                    'source_value': source_value,
                    'target_value': None,
                    'difference_type': 'MISSING_IN_TARGET'
                })
            else:
                target_value = target_dict[key]
                if abs(float(source_value) - float(target_value)) > tolerance:
                    differences.append({
                        'key': key,
                        'source_value': source_value,
                        'target_value': target_value,
                        'difference': float(source_value) - float(target_value),
                        'difference_type': 'VALUE_MISMATCH'
                    })
        
        # Check for extra records in target
        for key, target_value in target_dict.items():
            if key not in source_dict:
                differences.append({
                    'key': key,
                    'source_value': None,
                    'target_value': target_value,
                    'difference_type': 'EXTRA_IN_TARGET'
                })
        
        return differences
```
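
Converting monetary values to `float` before comparing them can add rounding noise of its own near the tolerance boundary. A minimal sketch of the same tolerance check using `decimal.Decimal` follows, assuming amounts arrive as strings, ints, floats, or `Decimal` values. The function name `within_tolerance` is illustrative and not part of the validator above.

```python
from decimal import Decimal


def within_tolerance(source_value, target_value, tolerance="0.01"):
    """Return True when two monetary values differ by at most `tolerance`."""
    # Convert via str() so float inputs do not carry binary rounding
    # artifacts into the Decimal representation.
    source = Decimal(str(source_value))
    target = Decimal(str(target_value))
    return abs(source - target) <= Decimal(str(tolerance))
```

A difference strictly greater than the tolerance is reported as a mismatch, matching the `> tolerance` condition in `compare_financial_data`.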

## Correction Strategies

### 1. Automated Correction

#### Missing Record Insertion
```python
from datetime import datetime

class AutoCorrector:
    def __init__(self, source_db, target_db, dry_run=True):
        self.source_db = source_db
        self.target_db = target_db
        self.dry_run = dry_run
        self.correction_log = []
    
    def correct_missing_records(self, table_name, key_field):
        """Add missing records from source to target"""
        
        # Find records in source but not in target
        missing_query = f"""
            SELECT s.* 
            FROM source_{table_name} s
            LEFT JOIN target_{table_name} t ON s.{key_field} = t.{key_field}
            WHERE t.{key_field} IS NULL
        """
        
        missing_records = self.source_db.execute_query(missing_query)
        
        for record in missing_records:
            correction = {
                'table': table_name,
                'operation': 'INSERT',
                'key': record[key_field],
                'data': record,
                'timestamp': datetime.utcnow()
            }
            
            if not self.dry_run:
                try:
                    self.target_db.insert(table_name, record)
                    correction['status'] = 'SUCCESS'
                except Exception as e:
                    correction['status'] = 'FAILED'
                    correction['error'] = str(e)
            else:
                correction['status'] = 'DRY_RUN'
            
            self.correction_log.append(correction)
        
        return len(missing_records)
    
    def correct_field_mismatches(self, table_name, key_field, fields_to_correct):
        """Correct field value mismatches"""
        
        mismatch_query = f"""
            SELECT s.{key_field}, {', '.join([f's.{f} as source_{f}, t.{f} as target_{f}' for f in fields_to_correct])}
            FROM source_{table_name} s
            JOIN target_{table_name} t ON s.{key_field} = t.{key_field}
            WHERE {' OR '.join([f's.{f} IS DISTINCT FROM t.{f}' for f in fields_to_correct])}
        """
        
        mismatched_records = self.source_db.execute_query(mismatch_query)
        
        for record in mismatched_records:
            key_value = record[key_field]
            updates = {}
            
            for field in fields_to_correct:
                source_value = record[f'source_{field}']
                target_value = record[f'target_{field}']
                
                if source_value != target_value:
                    updates[field] = source_value
            
            if updates:
                correction = {
                    'table': table_name,
                    'operation': 'UPDATE',
                    'key': key_value,
                    'updates': updates,
                    'timestamp': datetime.utcnow()
                }
                
                if not self.dry_run:
                    try:
                        self.target_db.update(table_name, {key_field: key_value}, updates)
                        correction['status'] = 'SUCCESS'
                    except Exception as e:
                        correction['status'] = 'FAILED'
                        correction['error'] = str(e)
                else:
                    correction['status'] = 'DRY_RUN'
                
                self.correction_log.append(correction)
        
        return len(mismatched_records)
```
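
The queries in `AutoCorrector` interpolate `table_name`, `key_field`, and field names directly into SQL, so those values should only ever come from trusted configuration. One possible guard is sketched below, under the assumption that identifiers are plain unquoted names. `validate_identifier` and `build_missing_query` are hypothetical helpers, not part of the class above.

```python
import re

_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def validate_identifier(name):
    """Return `name` unchanged if it is a plain SQL identifier, else raise."""
    if not isinstance(name, str) or not _IDENTIFIER_RE.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def build_missing_query(table_name, key_field):
    """Build the missing-records query from validated identifiers only."""
    table = validate_identifier(table_name)
    key = validate_identifier(key_field)
    return (
        f"SELECT s.* FROM source_{table} s "
        f"LEFT JOIN target_{table} t ON s.{key} = t.{key} "
        f"WHERE t.{key} IS NULL"
    )
```

Values (as opposed to identifiers) should still be passed as bound parameters, as the dashboard queries later in this document do with `%s`.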

### 2. Manual Review Process

#### Correction Workflow
```python
import uuid
from datetime import datetime

class ManualReviewSystem:
    def __init__(self, database_client):
        self.db = database_client
        self.review_queue = []
    
    def queue_for_review(self, discrepancy):
        """Add discrepancy to manual review queue"""
        
        review_item = {
            'id': str(uuid.uuid4()),
            'discrepancy_type': discrepancy['type'],
            'table': discrepancy['table'],
            'record_key': discrepancy['key'],
            'source_data': discrepancy.get('source_data'),
            'target_data': discrepancy.get('target_data'),
            'description': discrepancy['description'],
            'severity': discrepancy.get('severity', 'medium'),
            'status': 'PENDING',
            'created_at': datetime.utcnow(),
            'reviewed_by': None,
            'reviewed_at': None,
            'resolution': None
        }
        
        self.review_queue.append(review_item)
        
        # Persist to review database
        self.db.insert('manual_review_queue', review_item)
        
        return review_item['id']
    
    def process_review(self, review_id, reviewer, action, notes=None):
        """Process manual review decision"""
        
        review_item = self.get_review_item(review_id)
        
        if not review_item:
            raise ValueError(f"Review item {review_id} not found")
        
        review_item.update({
            'status': 'REVIEWED',
            'reviewed_by': reviewer,
            'reviewed_at': datetime.utcnow(),
            'resolution': {
                'action': action,  # 'APPLY_SOURCE', 'KEEP_TARGET', 'CUSTOM_FIX'
                'notes': notes
            }
        })
        
        # Apply the resolution
        if action == 'APPLY_SOURCE':
            self.apply_source_data(review_item)
        elif action == 'KEEP_TARGET':
            pass  # No action needed
        elif action == 'CUSTOM_FIX':
            # Custom fix would be applied separately
            pass
        
        # Update review record
        self.db.update('manual_review_queue', 
                      {'id': review_id}, 
                      review_item)
        
        return review_item
    
    def generate_review_report(self):
        """Generate summary report of manual reviews"""
        
        reviews = self.db.query("""
            SELECT 
                discrepancy_type,
                severity,
                status,
                COUNT(*) as count,
                MIN(created_at) as oldest_review,
                MAX(created_at) as newest_review
            FROM manual_review_queue
            GROUP BY discrepancy_type, severity, status
            ORDER BY 
                CASE severity WHEN 'high' THEN 1 WHEN 'medium' THEN 2 ELSE 3 END,
                discrepancy_type
        """)
        
        return reviews
```

### 3. Reconciliation Scheduling

#### Automated Reconciliation Jobs
```python
import schedule
import time
from datetime import datetime, timedelta

class ReconciliationScheduler:
    def __init__(self, reconciler):
        self.reconciler = reconciler
        self.job_history = []
    
    def setup_schedules(self):
        """Set up automated reconciliation schedules"""
        
        # Quick reconciliation every 15 minutes during migration
        schedule.every(15).minutes.do(self.quick_reconciliation)
        
        # Comprehensive reconciliation every 4 hours
        schedule.every(4).hours.do(self.comprehensive_reconciliation)
        
        # Deep validation daily
        schedule.every().day.at("02:00").do(self.deep_validation)
        
        # Weekly business logic validation
        schedule.every().sunday.at("03:00").do(self.business_logic_validation)
    
    def quick_reconciliation(self):
        """Quick count-based reconciliation"""
        
        job_start = datetime.utcnow()
        
        try:
            # Check critical tables only
            critical_tables = [
                'transactions', 'orders', 'customers', 'accounts'
            ]
            
            results = []
            for table in critical_tables:
                count_diff = self.reconciler.check_row_counts(table)
                if abs(count_diff) > 0:
                    results.append({
                        'table': table,
                        'count_difference': count_diff,
                        'severity': 'high' if abs(count_diff) > 100 else 'medium'
                    })
            
            job_result = {
                'job_type': 'quick_reconciliation',
                'start_time': job_start,
                'end_time': datetime.utcnow(),
                'status': 'completed',
                'issues_found': len(results),
                'details': results
            }
            
            # Alert if significant issues found
            if any(r['severity'] == 'high' for r in results):
                self.send_alert(job_result)
        
        except Exception as e:
            job_result = {
                'job_type': 'quick_reconciliation',
                'start_time': job_start,
                'end_time': datetime.utcnow(),
                'status': 'failed',
                'error': str(e)
            }
        
        self.job_history.append(job_result)
    
    def comprehensive_reconciliation(self):
        """Comprehensive checksum-based reconciliation"""
        
        job_start = datetime.utcnow()
        
        try:
            tables_to_check = self.get_migration_tables()
            issues = []
            
            for table in tables_to_check:
                # Sample-based checksum validation
                sample_issues = self.reconciler.validate_sample_checksums(
                    table, sample_size=1000
                )
                issues.extend(sample_issues)
            
            # Auto-correct simple issues
            auto_corrections = 0
            for issue in issues:
                if issue['auto_correctable']:
                    self.reconciler.auto_correct_issue(issue)
                    auto_corrections += 1
                else:
                    # Queue for manual review
                    self.reconciler.queue_for_manual_review(issue)
            
            job_result = {
                'job_type': 'comprehensive_reconciliation',
                'start_time': job_start,
                'end_time': datetime.utcnow(),
                'status': 'completed',
                'total_issues': len(issues),
                'auto_corrections': auto_corrections,
                'manual_reviews_queued': len(issues) - auto_corrections
            }
        
        except Exception as e:
            job_result = {
                'job_type': 'comprehensive_reconciliation',
                'start_time': job_start,
                'end_time': datetime.utcnow(),
                'status': 'failed',
                'error': str(e)
            }
        
        self.job_history.append(job_result)
    
    def run_scheduler(self):
        """Run the reconciliation scheduler"""
        
        print("Starting reconciliation scheduler...")
        
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
```

## Monitoring and Reporting

### 1. Reconciliation Metrics

```python
from prometheus_client import Counter, Gauge, Histogram

class ReconciliationMetrics:
    def __init__(self, prometheus_client):
        self.prometheus = prometheus_client
        
        # Define metrics
        self.inconsistencies_found = Counter(
            'reconciliation_inconsistencies_total',
            'Number of inconsistencies found',
            ['table', 'type', 'severity']
        )
        
        self.reconciliation_duration = Histogram(
            'reconciliation_duration_seconds',
            'Time spent on reconciliation jobs',
            ['job_type']
        )
        
        self.auto_corrections = Counter(
            'reconciliation_auto_corrections_total',
            'Number of automatically corrected inconsistencies',
            ['table', 'correction_type']
        )
        
        self.data_drift_gauge = Gauge(
            'data_drift_percentage',
            'Percentage of records with inconsistencies',
            ['table']
        )
    
    def record_inconsistency(self, table, inconsistency_type, severity):
        """Record a found inconsistency"""
        self.inconsistencies_found.labels(
            table=table,
            type=inconsistency_type,
            severity=severity
        ).inc()
    
    def record_auto_correction(self, table, correction_type):
        """Record an automatic correction"""
        self.auto_corrections.labels(
            table=table,
            correction_type=correction_type
        ).inc()
    
    def update_data_drift(self, table, drift_percentage):
        """Update data drift gauge"""
        self.data_drift_gauge.labels(table=table).set(drift_percentage)
    
    def record_job_duration(self, job_type, duration_seconds):
        """Record reconciliation job duration"""
        self.reconciliation_duration.labels(job_type=job_type).observe(duration_seconds)
```

### 2. Alerting Rules

```yaml
# Prometheus alerting rules for data reconciliation
groups:
  - name: data_reconciliation
    rules:
    - alert: HighDataInconsistency
      expr: sum(increase(reconciliation_inconsistencies_total[5m])) > 100
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "High number of data inconsistencies detected"
        description: "{{ $value }} inconsistencies found in the last 5 minutes"
    
    - alert: DataDriftHigh
      expr: data_drift_percentage > 5
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "Data drift percentage is high"
        description: "{{ $labels.table }} has {{ $value }}% data drift"
    
    - alert: ReconciliationJobFailed
      expr: up{job="reconciliation"} == 0
      for: 2m
      labels:
        severity: critical
      annotations:
        summary: "Reconciliation job is down"
        description: "The data reconciliation service is not responding"
    
    - alert: AutoCorrectionRateHigh
      expr: rate(reconciliation_auto_corrections_total[10m]) > 10
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High rate of automatic corrections"
        description: "Auto-correction rate is {{ $value }} per second"
```

### 3. Dashboard and Reporting

```python
from datetime import datetime, timedelta

class ReconciliationDashboard:
    def __init__(self, database_client, metrics_client):
        self.db = database_client
        self.metrics = metrics_client
    
    def generate_daily_report(self, date=None):
        """Generate daily reconciliation report"""
        
        if not date:
            date = datetime.utcnow().date()
        
        # Query reconciliation results for the day
        daily_stats = self.db.query("""
            SELECT 
                table_name,
                inconsistency_type,
                COUNT(*) as count,
                AVG(CASE WHEN resolution = 'AUTO_CORRECTED' THEN 1 ELSE 0 END) as auto_correction_rate
            FROM reconciliation_log
            WHERE DATE(created_at) = %s
            GROUP BY table_name, inconsistency_type
        """, (date,))
        
        # Generate summary
        summary = {
            'date': date.isoformat(),
            'total_inconsistencies': sum(row['count'] for row in daily_stats),
            'auto_correction_rate': sum(row['auto_correction_rate'] * row['count'] for row in daily_stats) / max(sum(row['count'] for row in daily_stats), 1),
            'tables_affected': len(set(row['table_name'] for row in daily_stats)),
            'details_by_table': {}
        }
        
        # Group by table
        for row in daily_stats:
            table = row['table_name']
            if table not in summary['details_by_table']:
                summary['details_by_table'][table] = []
            
            summary['details_by_table'][table].append({
                'inconsistency_type': row['inconsistency_type'],
                'count': row['count'],
                'auto_correction_rate': row['auto_correction_rate']
            })
        
        return summary
    
    def generate_trend_analysis(self, days=7):
        """Generate trend analysis for reconciliation metrics"""
        
        end_date = datetime.utcnow().date()
        start_date = end_date - timedelta(days=days)
        
        trends = self.db.query("""
            SELECT 
                DATE(created_at) as date,
                table_name,
                COUNT(*) as inconsistencies,
                AVG(CASE WHEN resolution = 'AUTO_CORRECTED' THEN 1 ELSE 0 END) as auto_correction_rate
            FROM reconciliation_log
            WHERE DATE(created_at) BETWEEN %s AND %s
            GROUP BY DATE(created_at), table_name
            ORDER BY date, table_name
        """, (start_date, end_date))
        
        # Calculate trends
        trend_analysis = {
            'period': f"{start_date} to {end_date}",
            'trends': {},
            'overall_trend': 'stable'
        }
        
        for table in set(row['table_name'] for row in trends):
            table_data = [row for row in trends if row['table_name'] == table]
            
            if len(table_data) >= 2:
                first_count = table_data[0]['inconsistencies']
                last_count = table_data[-1]['inconsistencies']
                
                if last_count > first_count * 1.2:
                    trend = 'increasing'
                elif last_count < first_count * 0.8:
                    trend = 'decreasing'
                else:
                    trend = 'stable'
                
                trend_analysis['trends'][table] = {
                    'direction': trend,
                    'first_day_count': first_count,
                    'last_day_count': last_count,
                    'change_percentage': ((last_count - first_count) / max(first_count, 1)) * 100
                }
        
        return trend_analysis
```

## Advanced Reconciliation Techniques

### 1. Machine Learning-Based Anomaly Detection

```python
from datetime import datetime

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class MLAnomalyDetector:
    def __init__(self):
        self.models = {}
        self.scalers = {}
    
    def train_anomaly_detector(self, table_name, training_data):
        """Train anomaly detection model for a specific table"""
        
        # Prepare features (convert records to numerical features)
        features = self.extract_features(training_data)
        
        # Scale features
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(features)
        
        # Train isolation forest
        model = IsolationForest(contamination=0.05, random_state=42)
        model.fit(scaled_features)
        
        # Store model and scaler
        self.models[table_name] = model
        self.scalers[table_name] = scaler
    
    def detect_anomalies(self, table_name, data):
        """Detect anomalous records that may indicate reconciliation issues"""
        
        if table_name not in self.models:
            raise ValueError(f"No trained model for table {table_name}")
        
        # Extract features
        features = self.extract_features(data)
        
        # Scale features
        scaled_features = self.scalers[table_name].transform(features)
        
        # Predict anomalies
        anomaly_scores = self.models[table_name].decision_function(scaled_features)
        anomaly_predictions = self.models[table_name].predict(scaled_features)
        
        # Return anomalous records with scores
        anomalies = []
        for i, (record, score, is_anomaly) in enumerate(zip(data, anomaly_scores, anomaly_predictions)):
            if is_anomaly == -1:  # Isolation forest returns -1 for anomalies
                anomalies.append({
                    'record_index': i,
                    'record': record,
                    'anomaly_score': score,
                    'severity': 'high' if score < -0.5 else 'medium'
                })
        
        return anomalies
    
    def extract_features(self, data):
        """Extract numerical features from database records"""
        
        features = []
        
        for record in data:
            record_features = []
            
            for key, value in record.items():
                if isinstance(value, (int, float)):
                    record_features.append(value)
                elif isinstance(value, str):
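                    # Convert string to a hash-based feature. Built-in hash() is salted
                    # per process for str, so set PYTHONHASHSEED (or use a stable hash)
                    # if the model is trained and applied in different processes.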
                    record_features.append(hash(value) % 10000)
                elif isinstance(value, datetime):
                    # Convert datetime to timestamp
                    record_features.append(value.timestamp())
                else:
                    # Default value for other types
                    record_features.append(0)
            
            features.append(record_features)
        
        return np.array(features)
```

### 2. Probabilistic Reconciliation

```python
import random
from typing import List, Dict, Tuple

class ProbabilisticReconciler:
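    def __init__(self, source_db, target_db, confidence_threshold=0.95):
        self.source_db = source_db
        self.target_db = target_db
        self.db = source_db  # sampling queries run against the source
        self.confidence_threshold = confidence_threshold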
    
    def statistical_sampling_validation(self, table_name: str, population_size: int) -> Dict:
        """Use statistical sampling to validate large datasets"""
        
        # Calculate sample size for 95% confidence, 5% margin of error
        confidence_level = 0.95
        margin_of_error = 0.05
        
        z_score = 1.96  # for 95% confidence
        p = 0.5  # assume 50% error rate for maximum sample size
        
        sample_size = (z_score ** 2 * p * (1 - p)) / (margin_of_error ** 2)
        
        if population_size < 10000:
            # Finite population correction
            sample_size = sample_size / (1 + (sample_size - 1) / population_size)
        
        sample_size = min(int(sample_size), population_size)
        
        # Generate random sample
        sample_ids = self.generate_random_sample(table_name, sample_size)
        
        # Validate sample
        sample_results = self.validate_sample_records(table_name, sample_ids)
        
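        # Calculate population estimates from the records actually checked
        # (sampling may return fewer IDs than requested)
        checked = sample_results['total_checked']
        error_rate = sample_results['errors'] / checked if checked else 0.0
        estimated_errors = int(population_size * error_rate)
        
        # Calculate confidence interval
        standard_error = (error_rate * (1 - error_rate) / checked) ** 0.5 if checked else 0.0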
        margin_of_error_actual = z_score * standard_error
        
        confidence_interval = (
            max(0, error_rate - margin_of_error_actual),
            min(1, error_rate + margin_of_error_actual)
        )
        
        return {
            'table_name': table_name,
            'population_size': population_size,
            'sample_size': sample_size,
            'sample_error_rate': error_rate,
            'estimated_total_errors': estimated_errors,
            'confidence_interval': confidence_interval,
            'confidence_level': confidence_level,
            'recommendation': self.generate_recommendation(error_rate, confidence_interval)
        }
    
    def generate_random_sample(self, table_name: str, sample_size: int) -> List[int]:
        """Generate random sample of record IDs"""
        
        # Get total record count and ID range
        id_range = self.db.query(f"SELECT MIN(id), MAX(id) FROM {table_name}")[0]
        min_id, max_id = id_range
        
        # Generate random IDs
        sample_ids = []
        attempts = 0
        max_attempts = sample_size * 10  # Avoid infinite loop
        
        while len(sample_ids) < sample_size and attempts < max_attempts:
            candidate_id = random.randint(min_id, max_id)
            
            # Check if ID exists
            exists = self.db.query(f"SELECT 1 FROM {table_name} WHERE id = %s", (candidate_id,))
            
            if exists and candidate_id not in sample_ids:
                sample_ids.append(candidate_id)
            
            attempts += 1
        
        return sample_ids
    
    def validate_sample_records(self, table_name: str, sample_ids: List[int]) -> Dict:
        """Validate a sample of records"""
        
        validation_results = {
            'total_checked': len(sample_ids),
            'errors': 0,
            'error_details': []
        }
        
        for record_id in sample_ids:
            # Get record from both source and target
            source_record = self.source_db.get_record(table_name, record_id)
            target_record = self.target_db.get_record(table_name, record_id)
            
            if not target_record:
                validation_results['errors'] += 1
                validation_results['error_details'].append({
                    'id': record_id,
                    'error_type': 'MISSING_IN_TARGET'
                })
            elif not self.records_match(source_record, target_record):
                validation_results['errors'] += 1
                validation_results['error_details'].append({
                    'id': record_id,
                    'error_type': 'DATA_MISMATCH',
                    'differences': self.find_differences(source_record, target_record)
                })
        
        return validation_results
    
    def generate_recommendation(self, error_rate: float, confidence_interval: Tuple[float, float]) -> str:
        """Generate recommendation based on error rate and confidence"""
        
        if confidence_interval[1] < 0.01:  # Less than 1% error rate with confidence
            return "Data quality is excellent. Continue with normal reconciliation schedule."
        elif confidence_interval[1] < 0.05:  # Less than 5% error rate with confidence
            return "Data quality is acceptable. Monitor closely and investigate sample errors."
        elif confidence_interval[0] > 0.1:  # More than 10% error rate with confidence
            return "Data quality is poor. Immediate comprehensive reconciliation required."
        else:
            return "Data quality is uncertain. Increase sample size for better estimates."
```
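
To make the thresholds in `generate_recommendation` concrete, the sketch below pairs them with a Wilson score interval. The interval method is an assumption chosen for illustration (the sampler above may compute its interval differently), and `wilson_interval` and `recommend` are hypothetical names that return short labels instead of the full recommendation text.

```python
import math
from typing import Tuple

def wilson_interval(errors: int, n: int, z: float = 1.96) -> Tuple[float, float]:
    """Wilson score interval for an error proportion (z=1.96 is roughly 95%)."""
    if n == 0:
        return (0.0, 1.0)  # no data: the error rate could be anything
    p = errors / n
    denom = 1 + z ** 2 / n
    center = (p + z ** 2 / (2 * n)) / denom
    half = z * math.sqrt(p * (1 - p) / n + z ** 2 / (4 * n ** 2)) / denom
    return (max(0.0, center - half), min(1.0, center + half))

def recommend(interval: Tuple[float, float]) -> str:
    """Same thresholds as generate_recommendation, reduced to labels."""
    lower, upper = interval
    if upper < 0.01:
        return "excellent"
    if upper < 0.05:
        return "acceptable"
    if lower > 0.1:
        return "poor"
    return "uncertain"

print(recommend(wilson_interval(0, 1000)))    # clean sample of 1000 records
print(recommend(wilson_interval(3, 100)))     # small sample, wide interval
print(recommend(wilson_interval(200, 1000)))  # 20% observed error rate
```

A small sample with a few errors tends to land in the "uncertain" branch because its interval straddles the thresholds, which is the case where a larger sample is worth the cost.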

## Performance Optimization

### 1. Parallel Processing

```python
import asyncio
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Dict, List

class ParallelReconciler:
    def __init__(self, max_workers=None):
        self.max_workers = max_workers or mp.cpu_count()
    
    async def parallel_table_reconciliation(self, tables: List[str]):
        """Reconcile multiple tables in parallel"""
        
        # Bound concurrency per task. Wrapping the whole gather in a single
        # `async with` would acquire one slot once and limit nothing.
        semaphore = asyncio.Semaphore(self.max_workers)
        
        async def reconcile_with_limit(table: str):
            async with semaphore:
                return await self.reconcile_table_async(table)
        
        results = await asyncio.gather(
            *(reconcile_with_limit(table) for table in tables),
            return_exceptions=True
        )
        
        # Process results
        summary = {
            'total_tables': len(tables),
            'successful': 0,
            'failed': 0,
            'results': {}
        }
        
        for table, result in zip(tables, results):
            if isinstance(result, Exception):
                summary['failed'] += 1
                summary['results'][table] = {
                    'status': 'failed',
                    'error': str(result)
                }
            else:
                summary['successful'] += 1
                summary['results'][table] = result
        
        return summary
    
    def parallel_chunk_processing(self, table_name: str, chunk_size: int = 10000):
        """Process table reconciliation in parallel chunks"""
        
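        # Get total record count
        # NOTE: the ranges below assume dense IDs running from 0 to total_records - 1;
        # for sparse or 1-based IDs, derive the ranges from MIN(id)/MAX(id) instead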
        total_records = self.db.get_record_count(table_name)
        num_chunks = (total_records + chunk_size - 1) // chunk_size
        
        # Create chunk specifications
        chunks = []
        for i in range(num_chunks):
            start_id = i * chunk_size
            end_id = min((i + 1) * chunk_size - 1, total_records - 1)
            chunks.append({
                'table': table_name,
                'start_id': start_id,
                'end_id': end_id,
                'chunk_number': i + 1
            })
        
        # Process chunks in parallel
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            chunk_results = list(executor.map(self.process_chunk, chunks))
        
        # Aggregate results
        total_inconsistencies = sum(r['inconsistencies'] for r in chunk_results)
        total_corrections = sum(r['corrections'] for r in chunk_results)
        
        return {
            'table': table_name,
            'total_records': total_records,
            'chunks_processed': len(chunks),
            'total_inconsistencies': total_inconsistencies,
            'total_corrections': total_corrections,
            'chunk_details': chunk_results
        }
    
    def process_chunk(self, chunk_spec: Dict) -> Dict:
        """Process a single chunk of records"""
        
        # This runs in a separate process
        table = chunk_spec['table']
        start_id = chunk_spec['start_id']
        end_id = chunk_spec['end_id']
        
        # Initialize database connections for this process
        local_source_db = SourceDatabase()
        local_target_db = TargetDatabase()
        
        # Get records in chunk
        source_records = local_source_db.get_records_range(table, start_id, end_id)
        target_records = local_target_db.get_records_range(table, start_id, end_id)
        
        # Reconcile chunk
        inconsistencies = 0
        corrections = 0
        
        for source_record in source_records:
            target_record = target_records.get(source_record['id'])
            
            if not target_record:
                inconsistencies += 1
                # Auto-correct if possible
                try:
                    local_target_db.insert(table, source_record)
                    corrections += 1
                except Exception:
                    pass  # Log error in production
            elif not self.records_match(source_record, target_record):
                inconsistencies += 1
                # Auto-correct field mismatches
                try:
                    updates = self.calculate_updates(source_record, target_record)
                    local_target_db.update(table, source_record['id'], updates)
                    corrections += 1
                except Exception:
                    pass  # Log error in production
        
        return {
            'chunk_number': chunk_spec['chunk_number'],
            'start_id': start_id,
            'end_id': end_id,
            'records_processed': len(source_records),
            'inconsistencies': inconsistencies,
            'corrections': corrections
        }
```
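
Chunk boundaries computed from a record count only line up with real rows when IDs are dense and zero-based. As a hedged alternative, the sketch below derives chunk ranges from the actual minimum and maximum ID; `make_id_chunks` is a hypothetical helper, and the caller is assumed to obtain `min_id` and `max_id` from a query such as `SELECT MIN(id), MAX(id)`.

```python
from typing import Dict, List

def make_id_chunks(min_id: int, max_id: int, chunk_size: int) -> List[Dict[str, int]]:
    """Split the inclusive ID range [min_id, max_id] into contiguous chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks: List[Dict[str, int]] = []
    start = min_id
    while start <= max_id:
        end = min(start + chunk_size - 1, max_id)
        chunks.append({
            'start_id': start,
            'end_id': end,
            'chunk_number': len(chunks) + 1,
        })
        start = end + 1
    return chunks

for chunk in make_id_chunks(1, 25, 10):
    print(chunk)
```

Ranges built this way can still contain gaps where rows were deleted, so some chunks may hold fewer than `chunk_size` records; that only affects load balance, not correctness.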

### 2. Incremental Reconciliation

```python
from datetime import datetime

class IncrementalReconciler:
    def __init__(self, source_db, target_db):
        self.source_db = source_db
        self.target_db = target_db
        self.last_reconciliation_times = {}
    
    def incremental_reconciliation(self, table_name: str):
        """Reconcile only records changed since last reconciliation"""
        
        last_reconciled = self.get_last_reconciliation_time(table_name)
        
        # Capture the next watermark before reading, so changes that land
        # while this run is in progress are picked up by the next run
        reconciliation_started = datetime.utcnow()
        
        # Get records modified since last reconciliation
        modified_source = self.source_db.get_records_modified_since(
            table_name, last_reconciled
        )
        
        modified_target = self.target_db.get_records_modified_since(
            table_name, last_reconciled
        )
        
        # Create lookup dictionaries
        source_dict = {r['id']: r for r in modified_source}
        target_dict = {r['id']: r for r in modified_target}
        
        # Find all record IDs to check
        all_ids = set(source_dict.keys()) | set(target_dict.keys())
        
        inconsistencies = []
        
        for record_id in all_ids:
            # A record changed on only one side is absent from the other
            # side's modified set, so fetch the counterpart directly
            source_record = (source_dict.get(record_id)
                             or self.source_db.get_record(table_name, record_id))
            target_record = (target_dict.get(record_id)
                             or self.target_db.get_record(table_name, record_id))
            
            if source_record and not target_record:
                inconsistencies.append({
                    'type': 'missing_in_target',
                    'table': table_name,
                    'id': record_id,
                    'source_record': source_record
                })
            elif not source_record and target_record:
                inconsistencies.append({
                    'type': 'extra_in_target',
                    'table': table_name,
                    'id': record_id,
                    'target_record': target_record
                })
            elif source_record and target_record:
                if not self.records_match(source_record, target_record):
                    inconsistencies.append({
                        'type': 'data_mismatch',
                        'table': table_name,
                        'id': record_id,
                        'source_record': source_record,
                        'target_record': target_record,
                        'differences': self.find_differences(source_record, target_record)
                    })
        
        # Update last reconciliation time
        self.update_last_reconciliation_time(table_name, reconciliation_started)
        
        return {
            'table': table_name,
            'reconciliation_time': reconciliation_started,
            'records_checked': len(all_ids),
            'inconsistencies_found': len(inconsistencies),
            'inconsistencies': inconsistencies
        }
    
    def get_last_reconciliation_time(self, table_name: str) -> datetime:
        """Get the last reconciliation timestamp for a table"""
        
        result = self.source_db.query("""
            SELECT last_reconciled_at
            FROM reconciliation_metadata
            WHERE table_name = %s
        """, (table_name,))
        
        if result:
            return result[0]['last_reconciled_at']
        else:
            # First time reconciliation - start from beginning of migration
            return self.get_migration_start_time()
    
    def update_last_reconciliation_time(self, table_name: str, timestamp: datetime):
        """Update the last reconciliation timestamp"""
        
        self.source_db.execute("""
            INSERT INTO reconciliation_metadata (table_name, last_reconciled_at)
            VALUES (%s, %s)
            ON CONFLICT (table_name)
            DO UPDATE SET last_reconciled_at = %s
        """, (table_name, timestamp, timestamp))
```
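
The reconcilers in this guide call `records_match` and `find_differences` without showing them in this section. If they are not defined elsewhere, one possible shape is sketched below. It assumes records are plain dictionaries and that some fields (for example sync timestamps) should be excluded from comparison; the `ignore_fields` parameter is an illustrative addition.

```python
from typing import Any, Dict, Iterable, Tuple

def find_differences(
    source: Dict[str, Any],
    target: Dict[str, Any],
    ignore_fields: Iterable[str] = (),
) -> Dict[str, Tuple[Any, Any]]:
    """Return {field: (source_value, target_value)} for every differing field."""
    ignored = set(ignore_fields)
    differences = {}
    for field in (set(source) | set(target)) - ignored:
        source_value = source.get(field)
        target_value = target.get(field)
        if source_value != target_value:
            differences[field] = (source_value, target_value)
    return differences

def records_match(
    source: Dict[str, Any],
    target: Dict[str, Any],
    ignore_fields: Iterable[str] = (),
) -> bool:
    """Two records match when no compared field differs."""
    return not find_differences(source, target, ignore_fields)

src = {'id': 1, 'email': 'a@example.com', 'updated_at': 1}
tgt = {'id': 1, 'email': 'b@example.com', 'updated_at': 2}
print(find_differences(src, tgt, ignore_fields=('updated_at',)))
```

A field that is missing on one side is compared as `None`, so a column that exists only in the target schema shows up as a difference unless it is listed in `ignore_fields`.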

This comprehensive guide provides the framework and tools necessary for implementing robust data reconciliation strategies during migrations, ensuring data integrity and consistency while minimizing business disruption.
```

### references/migration_patterns_catalog.md

```markdown
# Migration Patterns Catalog

## Overview

This catalog describes proven migration patterns along with their use cases, implementation guidelines, and best practices. Most patterns include code or configuration examples; common pitfalls and lessons from real-world implementations are collected at the end.

## Database Migration Patterns

### 1. Expand-Contract Pattern

**Use Case:** Schema evolution with zero downtime
**Complexity:** Medium
**Risk Level:** Low-Medium

#### Description
The Expand-Contract pattern allows for schema changes without downtime by following a three-phase approach:

1. **Expand:** Add new schema elements alongside existing ones
2. **Migrate:** Dual-write to both old and new schema during transition
3. **Contract:** Remove old schema elements after validation

#### Implementation Steps

```sql
-- Phase 1: Expand
ALTER TABLE users ADD COLUMN email_new VARCHAR(255);
CREATE INDEX CONCURRENTLY idx_users_email_new ON users(email_new);

-- Phase 2: Migrate (Application Code)
-- Write to both columns during transition period
INSERT INTO users (name, email, email_new) VALUES (?, ?, ?);

-- Backfill existing data
UPDATE users SET email_new = email WHERE email_new IS NULL;

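-- Phase 3: Contract (after validation; not reversible without a backup)
-- Run both statements in one transaction (PostgreSQL DDL is transactional)
-- so no reader ever sees the table without an email column.
-- Application code must stop referencing email_new before this step.
BEGIN;
ALTER TABLE users DROP COLUMN email;
ALTER TABLE users RENAME COLUMN email_new TO email;
COMMIT;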
```

#### Pros and Cons
**Pros:**
- Zero downtime deployments
- Safe rollback at any point before the Contract phase
- Gradual transition with validation

**Cons:**
- Increased storage during transition
- More complex application logic
- Extended migration timeline
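
The "after validation" gate before the Contract phase can be as simple as counting rows where the new column does not yet mirror the old one. The sketch below uses Python's built-in `sqlite3` module purely so that it runs anywhere; `count_unmigrated_rows` is a hypothetical helper, and on PostgreSQL the NULL-safe comparison would be written `IS DISTINCT FROM`.

```python
import sqlite3

def count_unmigrated_rows(conn: sqlite3.Connection) -> int:
    """Rows where email_new does not match email (NULL-safe comparison)."""
    # SQLite's IS NOT compares NULLs as values; PostgreSQL uses IS DISTINCT FROM.
    row = conn.execute(
        "SELECT COUNT(*) FROM users WHERE email IS NOT email_new"
    ).fetchone()
    return row[0]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT, email_new TEXT)"
)
conn.executemany(
    "INSERT INTO users (name, email, email_new) VALUES (?, ?, ?)",
    [
        ("ann", "ann@example.com", "ann@example.com"),  # already dual-written
        ("bob", "bob@example.com", None),               # not backfilled yet
    ],
)

before = count_unmigrated_rows(conn)
conn.execute("UPDATE users SET email_new = email WHERE email_new IS NULL")
after = count_unmigrated_rows(conn)
print(before, after)
```

Contracting only when the count is zero, and has stayed zero across several checks under live traffic, keeps the irreversible step behind evidence rather than a calendar date.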

### 2. Parallel Schema Pattern

**Use Case:** Major database restructuring
**Complexity:** High
**Risk Level:** Medium

#### Description
Run new and old schemas in parallel, using feature flags to gradually route traffic to the new schema while maintaining the ability to roll back quickly.

#### Implementation Example

```python
class DatabaseRouter:
    def __init__(self, feature_flag_service):
        self.feature_flags = feature_flag_service
        self.old_db = OldDatabaseConnection()
        self.new_db = NewDatabaseConnection()
    
    def route_query(self, user_id, query_type):
        if self.feature_flags.is_enabled("new_schema", user_id):
            return self.new_db.execute(query_type)
        else:
            return self.old_db.execute(query_type)
    
    def dual_write(self, data):
        # Write to both databases for consistency
        success_old = self.old_db.write(data)
        success_new = self.new_db.write(transform_data(data))
        
        if not (success_old and success_new):
            # Handle partial failures
            self.handle_dual_write_failure(data, success_old, success_new)
```

#### Best Practices
- Implement data consistency checks between schemas
- Use circuit breakers for automatic failover
- Monitor performance impact of dual writes
- Plan for data reconciliation processes
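
One way to plan for reconciliation is to treat the old schema as the source of truth during the transition and to queue failed writes to the new schema for later repair. The sketch below illustrates that idea with in-memory stand-ins; `InMemoryStore`, `DualWriter`, and `repair_queue` are hypothetical names, not part of any library.

```python
from collections import deque

class InMemoryStore:
    """Stand-in for a database connection (hypothetical)."""
    def __init__(self, fail: bool = False):
        self.rows = {}
        self.fail = fail

    def write(self, key, value):
        if self.fail:
            raise ConnectionError("store unavailable")
        self.rows[key] = value

class DualWriter:
    def __init__(self, old_store, new_store):
        self.old_store = old_store
        self.new_store = new_store
        self.repair_queue = deque()  # keys that still need syncing to the new store

    def write(self, key, value):
        # The old store is the source of truth: if it fails, the write fails.
        self.old_store.write(key, value)
        try:
            self.new_store.write(key, value)
        except Exception:
            # Keep serving the request; remember the key for reconciliation.
            self.repair_queue.append(key)

    def repair(self) -> int:
        """Copy queued keys from the old store to the new store."""
        repaired = 0
        while self.repair_queue:
            key = self.repair_queue[0]
            # If this raises, the key stays queued for the next attempt.
            self.new_store.write(key, self.old_store.rows[key])
            self.repair_queue.popleft()
            repaired += 1
        return repaired

old, new = InMemoryStore(), InMemoryStore(fail=True)
writer = DualWriter(old, new)
writer.write("u1", {"email": "a@example.com"})
print(list(writer.repair_queue))
```

The repair step copies the current value from the old store rather than replaying the failed payload, so a key that was written several times while the new store was down converges to the latest value.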

### 3. Event Sourcing Migration

**Use Case:** Migrating systems with complex business logic
**Complexity:** High
**Risk Level:** Medium-High

#### Description
Capture all changes as events during migration, enabling replay and reconciliation capabilities.

#### Event Store Schema
```sql
CREATE TABLE migration_events (
    event_id UUID PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    event_version INTEGER NOT NULL,
    occurred_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE
);
```

#### Migration Event Handler
```python
from datetime import datetime, timezone

class MigrationEventHandler:
    def __init__(self, old_store, new_store):
        self.old_store = old_store
        self.new_store = new_store
        self.event_log = []
    
    def handle_update(self, entity_id, old_data, new_data):
        # Log the change as an event
        event = MigrationEvent(
            entity_id=entity_id,
            event_type="entity_migrated",
            old_data=old_data,
            new_data=new_data,
            # Timezone-aware, to match the TIMESTAMP WITH TIME ZONE columns
            timestamp=datetime.now(timezone.utc)
        )
        
        self.event_log.append(event)
        
        # Apply to new store
        success = self.new_store.update(entity_id, new_data)
        
        if not success:
            # Mark for retry
            event.status = "failed"
            self.schedule_retry(event)
        
        return success
    
    def replay_events(self, from_timestamp=None):
        """Replay events for reconciliation"""
        events = self.get_events_since(from_timestamp)
        for event in events:
            self.apply_event(event)
```
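
Replay is only safe for reconciliation if applying the same event twice has no extra effect. The sketch below shows one way to get that property: events are applied in version order per aggregate, and already-applied event IDs are skipped. The `Event` dataclass mirrors a few columns of `migration_events`; the `replay` function and the merge-by-update semantics are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Set

@dataclass
class Event:
    event_id: str
    aggregate_id: str
    event_version: int
    event_data: dict

def replay(events: Iterable[Event], state: Dict[str, dict], applied_ids: Set[str]) -> int:
    """Apply unseen events in (aggregate, version) order; return how many were applied."""
    applied = 0
    for event in sorted(events, key=lambda e: (e.aggregate_id, e.event_version)):
        if event.event_id in applied_ids:
            continue  # idempotency: never apply the same event twice
        state.setdefault(event.aggregate_id, {}).update(event.event_data)
        applied_ids.add(event.event_id)
        applied += 1
    return applied

events = [
    Event("e2", "u1", 2, {"email": "new@example.com"}),
    Event("e1", "u1", 1, {"email": "old@example.com", "name": "Ann"}),
]
state, seen = {}, set()
print(replay(events, state, seen), state)
print(replay(events, state, seen))  # second pass applies nothing
```

In a real event store the set of applied IDs would be persisted (for example via the `processed_at` column) rather than held in memory.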

## Service Migration Patterns

### 1. Strangler Fig Pattern

**Use Case:** Legacy system replacement
**Complexity:** Medium-High
**Risk Level:** Medium

#### Description
Gradually replace legacy functionality by intercepting calls and routing them to new services, eventually "strangling" the legacy system.

#### Implementation Architecture

```yaml
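# Istio VirtualService: route flagged requests to the new service
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: user-service-migration
spec:
  hosts:
  - user-service  # required field; host name assumed for illustration
  http: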
  - match:
    - headers:
        migration-flag:
          exact: "new"
    route:
    - destination:
        host: user-service-v2
  - route:
    - destination:
        host: user-service-v1
```

#### Strangler Proxy Implementation

```python
class StranglerProxy:
    def __init__(self):
        self.legacy_service = LegacyUserService()
        self.new_service = NewUserService()
        self.feature_flags = FeatureFlagService()
    
    def handle_request(self, request):
        route = self.determine_route(request)
        
        if route == "new":
            return self.handle_with_new_service(request)
        elif route == "both":
            return self.handle_with_both_services(request)
        else:
            return self.handle_with_legacy_service(request)
    
    def determine_route(self, request):
        user_id = request.get('user_id')
        
        if self.feature_flags.is_enabled("new_user_service", user_id):
            if self.feature_flags.is_enabled("dual_write", user_id):
                return "both"
            else:
                return "new"
        else:
            return "legacy"
```

### 2. Parallel Run Pattern

**Use Case:** Risk mitigation for critical services
**Complexity:** Medium
**Risk Level:** Low-Medium

#### Description
Run both old and new services simultaneously, comparing outputs to validate correctness before switching traffic.

#### Implementation

```python
import asyncio

class ParallelRunManager:
    def __init__(self):
        self.primary_service = PrimaryService()
        self.candidate_service = CandidateService()
        self.comparator = ResponseComparator()
        self.metrics = MetricsCollector()
    
    async def parallel_execute(self, request):
        # Execute both services concurrently
        primary_task = asyncio.create_task(
            self.primary_service.process(request)
        )
        candidate_task = asyncio.create_task(
            self.candidate_service.process(request)
        )
        
        # Always wait for primary
        primary_result = await primary_task
        
        try:
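            # Wait for candidate with timeout. Note: this holds the primary
            # response for up to the timeout; run the comparison in a
            # background task if added latency is unacceptable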
            candidate_result = await asyncio.wait_for(
                candidate_task, timeout=5.0
            )
            
            # Compare results
            comparison = self.comparator.compare(
                primary_result, candidate_result
            )
            
            # Record metrics
            self.metrics.record_comparison(comparison)
            
        except asyncio.TimeoutError:
            self.metrics.record_timeout("candidate")
        except Exception as e:
            self.metrics.record_error("candidate", str(e))
        
        # Always return primary result
        return primary_result
```
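
The `ResponseComparator` used above is not shown. A minimal sketch follows, assuming responses are flat dictionaries and that some fields (request IDs, timestamps) legitimately differ between services; the default `ignore_fields` are illustrative assumptions.

```python
class ResponseComparator:
    def __init__(self, ignore_fields=("request_id", "timestamp")):
        # Fields expected to differ between services even when both are correct
        self.ignore_fields = set(ignore_fields)

    def compare(self, primary: dict, candidate: dict) -> dict:
        """Report whether the responses agree and list each differing field."""
        keys = (set(primary) | set(candidate)) - self.ignore_fields
        mismatches = {
            key: {"primary": primary.get(key), "candidate": candidate.get(key)}
            for key in sorted(keys)
            if primary.get(key) != candidate.get(key)
        }
        return {"match": not mismatches, "mismatches": mismatches}

comparator = ResponseComparator()
result = comparator.compare(
    {"total": 10, "request_id": "a"},
    {"total": 12, "request_id": "b"},
)
print(result)
```

Recording which fields mismatch, rather than a bare boolean, makes it easier to tell a systematic bug in the candidate from occasional nondeterminism.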

### 3. Blue-Green Deployment Pattern

**Use Case:** Zero-downtime service updates
**Complexity:** Low-Medium
**Risk Level:** Low

#### Description
Maintain two identical production environments (blue and green), switching traffic between them for deployments.

#### Kubernetes Implementation

```yaml
# Blue Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-blue
  labels:
    version: blue
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      version: blue
  template:
    metadata:
      labels:
        app: myapp
        version: blue
    spec:
      containers:
      - name: app
        image: myapp:v1.0.0

---
# Green Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-green
  labels:
    version: green
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      version: green
  template:
    metadata:
      labels:
        app: myapp
        version: green
    spec:
      containers:
      - name: app
        image: myapp:v2.0.0

---
# Service (switches between blue and green)
apiVersion: v1
kind: Service
metadata:
  name: app-service
spec:
  selector:
    app: myapp
    version: blue  # Change to green for deployment
  ports:
  - port: 80
    targetPort: 8080
```

## Infrastructure Migration Patterns

### 1. Lift and Shift Pattern

**Use Case:** Quick cloud migration with minimal changes
**Complexity:** Low-Medium
**Risk Level:** Low

#### Description
Migrate applications to cloud infrastructure with minimal or no code changes, focusing on infrastructure compatibility.

#### Migration Checklist

```yaml
Pre-Migration Assessment:
  - inventory_current_infrastructure:
      - servers_and_specifications
      - network_configuration
      - storage_requirements
      - security_configurations
  - identify_dependencies:
      - database_connections
      - external_service_integrations
      - file_system_dependencies
  - assess_compatibility:
      - operating_system_versions
      - runtime_dependencies
      - license_requirements

Migration Execution:
  - provision_target_infrastructure:
      - compute_instances
      - storage_volumes
      - network_configuration
      - security_groups
  - migrate_data:
      - database_backup_restore
      - file_system_replication
      - configuration_files
  - update_configurations:
      - connection_strings
      - environment_variables
      - dns_records
  - validate_functionality:
      - application_health_checks
      - end_to_end_testing
      - performance_validation
```

### 2. Hybrid Cloud Migration

**Use Case:** Gradual cloud adoption with on-premises integration
**Complexity:** High
**Risk Level:** Medium-High

#### Description
Maintain some components on-premises while migrating others to cloud, requiring secure connectivity and data synchronization.

#### Network Architecture

```hcl
# Terraform configuration for hybrid connectivity
resource "aws_vpc" "main" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
}

resource "aws_vpn_gateway" "main" {
  vpc_id = aws_vpc.main.id
  
  tags = {
    Name = "hybrid-vpn-gateway"
  }
}

resource "aws_customer_gateway" "main" {
  bgp_asn    = 65000
  ip_address = var.on_premises_public_ip
  type       = "ipsec.1"
  
  tags = {
    Name = "on-premises-gateway"
  }
}

resource "aws_vpn_connection" "main" {
  vpn_gateway_id      = aws_vpn_gateway.main.id
  customer_gateway_id = aws_customer_gateway.main.id
  type                = "ipsec.1"
  static_routes_only  = true
}
```

#### Data Synchronization Pattern

```python
class HybridDataSync:
    def __init__(self):
        self.on_prem_db = OnPremiseDatabase()
        self.cloud_db = CloudDatabase()
        self.sync_log = SyncLogManager()
    
    async def bidirectional_sync(self):
        """Synchronize data between on-premises and cloud"""
        
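        # Get last sync timestamp
        last_sync = self.sync_log.get_last_sync_time()
        
        # Capture both change sets before applying anything, so changes
        # written by this run are not read back as new changes
        on_prem_changes = self.on_prem_db.get_changes_since(last_sync)
        cloud_changes = self.cloud_db.get_changes_since(last_sync)
        
        # Detect and resolve conflicts first; rows changed on both sides
        # must not be blindly overwritten in either direction
        # (assumes each conflict exposes the affected table and key)
        conflicts = self.detect_conflicts(on_prem_changes, cloud_changes)
        conflicted_rows = {(c.table, c.key) for c in conflicts}
        for conflict in conflicts:
            await self.resolve_conflict(conflict)
        
        # Sync non-conflicting on-prem changes to cloud
        for change in on_prem_changes:
            if (change.table, change.key) not in conflicted_rows:
                await self.apply_change_to_cloud(change)
        
        # Sync non-conflicting cloud changes to on-prem
        for change in cloud_changes:
            if (change.table, change.key) not in conflicted_rows:
                await self.apply_change_to_on_prem(change)
        
        # Update sync timestamp
        self.sync_log.record_sync_completion()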
    
    async def apply_change_to_cloud(self, change):
        """Apply on-premises change to cloud database"""
        try:
            if change.operation == "INSERT":
                await self.cloud_db.insert(change.table, change.data)
            elif change.operation == "UPDATE":
                await self.cloud_db.update(change.table, change.key, change.data)
            elif change.operation == "DELETE":
                await self.cloud_db.delete(change.table, change.key)
                
            self.sync_log.record_success(change.id, "cloud")
            
        except Exception as e:
            self.sync_log.record_failure(change.id, "cloud", str(e))
            raise
```
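
`detect_conflicts` and `resolve_conflict` are left undefined above. One simple interpretation is sketched here: a conflict is any row changed on both sides since the last sync, resolved by last-writer-wins. The `Change` and `Conflict` dataclasses and the `changed_at` field are assumptions, and last-writer-wins is only trustworthy when both sites have reasonably synchronized clocks.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Change:
    table: str
    key: int
    data: dict
    changed_at: float  # epoch seconds (assumed field)

@dataclass
class Conflict:
    table: str
    key: int
    on_prem: Change
    cloud: Change

def detect_conflicts(on_prem_changes: List[Change], cloud_changes: List[Change]) -> List[Conflict]:
    """A row is in conflict when both sides changed it since the last sync."""
    # If a row changed several times in the cloud, the last listed change is kept.
    cloud_by_row = {(c.table, c.key): c for c in cloud_changes}
    conflicts = []
    for change in on_prem_changes:
        other = cloud_by_row.get((change.table, change.key))
        if other is not None:
            conflicts.append(Conflict(change.table, change.key, change, other))
    return conflicts

def last_writer_wins(conflict: Conflict) -> Change:
    """Pick the more recent change; ties go to on-premises."""
    if conflict.on_prem.changed_at >= conflict.cloud.changed_at:
        return conflict.on_prem
    return conflict.cloud

a = Change("users", 1, {"email": "a@example.com"}, 100.0)
b = Change("users", 2, {"email": "b@example.com"}, 100.0)
c = Change("users", 1, {"email": "c@example.com"}, 200.0)
conflicts = detect_conflicts([a, b], [c])
print(conflicts[0].key, last_writer_wins(conflicts[0]).data)
```

Last-writer-wins silently discards the losing change, so domains where both edits matter (balances, counters, inventories) usually need a field-level merge or manual review instead.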

### 3. Multi-Cloud Migration

**Use Case:** Avoiding vendor lock-in or meeting regulatory requirements
**Complexity:** Very High
**Risk Level:** High

#### Description
Distribute workloads across multiple cloud providers for resilience, compliance, or cost optimization.

#### Service Mesh Configuration

```yaml
# Istio configuration for multi-cloud service mesh
apiVersion: networking.istio.io/v1beta1
kind: ServiceEntry
metadata:
  name: aws-service
spec:
  hosts:
  - aws-service.company.com
  ports:
  - number: 443
    name: https
    protocol: HTTPS
  location: MESH_EXTERNAL
  resolution: DNS

---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: multi-cloud-routing
spec:
  hosts:
  - user-service
  http:
  - match:
    - headers:
        region:
          exact: "us-east"
    route:
    - destination:
        host: aws-service.company.com
      weight: 100
  - match:
    - headers:
        region:
          exact: "eu-west"
    route:
    - destination:
        host: gcp-service.company.com  # needs its own ServiceEntry (not shown)
      weight: 100
  - route:  # Default routing
    - destination:
        host: user-service
        subset: local  # requires a DestinationRule that defines this subset
      weight: 80
    - destination:
        host: aws-service.company.com
      weight: 20
```

## Feature Flag Patterns

### 1. Progressive Rollout Pattern

**Use Case:** Gradual feature deployment with risk mitigation
**Implementation:**

```python
import hashlib
import time

class ProgressiveRollout:
    def __init__(self, feature_name):
        self.feature_name = feature_name
        self.rollout_percentage = 0
        self.user_buckets = {}
        
    def is_enabled_for_user(self, user_id):
        # Consistent user bucketing
        user_hash = hashlib.md5(f"{self.feature_name}:{user_id}".encode()).hexdigest()
        bucket = int(user_hash, 16) % 100
        
        return bucket < self.rollout_percentage
    
    def increase_rollout(self, target_percentage, step_size=10):
        """Gradually increase rollout percentage"""
        while self.rollout_percentage < target_percentage:
            self.rollout_percentage = min(
                self.rollout_percentage + step_size,
                target_percentage
            )
            
            # Monitor metrics before next increase
            yield self.rollout_percentage
            time.sleep(300)  # Wait 5 minutes between increases
```
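
Hash-based bucketing has two properties that matter during a migration: a user always lands in the same bucket, and raising the percentage only adds users without removing any. The sketch below checks both with standalone functions; `rollout_bucket` and `enabled_users` are hypothetical helpers that mirror the hashing in `is_enabled_for_user`. MD5 is used here only for bucketing, not for security.

```python
import hashlib

def rollout_bucket(feature_name: str, user_id) -> int:
    """Map a (feature, user) pair to a stable bucket in the range 0..99."""
    digest = hashlib.md5(f"{feature_name}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % 100

def enabled_users(feature_name: str, user_ids, percentage: int) -> set:
    """Users whose bucket falls below the rollout percentage."""
    return {u for u in user_ids if rollout_bucket(feature_name, u) < percentage}

users = range(1000)
at_10 = enabled_users("new_schema", users, 10)
at_50 = enabled_users("new_schema", users, 50)

# Everyone enabled at 10% is still enabled at 50%
print(at_10 <= at_50)
```

Including the feature name in the hash input means different features get independent buckets, so the same early users are not always the first to receive every change.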

### 2. Circuit Breaker Pattern

**Use Case:** Automatic fallback during migration issues

```python
import time

class MigrationCircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call_new_service(self, request):
        if self.state == 'OPEN':
            if self.should_attempt_reset():
                self.state = 'HALF_OPEN'
            else:
                return self.fallback_to_legacy(request)
        
        try:
            response = self.new_service.process(request)
            self.on_success()
            return response
        except Exception as e:
            self.on_failure()
            return self.fallback_to_legacy(request)
    
    def on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
    
    def should_attempt_reset(self):
        return (time.time() - self.last_failure_time) >= self.timeout
```
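
The class above depends on `self.new_service` and `fallback_to_legacy`, which are not defined in the snippet. The following self-contained variant is a sketch under stated assumptions: both services are passed in as plain callables, and the clock is injectable so that the OPEN to HALF_OPEN transition can be exercised without waiting. The name `CircuitBreaker` and its constructor parameters are illustrative.

```python
import time

class CircuitBreaker:
    def __init__(self, primary, fallback, failure_threshold=3, timeout=60,
                 clock=time.monotonic):
        self.primary = primary        # callable for the new service
        self.fallback = fallback      # callable for the legacy service
        self.failure_threshold = failure_threshold
        self.timeout = timeout        # seconds to stay OPEN before a trial call
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None
        self.state = 'CLOSED'

    def call(self, request):
        if self.state == 'OPEN':
            if self.clock() - self.opened_at < self.timeout:
                return self.fallback(request)
            self.state = 'HALF_OPEN'  # timeout elapsed: allow one trial call
        try:
            response = self.primary(request)
        except Exception:
            self.failure_count += 1
            # A failed trial call reopens immediately; otherwise wait for the threshold
            if self.state == 'HALF_OPEN' or self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                self.opened_at = self.clock()
            return self.fallback(request)
        self.failure_count = 0
        self.state = 'CLOSED'
        return response

# Usage with a fake clock and a service that starts out unhealthy
now = [0.0]
healthy = [False]

def new_service(request):
    if not healthy[0]:
        raise RuntimeError("new service down")
    return f"new:{request}"

def legacy_service(request):
    return f"legacy:{request}"

breaker = CircuitBreaker(new_service, legacy_service,
                         failure_threshold=2, timeout=30, clock=lambda: now[0])
```

Using a monotonic clock by default avoids spurious resets when the wall clock is adjusted during a long migration window.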

## Migration Anti-Patterns

### 1. Big Bang Migration (Anti-Pattern)

**Why to Avoid:**
- High risk of complete system failure
- Difficult to roll back
- Extended downtime
- All-or-nothing deployment

**Better Alternative:** Use incremental migration patterns like Strangler Fig or Parallel Run.

### 2. No Rollback Plan (Anti-Pattern)

**Why to Avoid:**
- Cannot recover from failures
- Increases business risk
- Panic-driven decisions during issues

**Better Alternative:** Always implement comprehensive rollback procedures before migration.

### 3. Insufficient Testing (Anti-Pattern)

**Why to Avoid:**
- Unknown compatibility issues
- Performance degradation
- Data corruption risks

**Better Alternative:** Implement comprehensive testing at each migration phase.

## Pattern Selection Matrix

| Migration Type | Complexity | Downtime Tolerance | Recommended Pattern |
|---------------|------------|-------------------|-------------------|
| Schema Change | Low | Zero | Expand-Contract |
| Schema Change | High | Zero | Parallel Schema |
| Service Replace | Medium | Zero | Strangler Fig |
| Service Update | Low | Zero | Blue-Green |
| Data Migration | High | Some | Event Sourcing |
| Infrastructure | Low | Some | Lift and Shift |
| Infrastructure | High | Zero | Hybrid Cloud |
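
For use in planning scripts, the matrix can be encoded as a lookup table. This is a direct transcription of the rows above and nothing more; `PATTERN_MATRIX` and `recommend_pattern` are hypothetical names, and combinations the matrix does not cover return `None` rather than a guess.

```python
from typing import Optional

# (migration type, complexity, downtime tolerance) -> recommended pattern
PATTERN_MATRIX = {
    ("schema change", "low", "zero"): "Expand-Contract",
    ("schema change", "high", "zero"): "Parallel Schema",
    ("service replace", "medium", "zero"): "Strangler Fig",
    ("service update", "low", "zero"): "Blue-Green",
    ("data migration", "high", "some"): "Event Sourcing",
    ("infrastructure", "low", "some"): "Lift and Shift",
    ("infrastructure", "high", "zero"): "Hybrid Cloud",
}

def recommend_pattern(migration_type: str, complexity: str,
                      downtime_tolerance: str) -> Optional[str]:
    """Look up the matrix; returns None when the combination is not listed."""
    key = (migration_type.lower(), complexity.lower(), downtime_tolerance.lower())
    return PATTERN_MATRIX.get(key)

print(recommend_pattern("Schema Change", "High", "Zero"))
```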

## Success Metrics

### Technical Metrics
- Migration completion rate
- System availability during migration
- Performance impact (response time, throughput)
- Error rate changes
- Rollback execution time

### Business Metrics
- Customer impact score
- Revenue protection
- Time to value realization
- Stakeholder satisfaction

### Operational Metrics
- Team efficiency
- Knowledge transfer effectiveness
- Post-migration support requirements
- Documentation completeness

## Lessons Learned

### Common Pitfalls
1. **Underestimating data dependencies** - Always map all data relationships
2. **Insufficient monitoring** - Implement comprehensive observability before migration
3. **Poor communication** - Keep all stakeholders informed throughout the process
4. **Rushed timelines** - Allow adequate time for testing and validation
5. **Ignoring performance impact** - Benchmark before and after migration

### Best Practices
1. **Start with low-risk migrations** - Build confidence and experience
2. **Automate everything possible** - Reduce human error and increase repeatability
3. **Test rollback procedures** - Ensure you can recover from any failure
4. **Monitor continuously** - Use real-time dashboards and alerting
5. **Document everything** - Create comprehensive runbooks and documentation

This catalog serves as a reference for selecting appropriate migration patterns based on specific requirements, risk tolerance, and technical constraints.
```

### references/zero_downtime_techniques.md

```markdown
# Zero-Downtime Migration Techniques

## Overview

Zero-downtime migrations are critical for maintaining business continuity and user experience during system changes. This guide provides comprehensive techniques, patterns, and implementation strategies for achieving true zero-downtime migrations across different system components.

## Core Principles

### 1. Backward Compatibility
Every change must be backward compatible until all clients have migrated to the new version.

### 2. Incremental Changes
Break large changes into smaller, independent increments that can be deployed and validated separately.

### 3. Feature Flags
Use feature toggles to control the rollout of new functionality without code deployments.

### 4. Graceful Degradation
Ensure systems continue to function even when some components are unavailable or degraded.

## Database Zero-Downtime Techniques

### Schema Evolution Without Downtime

#### 1. Additive Changes Only
**Principle:** Only add new elements; never remove or modify existing ones directly.

```sql
-- ✅ Good: Additive change
ALTER TABLE users ADD COLUMN middle_name VARCHAR(50);

-- ❌ Bad: Breaking change
ALTER TABLE users DROP COLUMN email;
```

#### 2. Multi-Phase Schema Evolution

**Phase 1: Expand**
```sql
-- Add new column alongside existing one
ALTER TABLE users ADD COLUMN email_address VARCHAR(255);

-- Add index concurrently (PostgreSQL)
CREATE INDEX CONCURRENTLY idx_users_email_address ON users(email_address);
```

**Phase 2: Dual Write (Application Code)**
```python
class UserService:
    def create_user(self, name, email):
        # Write to both old and new columns
        user = User(
            name=name,
            email=email,           # Old column
            email_address=email    # New column
        )
        user.save()  # Django-style save() returns None, so return the instance
        return user
    
    def update_email(self, user_id, new_email):
        # Update both columns
        user = User.objects.get(id=user_id)
        user.email = new_email
        user.email_address = new_email
        user.save()
        return user
```

**Phase 3: Backfill Data**
```sql
-- Backfill existing data (in batches)
UPDATE users 
SET email_address = email 
WHERE email_address IS NULL 
  AND id BETWEEN ? AND ?;
```
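
The `BETWEEN ? AND ?` placeholders imply a driver loop that walks the ID range one batch at a time. A minimal sketch of that loop follows, using Python's built-in `sqlite3` module only so the example is runnable; `backfill_in_batches` is a hypothetical helper, and a production version would typically also pause between batches and monitor replication lag.

```python
import sqlite3

def backfill_in_batches(conn: sqlite3.Connection, batch_size: int = 1000) -> int:
    """Copy email into email_address one ID range at a time; return rows updated."""
    min_id, max_id = conn.execute("SELECT MIN(id), MAX(id) FROM users").fetchone()
    if min_id is None:
        return 0  # empty table
    updated = 0
    start = min_id
    while start <= max_id:
        end = start + batch_size - 1
        cursor = conn.execute(
            "UPDATE users SET email_address = email "
            "WHERE email_address IS NULL AND id BETWEEN ? AND ?",
            (start, end),
        )
        conn.commit()  # one short transaction per batch keeps lock times small
        updated += cursor.rowcount
        start = end + 1
    return updated

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, email_address TEXT)")
conn.executemany(
    "INSERT INTO users (id, email, email_address) VALUES (?, ?, ?)",
    [(i, f"u{i}@example.com", f"u{i}@example.com" if i <= 5 else None)
     for i in range(1, 26)],
)
updated = backfill_in_batches(conn, batch_size=10)
print(updated)
```

The `email_address IS NULL` filter makes the backfill safe to rerun and leaves rows already populated by dual writes untouched.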

**Phase 4: Switch Reads**
```python
class UserService:
    def get_user_email(self, user_id):
        user = User.objects.get(id=user_id)
        # Switch to reading from new column
        return user.email_address or user.email
```

**Phase 5: Contract**
```sql
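-- After validation, remove old column
ALTER TABLE users DROP COLUMN email;
-- Rename new column if needed. The rename is itself a breaking change for code
-- that reads email_address, so roll it out as its own expand/contract cycle.
ALTER TABLE users RENAME COLUMN email_address TO email;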
```
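
The validation that precedes the contract phase can be sketched as a mismatch count between the old and new columns. The example uses SQLite from the standard library so it runs anywhere; the helper names are hypothetical, and on PostgreSQL the equivalent comparison would be `IS DISTINCT FROM`.

```python
import sqlite3


def count_mismatches(conn: sqlite3.Connection) -> int:
    """Count rows where the old and new columns disagree (NULL-safe)."""
    # SQLite's IS NOT treats NULL as a comparable value, so rows that
    # were never backfilled are counted as mismatches.
    row = conn.execute(
        "SELECT COUNT(*) FROM users WHERE email IS NOT email_address"
    ).fetchone()
    return row[0]


def safe_to_contract(conn: sqlite3.Connection) -> bool:
    """Only allow the contract phase when every row has been backfilled."""
    return count_mismatches(conn) == 0
```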

### 3. Online Schema Changes

#### PostgreSQL Techniques

```sql
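-- Safe column addition (PostgreSQL 11+ adds a constant DEFAULT without rewriting the table)
ALTER TABLE orders ADD COLUMN status_new VARCHAR(20) DEFAULT 'pending';

-- Safe index creation
CREATE INDEX CONCURRENTLY idx_orders_status_new ON orders(status_new);

-- Safe constraint addition: add as NOT VALID so existing rows are not scanned
-- under the exclusive lock, then validate separately after data validation
ALTER TABLE orders ADD CONSTRAINT check_status_new 
CHECK (status_new IN ('pending', 'processing', 'completed', 'cancelled')) NOT VALID;
ALTER TABLE orders VALIDATE CONSTRAINT check_status_new;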
```

#### MySQL Techniques

```sql
-- Use pt-online-schema-change for large tables
-- (shell command from Percona Toolkit, run from a terminal rather than a SQL client)
pt-online-schema-change \
  --alter "ADD COLUMN status VARCHAR(20) DEFAULT 'pending'" \
  --execute \
  D=mydb,t=orders

-- Online DDL (MySQL 5.6+)
ALTER TABLE orders 
ADD COLUMN priority INT DEFAULT 1,
ALGORITHM=INPLACE, 
LOCK=NONE;
```

### 4. Data Migration Strategies

#### Chunked Data Migration

```python
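import time

# `db` is assumed to be an existing database handle exposing execute(sql, params).
# Table names are interpolated into SQL below, so they must come from trusted config.


class DataMigrator: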
    def __init__(self, source_table, target_table, chunk_size=1000):
        self.source_table = source_table
        self.target_table = target_table
        self.chunk_size = chunk_size
    
    def migrate_data(self):
        last_id = 0
        total_migrated = 0
        
        while True:
            # Get next chunk
            chunk = self.get_chunk(last_id, self.chunk_size)
            
            if not chunk:
                break
            
            # Transform and migrate chunk
            for record in chunk:
                transformed = self.transform_record(record)
                self.insert_or_update(transformed)
            
            last_id = chunk[-1]['id']
            total_migrated += len(chunk)
            
            # Brief pause to avoid overwhelming the database
            time.sleep(0.1)
            
            self.log_progress(total_migrated)
        
        return total_migrated
    
    def get_chunk(self, last_id, limit):
        return db.execute(f"""
            SELECT * FROM {self.source_table}
            WHERE id > %s
            ORDER BY id
            LIMIT %s
        """, (last_id, limit))
```
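
After a chunked copy, source and target should be compared before any cutover. A rough sketch of an order-independent fingerprint follows, assuming rows are available as tuples; `table_fingerprint` is an illustrative name. XOR-combining hashes means pairs of identical rows cancel out, which is why the row count is returned alongside the checksum.

```python
import hashlib
from typing import Iterable, Tuple


def table_fingerprint(rows: Iterable[tuple]) -> Tuple[int, str]:
    """Return (row_count, order-independent checksum) for an iterable of rows."""
    count = 0
    combined = 0
    for row in rows:
        digest = hashlib.sha256(repr(row).encode()).digest()
        # XOR is commutative, so the result does not depend on row order.
        combined ^= int.from_bytes(digest, "big")
        count += 1
    return count, f"{combined:064x}"
```

Comparing `table_fingerprint(source_rows)` with `table_fingerprint(target_rows)` per chunk narrows down where a divergence was introduced.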

#### Change Data Capture (CDC)

```python
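import json

from kafka import KafkaConsumer  # kafka-python package

# TargetDatabase is assumed to be an application-specific wrapper around the target store.


class CDCProcessor: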
    def __init__(self):
        self.kafka_consumer = KafkaConsumer('db_changes')
        self.target_db = TargetDatabase()
    
    def process_changes(self):
        for message in self.kafka_consumer:
            change = json.loads(message.value)
            
            if change['operation'] == 'INSERT':
                self.handle_insert(change)
            elif change['operation'] == 'UPDATE':
                self.handle_update(change)
            elif change['operation'] == 'DELETE':
                self.handle_delete(change)
    
    def handle_insert(self, change):
        transformed_data = self.transform_data(change['after'])
        self.target_db.insert(change['table'], transformed_data)
    
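    def handle_update(self, change):
        key = change['key']
        transformed_data = self.transform_data(change['after'])
        self.target_db.update(change['table'], key, transformed_data)
    
    def handle_delete(self, change):
        # Deletes carry no 'after' image; remove the row by key.
        # Assumes the target wrapper exposes delete(table, key).
        self.target_db.delete(change['table'], change['key'])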
```
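
Change streams are usually delivered at least once, so the consumer should tolerate duplicates and reordering. The following is a sketch of an idempotent apply step over an in-memory table; the `sequence` field (for example a log position or offset) is an assumption and is not part of the event format shown above.

```python
from typing import Any, Dict


def apply_change(state: Dict[Any, dict], versions: Dict[Any, int], change: dict) -> bool:
    """Apply one change event; return False if it was skipped as stale."""
    key, seq = change["key"], change["sequence"]
    if versions.get(key, -1) >= seq:
        return False  # Duplicate or out-of-order event: a newer one was already applied.
    if change["operation"] == "DELETE":
        state.pop(key, None)
    else:
        # INSERT and UPDATE are both treated as upserts.
        state[key] = change["after"]
    versions[key] = seq
    return True
```

Tracking the last applied sequence per key also keeps a late update from resurrecting a row that was already deleted.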

## Application Zero-Downtime Techniques

### 1. Blue-Green Deployments

#### Infrastructure Setup

```yaml
# Blue Environment (Current Production)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-blue
  labels:
    version: blue
    app: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      version: blue
  template:
    metadata:
      labels:
        app: myapp
        version: blue
    spec:
      containers:
      - name: app
        image: myapp:1.0.0
        ports:
        - containerPort: 8080
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 15
          periodSeconds: 10

---
# Green Environment (New Version)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: app-green
  labels:
    version: green
    app: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
      version: green
  template:
    metadata:
      labels:
        app: myapp
        version: green
    spec:
      containers:
      - name: app
        image: myapp:2.0.0
        ports:
        - containerPort: 8080
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
```

#### Service Switching

```yaml
# Service (switches between blue and green)
apiVersion: v1
kind: Service
metadata:
  name: app-service
spec:
  selector:
    app: myapp
    version: blue  # Switch to 'green' for deployment
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
```

#### Automated Deployment Script

```bash
#!/bin/bash

# Blue-Green Deployment Script
NAMESPACE="production"
APP_NAME="app"          # Prefix of the app-blue, app-green, and app-service resources above
NEW_IMAGE="myapp:2.0.0"

# Determine current and target environments
CURRENT_VERSION=$(kubectl -n "$NAMESPACE" get service "$APP_NAME-service" -o jsonpath='{.spec.selector.version}')

if [ "$CURRENT_VERSION" = "blue" ]; then
    TARGET_VERSION="green"
else
    TARGET_VERSION="blue"
fi

echo "Current version: $CURRENT_VERSION"
echo "Target version: $TARGET_VERSION"

# Update target environment with new image
kubectl -n "$NAMESPACE" set image "deployment/$APP_NAME-$TARGET_VERSION" app="$NEW_IMAGE"

# Wait for rollout to complete; stop before touching traffic if it fails
if ! kubectl -n "$NAMESPACE" rollout status "deployment/$APP_NAME-$TARGET_VERSION" --timeout=300s; then
    echo "Rollout of $TARGET_VERSION did not complete"
    exit 1
fi

# Run health checks
# Assumes a per-color Service (e.g. app-green) exposes the target pods;
# it is not defined in the manifests above.
echo "Running health checks..."
TARGET_IP=$(kubectl -n "$NAMESPACE" get service "$APP_NAME-$TARGET_VERSION" -o jsonpath='{.status.loadBalancer.ingress[0].ip}')

for i in {1..30}; do
    if curl -f "http://$TARGET_IP/health"; then
        echo "Health check passed"
        break
    fi
    
    if [ "$i" -eq 30 ]; then
        echo "Health check failed after 30 attempts"
        exit 1
    fi
    
    sleep 2
done

# Switch traffic to new version
kubectl -n "$NAMESPACE" patch service "$APP_NAME-service" -p '{"spec":{"selector":{"version":"'"$TARGET_VERSION"'"}}}'

echo "Traffic switched to $TARGET_VERSION"

# Monitor for 5 minutes
echo "Monitoring new version..."
sleep 300

# Check if rollback is needed (the monitoring endpoint is a placeholder)
ERROR_RATE=$(curl -s "http://monitoring.company.com/api/error_rate?service=$APP_NAME" | jq -r '.error_rate')

if (( $(echo "$ERROR_RATE > 0.05" | bc -l) )); then
    echo "Error rate too high ($ERROR_RATE), rolling back..."
    kubectl -n "$NAMESPACE" patch service "$APP_NAME-service" -p '{"spec":{"selector":{"version":"'"$CURRENT_VERSION"'"}}}'
    exit 1
fi

echo "Deployment successful!"
```

### 2. Canary Deployments

#### Progressive Canary with Istio

```yaml
# Destination Rule
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: myapp-destination
spec:
  host: myapp
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2

---
# Virtual Service for Canary
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: myapp-canary
spec:
  hosts:
  - myapp
  http:
  - match:
    - headers:
        canary:
          exact: "true"
    route:
    - destination:
        host: myapp
        subset: v2
  - route:
    - destination:
        host: myapp
        subset: v1
      weight: 95
    - destination:
        host: myapp
        subset: v2
      weight: 5
```

#### Automated Canary Controller

```python
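import asyncio

# istio_client and prometheus_client are assumed to be async wrappers supplied by the caller.


class CanaryController: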
    def __init__(self, istio_client, prometheus_client):
        self.istio = istio_client
        self.prometheus = prometheus_client
        self.canary_weight = 5
        self.max_weight = 100
        self.weight_increment = 5
        self.validation_window = 300  # 5 minutes
    
    async def deploy_canary(self, app_name, new_version):
        """Deploy new version using canary strategy"""
        
        # Start with small percentage
        await self.update_traffic_split(app_name, self.canary_weight)
        
        while self.canary_weight < self.max_weight:
            # Monitor metrics for validation window
            await asyncio.sleep(self.validation_window)
            
            # Check canary health
            if not await self.is_canary_healthy(app_name, new_version):
                await self.rollback_canary(app_name)
                raise Exception("Canary deployment failed health checks")
            
            # Increase traffic to canary
            self.canary_weight = min(
                self.canary_weight + self.weight_increment,
                self.max_weight
            )
            
            await self.update_traffic_split(app_name, self.canary_weight)
            
            print(f"Canary traffic increased to {self.canary_weight}%")
        
        print("Canary deployment completed successfully")
    
    async def is_canary_healthy(self, app_name, version):
        """Check if canary version is healthy"""
        
        # Check error rate (sum() collapses the per-series rates into a single ratio)
        error_rate = await self.prometheus.query(
            f'sum(rate(http_requests_total{{app="{app_name}", version="{version}", status=~"5.."}}[5m]))'
            f' / sum(rate(http_requests_total{{app="{app_name}", version="{version}"}}[5m]))'
        )
        
        if error_rate > 0.05:  # 5% error rate threshold
            return False
        
        # Check response time
        p95_latency = await self.prometheus.query(
            f'histogram_quantile(0.95, rate(http_request_duration_seconds_bucket'
            f'{{app="{app_name}", version="{version}"}}[5m]))'
        )
        
        if p95_latency > 2.0:  # 2 second p95 threshold
            return False
        
        return True
    
    async def update_traffic_split(self, app_name, canary_weight):
        """Update Istio virtual service with new traffic split"""
        
        stable_weight = 100 - canary_weight
        
        virtual_service = {
            "apiVersion": "networking.istio.io/v1beta1",
            "kind": "VirtualService",
            "metadata": {"name": f"{app_name}-canary"},
            "spec": {
                "hosts": [app_name],
                "http": [{
                    "route": [
                        # Subset names must match the DestinationRule (v1 = stable, v2 = canary)
                        {
                            "destination": {"host": app_name, "subset": "v1"},
                            "weight": stable_weight
                        },
                        {
                            "destination": {"host": app_name, "subset": "v2"},
                            "weight": canary_weight
                        }
                    ]
                }]
            }
        }
        
        await self.istio.apply_virtual_service(virtual_service)
```
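
It helps to know how long a full progression takes before starting one. The sketch below reproduces the controller's weight steps as plain functions; the names `canary_schedule` and `minimum_rollout_seconds` are illustrative, and the defaults mirror the constructor values above.

```python
from typing import List


def canary_schedule(start: int = 5, increment: int = 5, max_weight: int = 100) -> List[int]:
    """Return the sequence of canary weights the controller steps through."""
    if increment <= 0:
        raise ValueError("increment must be positive")
    weights = [start]
    while weights[-1] < max_weight:
        weights.append(min(weights[-1] + increment, max_weight))
    return weights


def minimum_rollout_seconds(weights: List[int], validation_window: int = 300) -> int:
    """One validation window elapses before each weight increase."""
    return (len(weights) - 1) * validation_window
```

With the defaults, the schedule has 20 steps and 19 validation windows, so a clean rollout takes at least 5700 seconds (95 minutes), excluding query and apply time.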

### 3. Rolling Updates

#### Kubernetes Rolling Update Strategy

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rolling-update-app
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2         # Can have 2 extra pods during update
      maxUnavailable: 1   # At most 1 pod can be unavailable
  selector:
    matchLabels:
      app: rolling-update-app
  template:
    metadata:
      labels:
        app: rolling-update-app
    spec:
      containers:
      - name: app
        image: myapp:2.0.0
        ports:
        - containerPort: 8080
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 2
          timeoutSeconds: 1
          successThreshold: 1
          failureThreshold: 3
        livenessProbe:
          httpGet:
            path: /live
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 10
```
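
The `maxSurge` and `maxUnavailable` settings bound how many pods exist and how many are serving during the update. A small sketch of the arithmetic follows; the helper names are illustrative. When percentages are used, Kubernetes rounds `maxSurge` up and `maxUnavailable` down.

```python
import math
from typing import Tuple, Union


def resolve(value: Union[int, str], replicas: int, round_up: bool) -> int:
    """Resolve an int or a percentage string such as '25%' to a pod count."""
    if isinstance(value, str) and value.endswith("%"):
        raw = replicas * int(value[:-1]) / 100
        return math.ceil(raw) if round_up else math.floor(raw)
    return int(value)


def rollout_bounds(
    replicas: int, max_surge: Union[int, str], max_unavailable: Union[int, str]
) -> Tuple[int, int]:
    """Return (minimum available pods, maximum total pods) during the rollout."""
    surge = resolve(max_surge, replicas, round_up=True)
    unavailable = resolve(max_unavailable, replicas, round_up=False)
    return replicas - unavailable, replicas + surge
```

For the manifest above, `rollout_bounds(10, 2, 1)` gives at least 9 available pods and at most 12 pods in total.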

#### Custom Rolling Update Controller

```python
import asyncio

# k8s_client is assumed to be an async wrapper around the Kubernetes API.


class RollingUpdateController:
    def __init__(self, k8s_client):
        self.k8s = k8s_client
        self.max_surge = 2
        self.max_unavailable = 1
        
    async def rolling_update(self, deployment_name, new_image):
        """Perform rolling update with custom logic"""
        
        deployment = await self.k8s.get_deployment(deployment_name)
        total_replicas = deployment.spec.replicas
        
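        # Calculate batch size: up to 20% at a time, capped by max_surge, never below 1
        # (a batch size of 0 would make range() below raise ValueError)
        batch_size = max(1, min(self.max_surge, total_replicas // 5))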
        
        updated_pods = []
        
        for i in range(0, total_replicas, batch_size):
            batch_end = min(i + batch_size, total_replicas)
            
            # Update batch of pods
            for pod_index in range(i, batch_end):
                old_pod = await self.get_pod_by_index(deployment_name, pod_index)
                
                # Create new pod with new image
                new_pod = await self.create_updated_pod(old_pod, new_image)
                
                # Wait for new pod to be ready
                await self.wait_for_pod_ready(new_pod.metadata.name)
                
                # Remove old pod
                await self.k8s.delete_pod(old_pod.metadata.name)
                
                updated_pods.append(new_pod)
                
                # Brief pause between pod updates
                await asyncio.sleep(2)
            
            # Validate batch health before continuing
            # (slice by index so a short final batch does not include earlier pods)
            current_batch = updated_pods[i:batch_end]
            if not await self.validate_batch_health(current_batch):
                # Rollback batch
                await self.rollback_batch(current_batch)
                raise Exception("Rolling update failed validation")
            
            print(f"Updated {batch_end}/{total_replicas} pods")
        
        print("Rolling update completed successfully")
```

## Load Balancer and Traffic Management

### 1. Weighted Routing

#### NGINX Configuration

```nginx
upstream backend {
    # Old version - 80% traffic
    server old-app-1:8080 weight=4;
    server old-app-2:8080 weight=4;
    
    # New version - 20% traffic
    server new-app-1:8080 weight=1;
    server new-app-2:8080 weight=1;
}

server {
    listen 80;
    location / {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # Health check headers
        proxy_set_header X-Health-Check-Timeout 5s;
    }
}
```

#### HAProxy Configuration

```haproxy
backend app_servers
    balance roundrobin
    option httpchk GET /health
    
    # Old version servers
    server old-app-1 old-app-1:8080 check weight 80
    server old-app-2 old-app-2:8080 check weight 80
    
    # New version servers
    server new-app-1 new-app-1:8080 check weight 20
    server new-app-2 new-app-2:8080 check weight 20

frontend app_frontend
    bind *:80
    default_backend app_servers
    
    # Custom health check endpoint
    acl health_check path_beg /health
    http-request return status 200 content-type text/plain string "OK" if health_check
```
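
Both configurations above split traffic by relative weight. A short sketch to sanity-check the resulting percentages before applying a change; the `(group, weight)` pair layout is an assumption made for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def share_by_group(servers: Iterable[Tuple[str, int]]) -> Dict[str, float]:
    """Return the percentage of traffic each group receives under weighted balancing."""
    totals: Dict[str, int] = defaultdict(int)
    for group, weight in servers:
        totals[group] += weight
    grand_total = sum(totals.values())
    if grand_total == 0:
        raise ValueError("at least one server needs a non-zero weight")
    return {group: 100 * weight / grand_total for group, weight in totals.items()}


# Weights copied from the NGINX and HAProxy examples above.
nginx_servers = [("old", 4), ("old", 4), ("new", 1), ("new", 1)]
haproxy_servers = [("old", 80), ("old", 80), ("new", 20), ("new", 20)]
```

Both lists resolve to an 80/20 split between the old and new versions, since only the ratio of weights matters.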

### 2. Circuit Breaker Implementation

```python
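import functools
import time


class CircuitBreakerOpenException(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def __call__(self, func):
        """Allow use as a decorator: @CircuitBreaker(...)"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper
        
    def call(self, func, *args, **kwargs):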
        """Execute function with circuit breaker protection"""
        
        if self.state == 'OPEN':
            if self._should_attempt_reset():
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpenException("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self):
        return (
            self.last_failure_time and
            time.time() - self.last_failure_time >= self.recovery_timeout
        )
    
    def _on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

# Usage with service migration
@CircuitBreaker(failure_threshold=3, recovery_timeout=30)
def call_new_service(request):
    return new_service.process(request)

def handle_request(request):
    try:
        return call_new_service(request)
    except CircuitBreakerOpenException:
        # Fallback to old service
        return old_service.process(request)
```

## Monitoring and Validation

### 1. Health Check Implementation

```python
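import asyncio
from datetime import datetime

import psutil  # third-party; used by memory_check below

# `db` and `http_client` in the example checks are assumed to be existing async clients.


class HealthChecker: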
    def __init__(self):
        self.checks = []
        
    def add_check(self, name, check_func, timeout=5):
        self.checks.append({
            'name': name,
            'func': check_func,
            'timeout': timeout
        })
    
    async def run_checks(self):
        """Run all health checks and return status"""
        results = {}
        overall_status = 'healthy'
        
        for check in self.checks:
            try:
                result = await asyncio.wait_for(
                    check['func'](),
                    timeout=check['timeout']
                )
                results[check['name']] = {
                    'status': 'healthy',
                    'result': result
                }
            except asyncio.TimeoutError:
                results[check['name']] = {
                    'status': 'unhealthy',
                    'error': 'timeout'
                }
                overall_status = 'unhealthy'
            except Exception as e:
                results[check['name']] = {
                    'status': 'unhealthy',
                    'error': str(e)
                }
                overall_status = 'unhealthy'
        
        return {
            'status': overall_status,
            'checks': results,
            'timestamp': datetime.utcnow().isoformat()
        }

# Example health checks
health_checker = HealthChecker()

async def database_check():
    """Check database connectivity"""
    result = await db.execute("SELECT 1")
    return result is not None

async def external_api_check():
    """Check external API availability"""
    response = await http_client.get("https://api.example.com/health")
    return response.status_code == 200

async def memory_check():
    """Check memory usage"""
    memory_usage = psutil.virtual_memory().percent
    if memory_usage > 90:
        raise Exception(f"Memory usage too high: {memory_usage}%")
    return f"Memory usage: {memory_usage}%"

health_checker.add_check("database", database_check)
health_checker.add_check("external_api", external_api_check)
health_checker.add_check("memory", memory_check)
```

### 2. Readiness vs Liveness Probes

```yaml
# Kubernetes Pod with proper health checks
apiVersion: v1
kind: Pod
metadata:
  name: app-pod
spec:
  containers:
  - name: app
    image: myapp:2.0.0
    ports:
    - containerPort: 8080
    
    # Readiness probe - determines if pod should receive traffic
    readinessProbe:
      httpGet:
        path: /ready
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 3
      timeoutSeconds: 2
      successThreshold: 1
      failureThreshold: 3
    
    # Liveness probe - determines if pod should be restarted
    livenessProbe:
      httpGet:
        path: /live
        port: 8080
      initialDelaySeconds: 30
      periodSeconds: 10
      timeoutSeconds: 5
      successThreshold: 1
      failureThreshold: 3
    
    # Startup probe - gives app time to start before other probes
    startupProbe:
      httpGet:
        path: /startup
        port: 8080
      initialDelaySeconds: 10
      periodSeconds: 5
      timeoutSeconds: 3
      successThreshold: 1
      failureThreshold: 30  # 30 failures x 5s period: about 150 seconds for startup after the initial delay
```

### 3. Metrics and Alerting

```python
import time

from prometheus_client import Counter, Gauge, Histogram


class MigrationMetrics:
    def __init__(self, prometheus_client):
        self.prometheus = prometheus_client
        
        # Define custom metrics
        self.migration_progress = Counter(
            'migration_progress_total',
            'Total migration operations completed',
            ['operation', 'status']
        )
        
        self.migration_duration = Histogram(
            'migration_operation_duration_seconds',
            'Time spent on migration operations',
            ['operation']
        )
        
        self.system_health = Gauge(
            'system_health_score',
            'Overall system health score (0-1)',
            ['component']
        )
        
        self.traffic_split = Gauge(
            'traffic_split_percentage',
            'Percentage of traffic going to each version',
            ['version']
        )
    
    def record_migration_step(self, operation, status, duration=None):
        """Record completion of a migration step"""
        self.migration_progress.labels(operation=operation, status=status).inc()
        
        if duration is not None:  # Record zero-length durations too
            self.migration_duration.labels(operation=operation).observe(duration)
    
    def update_health_score(self, component, score):
        """Update health score for a component"""
        self.system_health.labels(component=component).set(score)
    
    def update_traffic_split(self, version_weights):
        """Update traffic split metrics"""
        for version, weight in version_weights.items():
            self.traffic_split.labels(version=version).set(weight)

# Usage in migration
metrics = MigrationMetrics(prometheus_client)

def perform_migration_step(operation):
    start_time = time.time()
    
    try:
        # Perform migration operation
        result = execute_migration_operation(operation)
        
        # Record success
        duration = time.time() - start_time
        metrics.record_migration_step(operation, 'success', duration)
        
        return result
        
    except Exception as e:
        # Record failure
        duration = time.time() - start_time
        metrics.record_migration_step(operation, 'failure', duration)
        raise
```

## Rollback Strategies

### 1. Immediate Rollback Triggers

```python
import asyncio


class AutoRollbackSystem:
    def __init__(self, metrics_client, deployment_client):
        self.metrics = metrics_client
        self.deployment = deployment_client
        self.rollback_triggers = {
            'error_rate_spike': {
                'threshold': 0.05,  # 5% error rate
                'window': 300,      # 5 minutes
                'auto_rollback': True
            },
            'latency_increase': {
                'threshold': 2.0,   # 2x baseline latency
                'window': 600,      # 10 minutes
                'auto_rollback': False  # Manual confirmation required
            },
            'availability_drop': {
                'threshold': 0.95,  # Below 95% availability
                'window': 120,      # 2 minutes
                'auto_rollback': True
            }
        }
    
    async def monitor_and_rollback(self, deployment_name):
        """Monitor deployment and trigger rollback if needed"""
        
        while True:
            for trigger_name, config in self.rollback_triggers.items():
                if await self.check_trigger(trigger_name, config):
                    if config['auto_rollback']:
                        await self.execute_rollback(deployment_name, trigger_name)
                        return  # Stop monitoring once rolled back to avoid repeated rollbacks
                    else:
                        await self.alert_for_manual_rollback(deployment_name, trigger_name)
            
            await asyncio.sleep(30)  # Check every 30 seconds
    
    async def check_trigger(self, trigger_name, config):
        """Check if rollback trigger condition is met"""
        
        current_value = await self.metrics.get_current_value(trigger_name)
        baseline_value = await self.metrics.get_baseline_value(trigger_name)
        
        if trigger_name == 'error_rate_spike':
            return current_value > config['threshold']
        elif trigger_name == 'latency_increase':
            return current_value > baseline_value * config['threshold']
        elif trigger_name == 'availability_drop':
            return current_value < config['threshold']
        
        return False
    
    async def execute_rollback(self, deployment_name, reason):
        """Execute automatic rollback"""
        
        print(f"Executing automatic rollback for {deployment_name}. Reason: {reason}")
        
        # Get previous revision
        previous_revision = await self.deployment.get_previous_revision(deployment_name)
        
        # Perform rollback
        await self.deployment.rollback_to_revision(deployment_name, previous_revision)
        
        # Notify stakeholders
        await self.notify_rollback_executed(deployment_name, reason)
```
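
The trigger configuration above declares a `window` per trigger, but the check compares only the current value. One way to honor the window is sketched below, under the assumption that samples arrive as `(timestamp_seconds, value)` pairs; `breached_for_window` is an illustrative name.

```python
from typing import Sequence, Tuple


def breached_for_window(
    samples: Sequence[Tuple[float, float]],
    threshold: float,
    window: float,
    now: float,
) -> bool:
    """True if every sample in the last `window` seconds exceeds `threshold`."""
    recent = [value for ts, value in samples if now - ts <= window]
    # No data in the window is treated as "not breached" rather than as a failure.
    return bool(recent) and all(value > threshold for value in recent)
```

Requiring a sustained breach reduces rollbacks caused by a single noisy sample, at the cost of reacting up to one window later.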

### 2. Data Rollback Strategies

```sql
-- Point-in-time recovery setup
-- Create restore point before migration
SELECT pg_create_restore_point('pre_migration_' || to_char(now(), 'YYYYMMDD_HH24MISS'));

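-- Rollback using point-in-time recovery
-- (This would be executed on a separate recovery instance)
-- PostgreSQL 12+: set these in postgresql.conf and create a recovery.signal file
-- (PostgreSQL 11 and earlier: recovery.conf):
-- recovery_target_name = 'pre_migration_20240101_120000'
-- recovery_target_action = 'promote'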
```

```python
import time
from datetime import datetime


class DataRollbackManager:
    def __init__(self, database_client, backup_service):
        self.db = database_client
        self.backup = backup_service
    
    async def create_rollback_point(self, migration_id):
        """Create a rollback point before migration"""
        
        rollback_point = {
            'migration_id': migration_id,
            'timestamp': datetime.utcnow(),
            'backup_location': None,
            'schema_snapshot': None
        }
        
        # Create database backup
        backup_path = await self.backup.create_backup(
            f"pre_migration_{migration_id}_{int(time.time())}"
        )
        rollback_point['backup_location'] = backup_path
        
        # Capture schema snapshot
        schema_snapshot = await self.capture_schema_snapshot()
        rollback_point['schema_snapshot'] = schema_snapshot
        
        # Store rollback point metadata
        await self.store_rollback_metadata(rollback_point)
        
        return rollback_point
    
    async def execute_rollback(self, migration_id):
        """Execute data rollback to specified point"""
        
        rollback_point = await self.get_rollback_metadata(migration_id)
        
        if not rollback_point:
            raise Exception(f"No rollback point found for migration {migration_id}")
        
        # Stop application traffic
        await self.stop_application_traffic()
        
        try:
            # Restore from backup
            await self.backup.restore_from_backup(
                rollback_point['backup_location']
            )
            
            # Validate data integrity
            await self.validate_data_integrity(
                rollback_point['schema_snapshot']
            )
            
            # Update application configuration
            await self.update_application_config(rollback_point)
            
            # Resume application traffic
            await self.resume_application_traffic()
            
            print(f"Data rollback completed successfully for migration {migration_id}")
            
        except Exception as e:
            # If rollback fails, we have a serious problem
            await self.escalate_rollback_failure(migration_id, str(e))
            raise
```

## Best Practices Summary

### 1. Pre-Migration Checklist
- [ ] Comprehensive backup strategy in place
- [ ] Rollback procedures tested in staging
- [ ] Monitoring and alerting configured
- [ ] Health checks implemented
- [ ] Feature flags configured
- [ ] Team communication plan established
- [ ] Load balancer configuration prepared
- [ ] Database connection pooling optimized

### 2. During Migration
- [ ] Monitor key metrics continuously
- [ ] Validate each phase before proceeding
- [ ] Maintain detailed logs of all actions
- [ ] Keep stakeholders informed of progress
- [ ] Have rollback trigger ready
- [ ] Monitor user experience metrics
- [ ] Watch for performance degradation
- [ ] Validate data consistency

### 3. Post-Migration
- [ ] Continue monitoring for 24-48 hours
- [ ] Validate all business processes
- [ ] Update documentation
- [ ] Conduct post-migration retrospective
- [ ] Archive migration artifacts
- [ ] Update disaster recovery procedures
- [ ] Plan for legacy system decommissioning

### 4. Common Pitfalls to Avoid
- Don't skip testing rollback procedures
- Don't ignore performance impact
- Don't rush through validation phases
- Don't forget to communicate with stakeholders
- Don't assume health checks are sufficient
- Don't neglect data consistency validation
- Don't underestimate time requirements
- Don't overlook dependency impacts

These techniques provide a foundation for zero-downtime migrations across system components while maintaining high availability and data integrity.
```

### scripts/rollback_generator.py

```python
#!/usr/bin/env python3
"""
Rollback Generator - Generate comprehensive rollback procedures for migrations

This tool takes a migration plan and generates detailed rollback procedures for each phase,
including data rollback scripts, service rollback steps, validation checks, and communication
templates to ensure safe and reliable migration reversals.

Author: Migration Architect Skill
Version: 1.0.0
License: MIT
"""

import json
import argparse
import sys
import datetime
import hashlib
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum


class RollbackTrigger(Enum):
    """Types of rollback triggers"""
    MANUAL = "manual"
    AUTOMATED = "automated"
    THRESHOLD_BASED = "threshold_based"
    TIME_BASED = "time_based"


class RollbackUrgency(Enum):
    """Rollback urgency levels"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    EMERGENCY = "emergency"


@dataclass
class RollbackStep:
    """Individual rollback step"""
    step_id: str
    name: str
    description: str
    script_type: str  # sql, bash, api, manual
    script_content: str
    estimated_duration_minutes: int
    dependencies: List[str]
    validation_commands: List[str]
    success_criteria: List[str]
    failure_escalation: str
    rollback_order: int


@dataclass
class RollbackPhase:
    """Rollback phase containing multiple steps"""
    phase_name: str
    description: str
    urgency_level: str
    estimated_duration_minutes: int
    prerequisites: List[str]
    steps: List[RollbackStep]
    validation_checkpoints: List[str]
    communication_requirements: List[str]
    risk_level: str


@dataclass
class RollbackTriggerCondition:
    """Conditions that trigger automatic rollback"""
    trigger_id: str
    name: str
    condition: str
    metric_threshold: Optional[Dict[str, Any]]
    evaluation_window_minutes: int
    auto_execute: bool
    escalation_contacts: List[str]


@dataclass
class DataRecoveryPlan:
    """Data recovery and restoration plan"""
    recovery_method: str  # backup_restore, point_in_time, event_replay
    backup_location: str
    recovery_scripts: List[str]
    data_validation_queries: List[str]
    estimated_recovery_time_minutes: int
    recovery_dependencies: List[str]


@dataclass
class CommunicationTemplate:
    """Communication template for rollback scenarios"""
    template_type: str  # start, progress, completion, escalation
    audience: str  # technical, business, executive, customers
    subject: str
    body: str
    urgency: str
    delivery_methods: List[str]


@dataclass
class RollbackRunbook:
    """Complete rollback runbook"""
    runbook_id: str
    migration_id: str
    created_at: str
    rollback_phases: List[RollbackPhase]
    trigger_conditions: List[RollbackTriggerCondition]
    data_recovery_plan: DataRecoveryPlan
    communication_templates: List[CommunicationTemplate]
    escalation_matrix: Dict[str, Any]
    validation_checklist: List[str]
    post_rollback_procedures: List[str]
    emergency_contacts: List[Dict[str, str]]


class RollbackGenerator:
    """Main rollback generator class"""
    
    def __init__(self):
        self.rollback_templates = self._load_rollback_templates()
        self.validation_templates = self._load_validation_templates()
        self.communication_templates = self._load_communication_templates()
    
    def _load_rollback_templates(self) -> Dict[str, Any]:
        """Load rollback script templates for different migration types"""
        return {
            "database": {
                "schema_rollback": {
                    "drop_table": "DROP TABLE IF EXISTS {table_name};",
                    "drop_column": "ALTER TABLE {table_name} DROP COLUMN IF EXISTS {column_name};",
                    "restore_column": "ALTER TABLE {table_name} ADD COLUMN {column_definition};",
                    "revert_type": "ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {original_type};",
                    "drop_constraint": "ALTER TABLE {table_name} DROP CONSTRAINT {constraint_name};",
                    "add_constraint": "ALTER TABLE {table_name} ADD CONSTRAINT {constraint_name} {constraint_definition};"
                },
                "data_rollback": {
                    "restore_backup": "pg_restore -d {database_name} -c {backup_file}",
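                    # Note: pg_create_restore_point only marks a named restore point; the recovery itself
                    # is a separate step that replays WAL up to that name.
                    "point_in_time_recovery": "SELECT pg_create_restore_point('pre_migration_{timestamp}');",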
                    "delete_migrated_data": "DELETE FROM {table_name} WHERE migration_batch_id = '{batch_id}';",
                    "restore_original_values": "UPDATE {table_name} SET {column_name} = backup_{column_name} WHERE migration_flag = true;"
                }
            },
            "service": {
                "deployment_rollback": {
                    "rollback_blue_green": "kubectl patch service {service_name} -p '{\"spec\":{\"selector\":{\"version\":\"blue\"}}}'",
                    "rollback_canary": "kubectl scale deployment {service_name}-canary --replicas=0",
                    "restore_previous_version": "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}",
                    "update_load_balancer": "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"
                },
                "configuration_rollback": {
                    "restore_config_map": "kubectl apply -f {original_config_file}",
                    "revert_feature_flags": "curl -X PUT {feature_flag_api}/flags/{flag_name} -d '{\"enabled\": false}'",
                    "restore_environment_vars": "kubectl set env deployment/{deployment_name} {env_var_name}={original_value}"
                }
            },
            "infrastructure": {
                "cloud_rollback": {
                    # A saved plan file cannot be combined with planning options such as -target
                    "revert_terraform": "terraform apply {rollback_plan_file}",
                    "restore_dns": "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}",
                    "rollback_security_groups": "aws ec2 authorize-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --cidr {cidr}",
                    "restore_iam_policies": "aws iam put-role-policy --role-name {role_name} --policy-name {policy_name} --policy-document file://{original_policy}"
                },
                "network_rollback": {
                    "restore_routing": "aws ec2 replace-route --route-table-id {route_table_id} --destination-cidr-block {cidr} --gateway-id {original_gateway}",
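                    # elbv2 has no modify-load-balancer command and the scheme is fixed at creation;
                    # restore saved attributes instead ({original_attributes_file} is a hypothetical placeholder)
                    "revert_load_balancer": "aws elbv2 modify-load-balancer-attributes --load-balancer-arn {lb_arn} --attributes file://{original_attributes_file}",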
                    "restore_firewall_rules": "aws ec2 revoke-security-group-ingress --group-id {group_id} --protocol {protocol} --port {port} --source-group {source_group}"
                }
            }
        }
    
    def _load_validation_templates(self) -> Dict[str, List[str]]:
        """Load validation command templates"""
        return {
            "database": [
                "SELECT COUNT(*) FROM {table_name};",
                "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}';",
                "SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}' AND column_name = '{column_name}';",
                "SELECT COUNT(DISTINCT {primary_key}) FROM {table_name};",
                "SELECT MAX({timestamp_column}) FROM {table_name};"
            ],
            "service": [
                "curl -f {health_check_url}",
                "kubectl get pods -l app={service_name} --field-selector=status.phase=Running",
                "kubectl logs deployment/{service_name} --tail=100 | grep -i error",
                "curl -f {service_endpoint}/api/v1/status"
            ],
            "infrastructure": [
                "aws ec2 describe-instances --instance-ids {instance_id} --query 'Reservations[*].Instances[*].State.Name'",
                "nslookup {domain_name}",
                "curl -I {load_balancer_url}",
                "aws elbv2 describe-target-health --target-group-arn {target_group_arn}"
            ]
        }
    
    def _load_communication_templates(self) -> Dict[str, Dict[str, Dict[str, str]]]:
        """Load communication templates, keyed by event and then by audience"""
        return {
            "rollback_start": {
                "technical": {
                    "subject": "ROLLBACK INITIATED: {migration_name}",
                    "body": """Team,

We have initiated rollback for migration: {migration_name}
Rollback ID: {rollback_id}
Start Time: {start_time}
Estimated Duration: {estimated_duration}

Reason: {rollback_reason}

Current Status: Rolling back phase {current_phase}

Next Updates: Every 15 minutes or upon phase completion

Actions Required:
- Monitor system health dashboards
- Stand by for escalation if needed
- Do not make manual changes during rollback

Incident Commander: {incident_commander}
"""
                },
                "business": {
                    "subject": "System Rollback In Progress - {system_name}",
                    "body": """Business Stakeholders,

We are currently performing a controlled rollback of the {system_name} migration due to {rollback_reason}.

Impact: {business_impact}
Expected Resolution: {estimated_completion_time}
Affected Services: {affected_services}

We will provide updates every 30 minutes.

Contact: {business_contact}
"""
                },
                "executive": {
                    "subject": "EXEC ALERT: Critical System Rollback - {system_name}",
                    "body": """Executive Team,

A critical rollback is in progress for {system_name}.

Summary:
- Rollback Reason: {rollback_reason}
- Business Impact: {business_impact}
- Expected Resolution: {estimated_completion_time}
- Customer Impact: {customer_impact}

We are following established procedures and will update hourly.

Escalation: {escalation_contact}
"""
                }
            },
            "rollback_complete": {
                "technical": {
                    "subject": "ROLLBACK COMPLETED: {migration_name}",
                    "body": """Team,

Rollback has been successfully completed for migration: {migration_name}

Summary:
- Start Time: {start_time}
- End Time: {end_time}
- Duration: {actual_duration}
- Phases Completed: {completed_phases}

Validation Results:
{validation_results}

System Status: {system_status}

Next Steps:
- Continue monitoring for 24 hours
- Post-rollback review scheduled for {review_date}
- Root cause analysis to begin

All clear to resume normal operations.

Incident Commander: {incident_commander}
"""
                }
            }
        }
    
    def generate_rollback_runbook(self, migration_plan: Dict[str, Any]) -> RollbackRunbook:
        """Generate comprehensive rollback runbook from migration plan"""
        runbook_id = f"rb_{hashlib.md5(str(migration_plan).encode()).hexdigest()[:8]}"
        migration_id = migration_plan.get("migration_id", "unknown")
        
        # Generate rollback phases (reverse order of migration phases)
        rollback_phases = self._generate_rollback_phases(migration_plan)
        
        # Generate trigger conditions
        trigger_conditions = self._generate_trigger_conditions(migration_plan)
        
        # Generate data recovery plan
        data_recovery_plan = self._generate_data_recovery_plan(migration_plan)
        
        # Generate communication templates
        communication_templates = self._generate_communication_templates(migration_plan)
        
        # Generate escalation matrix
        escalation_matrix = self._generate_escalation_matrix(migration_plan)
        
        # Generate validation checklist
        validation_checklist = self._generate_validation_checklist(migration_plan)
        
        # Generate post-rollback procedures
        post_rollback_procedures = self._generate_post_rollback_procedures(migration_plan)
        
        # Generate emergency contacts
        emergency_contacts = self._generate_emergency_contacts(migration_plan)
        
        return RollbackRunbook(
            runbook_id=runbook_id,
            migration_id=migration_id,
            created_at=datetime.datetime.now(datetime.timezone.utc).isoformat(),
            rollback_phases=rollback_phases,
            trigger_conditions=trigger_conditions,
            data_recovery_plan=data_recovery_plan,
            communication_templates=communication_templates,
            escalation_matrix=escalation_matrix,
            validation_checklist=validation_checklist,
            post_rollback_procedures=post_rollback_procedures,
            emergency_contacts=emergency_contacts
        )
    
    def _generate_rollback_phases(self, migration_plan: Dict[str, Any]) -> List[RollbackPhase]:
        """Generate rollback phases from migration plan"""
        migration_phases = migration_plan.get("phases", [])
        migration_type = migration_plan.get("migration_type", "unknown")
        rollback_phases = []
        
        # Reverse the order of migration phases for rollback
        for i, phase in enumerate(reversed(migration_phases)):
            if isinstance(phase, dict):
                phase_name = phase.get("name", f"phase_{i}")
                phase_duration = int(phase.get("duration_hours", 2) * 60)  # Convert hours to whole minutes
                phase_risk = phase.get("risk_level", "medium")
            else:
                phase_name = str(phase)
                phase_duration = 120  # Default 2 hours
                phase_risk = "medium"
            
            rollback_steps = self._generate_rollback_steps(phase_name, migration_type, i)
            
            rollback_phase = RollbackPhase(
                phase_name=f"rollback_{phase_name}",
                description=f"Rollback changes made during {phase_name} phase",
                urgency_level=self._calculate_urgency(phase_risk),
                estimated_duration_minutes=phase_duration // 2,  # Rollback typically faster
                prerequisites=self._get_rollback_prerequisites(phase_name, i),
                steps=rollback_steps,
                validation_checkpoints=self._get_validation_checkpoints(phase_name, migration_type),
                communication_requirements=self._get_communication_requirements(phase_name, phase_risk),
                risk_level=phase_risk
            )
            
            rollback_phases.append(rollback_phase)
        
        return rollback_phases
    
    def _generate_rollback_steps(self, phase_name: str, migration_type: str, phase_index: int) -> List[RollbackStep]:
        """Generate specific rollback steps for a phase"""
        steps = []
        templates = self.rollback_templates.get(migration_type, {})
        
        if migration_type == "database":
            if "migration" in phase_name.lower() or "cutover" in phase_name.lower():
                # Data rollback steps
                steps.extend([
                    RollbackStep(
                        step_id=f"rb_data_{phase_index}_01",
                        name="Stop data migration processes",
                        description="Halt all ongoing data migration processes",
                        script_type="sql",
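                        # Exclude the current backend: its own query text contains 'migration' and would match
                        script_content="-- Stop migration processes\nSELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query LIKE '%migration%' AND pid <> pg_backend_pid();",
                        estimated_duration_minutes=5,
                        dependencies=[],
                        validation_commands=["SELECT COUNT(*) FROM pg_stat_activity WHERE query LIKE '%migration%' AND pid <> pg_backend_pid();"],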
                        success_criteria=["No active migration processes"],
                        failure_escalation="Contact DBA immediately",
                        rollback_order=1
                    ),
                    RollbackStep(
                        step_id=f"rb_data_{phase_index}_02",
                        name="Restore from backup",
                        description="Restore database from pre-migration backup",
                        script_type="bash",
                        script_content=templates.get("data_rollback", {}).get("restore_backup", "pg_restore -d {database_name} -c {backup_file}"),
                        estimated_duration_minutes=30,
                        dependencies=[f"rb_data_{phase_index}_01"],
                        validation_commands=["SELECT COUNT(*) FROM information_schema.tables;"],
                        success_criteria=["Database restored successfully", "All expected tables present"],
                        failure_escalation="Escalate to senior DBA and infrastructure team",
                        rollback_order=2
                    )
                ])
            
            if "preparation" in phase_name.lower():
                # Schema rollback steps
                steps.append(
                    RollbackStep(
                        step_id=f"rb_schema_{phase_index}_01",
                        name="Drop migration artifacts",
                        description="Remove temporary migration tables and procedures",
                        script_type="sql",
                        script_content="-- Drop migration artifacts\nDROP TABLE IF EXISTS migration_log;\nDROP PROCEDURE IF EXISTS migrate_data();",
                        estimated_duration_minutes=5,
                        dependencies=[],
                        validation_commands=["SELECT COUNT(*) FROM information_schema.tables WHERE table_name LIKE '%migration%';"],
                        success_criteria=["No migration artifacts remain"],
                        failure_escalation="Manual cleanup required",
                        rollback_order=1
                    )
                )
        
        elif migration_type == "service":
            if "cutover" in phase_name.lower():
                # Service rollback steps
                steps.extend([
                    RollbackStep(
                        step_id=f"rb_service_{phase_index}_01",
                        name="Redirect traffic back to old service",
                        description="Update load balancer to route traffic back to previous service version",
                        script_type="bash",
                        script_content=templates.get("deployment_rollback", {}).get("update_load_balancer", "aws elbv2 modify-rule --rule-arn {rule_arn} --actions Type=forward,TargetGroupArn={original_target_group}"),
                        estimated_duration_minutes=2,
                        dependencies=[],
                        validation_commands=["curl -f {health_check_url}"],
                        success_criteria=["Traffic routing to original service", "Health checks passing"],
                        failure_escalation="Emergency procedure - manual traffic routing",
                        rollback_order=1
                    ),
                    RollbackStep(
                        step_id=f"rb_service_{phase_index}_02",
                        name="Rollback service deployment",
                        description="Revert to previous service deployment version",
                        script_type="bash",
                        script_content=templates.get("deployment_rollback", {}).get("restore_previous_version", "kubectl rollout undo deployment/{service_name} --to-revision={revision_number}"),
                        estimated_duration_minutes=10,
                        dependencies=[f"rb_service_{phase_index}_01"],
                        validation_commands=["kubectl get pods -l app={service_name} --field-selector=status.phase=Running"],
                        success_criteria=["Previous version deployed", "All pods running"],
                        failure_escalation="Manual pod management required",
                        rollback_order=2
                    )
                ])
        
        elif migration_type == "infrastructure":
            steps.extend([
                RollbackStep(
                    step_id=f"rb_infra_{phase_index}_01",
                    name="Revert infrastructure changes",
                    description="Apply the saved Terraform rollback plan to revert infrastructure to its previous state",
                    script_type="bash",
                    script_content=templates.get("cloud_rollback", {}).get("revert_terraform", "terraform apply {rollback_plan_file}"),
                    estimated_duration_minutes=15,
                    dependencies=[],
                    validation_commands=["terraform plan -detailed-exitcode"],
                    success_criteria=["Infrastructure matches previous state", "No planned changes"],
                    failure_escalation="Manual infrastructure review required",
                    rollback_order=1
                ),
                RollbackStep(
                    step_id=f"rb_infra_{phase_index}_02",
                    name="Restore DNS configuration",
                    description="Revert DNS changes to point back to original infrastructure",
                    script_type="bash",
                    script_content=templates.get("cloud_rollback", {}).get("restore_dns", "aws route53 change-resource-record-sets --hosted-zone-id {zone_id} --change-batch file://{rollback_dns_changes}"),
                    estimated_duration_minutes=10,
                    dependencies=[f"rb_infra_{phase_index}_01"],
                    validation_commands=["nslookup {domain_name}"],
                    success_criteria=["DNS resolves to original endpoints"],
                    failure_escalation="Contact DNS administrator",
                    rollback_order=2
                )
            ])
        
        # Add generic validation step for all migration types
        steps.append(
            RollbackStep(
                step_id=f"rb_validate_{phase_index}_final",
                name="Validate rollback completion",
                description=f"Comprehensive validation that {phase_name} rollback completed successfully",
                script_type="manual",
                script_content="Execute validation checklist for this phase",
                estimated_duration_minutes=10,
                dependencies=[step.step_id for step in steps],
                validation_commands=self.validation_templates.get(migration_type, []),
                success_criteria=[f"{phase_name} fully rolled back", "All validation checks pass"],
                failure_escalation=f"Investigate {phase_name} rollback failures",
                rollback_order=99
            )
        )
        
        return steps
    
    def _generate_trigger_conditions(self, migration_plan: Dict[str, Any]) -> List[RollbackTriggerCondition]:
        """Generate rollback trigger conditions (only those with auto_execute=True run automatically)"""
        triggers = []
        migration_type = migration_plan.get("migration_type", "unknown")
        
        # Generic triggers for all migration types
        triggers.extend([
            RollbackTriggerCondition(
                trigger_id="error_rate_spike",
                name="Error Rate Spike",
                condition="error_rate > baseline * 5 for 5 minutes",
                metric_threshold={
                    "metric": "error_rate",
                    "operator": "greater_than",
                    "value": "baseline_error_rate * 5",
                    "duration_minutes": 5
                },
                evaluation_window_minutes=5,
                auto_execute=True,
                escalation_contacts=["on_call_engineer", "migration_lead"]
            ),
            RollbackTriggerCondition(
                trigger_id="response_time_degradation",
                name="Response Time Degradation",
                condition="p95_response_time > baseline * 3 for 10 minutes",
                metric_threshold={
                    "metric": "p95_response_time",
                    "operator": "greater_than",
                    "value": "baseline_p95 * 3",
                    "duration_minutes": 10
                },
                evaluation_window_minutes=10,
                auto_execute=False,
                escalation_contacts=["performance_team", "migration_lead"]
            ),
            RollbackTriggerCondition(
                trigger_id="availability_drop",
                name="Service Availability Drop",
                condition="availability < 95% for 2 minutes",
                metric_threshold={
                    "metric": "availability",
                    "operator": "less_than",
                    "value": 0.95,
                    "duration_minutes": 2
                },
                evaluation_window_minutes=2,
                auto_execute=True,
                escalation_contacts=["sre_team", "incident_commander"]
            )
        ])
        
        # Migration-type specific triggers
        if migration_type == "database":
            triggers.extend([
                RollbackTriggerCondition(
                    trigger_id="data_integrity_failure",
                    name="Data Integrity Check Failure",
                    condition="data_validation_failures > 0",
                    metric_threshold={
                        "metric": "data_validation_failures",
                        "operator": "greater_than",
                        "value": 0,
                        "duration_minutes": 1
                    },
                    evaluation_window_minutes=1,
                    auto_execute=True,
                    escalation_contacts=["dba_team", "data_team"]
                ),
                RollbackTriggerCondition(
                    trigger_id="migration_progress_stalled",
                    name="Migration Progress Stalled",
                    condition="migration_progress unchanged for 30 minutes",
                    metric_threshold={
                        "metric": "migration_progress_rate",
                        "operator": "equals",
                        "value": 0,
                        "duration_minutes": 30
                    },
                    evaluation_window_minutes=30,
                    auto_execute=False,
                    escalation_contacts=["migration_team", "dba_team"]
                )
            ])
        
        elif migration_type == "service":
            triggers.extend([
                RollbackTriggerCondition(
                    trigger_id="cpu_utilization_spike",
                    name="CPU Utilization Spike",
                    condition="cpu_utilization > 90% for 15 minutes",
                    metric_threshold={
                        "metric": "cpu_utilization",
                        "operator": "greater_than",
                        "value": 0.90,
                        "duration_minutes": 15
                    },
                    evaluation_window_minutes=15,
                    auto_execute=False,
                    escalation_contacts=["devops_team", "infrastructure_team"]
                ),
                RollbackTriggerCondition(
                    trigger_id="memory_leak_detected",
                    name="Memory Leak Detected",
                    condition="memory_usage increasing continuously for 20 minutes",
                    metric_threshold={
                        "metric": "memory_growth_rate",
                        "operator": "greater_than",
                        "value": "1MB/minute",
                        "duration_minutes": 20
                    },
                    evaluation_window_minutes=20,
                    auto_execute=True,
                    escalation_contacts=["development_team", "sre_team"]
                )
            ])
        
        return triggers
    
    def _generate_data_recovery_plan(self, migration_plan: Dict[str, Any]) -> DataRecoveryPlan:
        """Generate data recovery plan"""
        migration_type = migration_plan.get("migration_type", "unknown")
        
        if migration_type == "database":
            return DataRecoveryPlan(
                recovery_method="point_in_time",
                backup_location="/backups/pre_migration_{migration_id}_{timestamp}.dump",
                recovery_scripts=[
                    # pg_restore reads archive-format dumps (e.g. pg_dump -Fc), not plain SQL files
                    "pg_restore -d production -c /backups/pre_migration_backup.dump",
                    "SELECT pg_create_restore_point('rollback_point');",
                    "VACUUM ANALYZE; -- Refresh statistics after restore"
                ],
                data_validation_queries=[
                    "SELECT COUNT(*) FROM critical_business_table;",
                    "SELECT MAX(created_at) FROM audit_log;",
                    "SELECT COUNT(DISTINCT user_id) FROM user_sessions;",
                    "SELECT SUM(amount) FROM financial_transactions WHERE date = CURRENT_DATE;"
                ],
                estimated_recovery_time_minutes=45,
                recovery_dependencies=["database_instance_running", "backup_file_accessible"]
            )
        else:
            return DataRecoveryPlan(
                recovery_method="backup_restore",
                backup_location="/backups/pre_migration_state",
                recovery_scripts=[
                    "# Restore configuration files from backup",
                    "cp -r /backups/pre_migration_state/config/* /app/config/",
                    "# Restart services with previous configuration",
                    "systemctl restart application_service"
                ],
                data_validation_queries=[
                    "curl -f http://localhost:8080/health",
                    "curl -f http://localhost:8080/api/status"
                ],
                estimated_recovery_time_minutes=20,
                recovery_dependencies=["service_stopped", "backup_accessible"]
            )
    
    def _generate_communication_templates(self, migration_plan: Dict[str, Any]) -> List[CommunicationTemplate]:
        """Generate communication templates for rollback scenarios"""
        templates = []
        base_templates = self.communication_templates
        
        # Rollback start notifications
        for audience in ["technical", "business", "executive"]:
            if audience in base_templates["rollback_start"]:
                template_data = base_templates["rollback_start"][audience]
                templates.append(CommunicationTemplate(
                    template_type="rollback_start",
                    audience=audience,
                    subject=template_data["subject"],
                    body=template_data["body"],
                    urgency="high" if audience == "executive" else "medium",
                    delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
                ))
        
        # Rollback completion notifications
        for audience in ["technical", "business"]:
            if audience in base_templates.get("rollback_complete", {}):
                template_data = base_templates["rollback_complete"][audience]
                templates.append(CommunicationTemplate(
                    template_type="rollback_complete",
                    audience=audience,
                    subject=template_data["subject"],
                    body=template_data["body"],
                    urgency="medium",
                    delivery_methods=["email", "slack"] if audience == "technical" else ["email"]
                ))
        
        # Emergency escalation template
        templates.append(CommunicationTemplate(
            template_type="emergency_escalation",
            audience="executive",
            subject="CRITICAL: Rollback Emergency - {migration_name}",
            body="""CRITICAL SITUATION - IMMEDIATE ATTENTION REQUIRED

Migration: {migration_name}
Issue: Rollback procedure has encountered critical failures

Current Status: {current_status}
Failed Components: {failed_components}
Business Impact: {business_impact}
Customer Impact: {customer_impact}

Immediate Actions:
1. Emergency response team activated
2. {emergency_action_1}
3. {emergency_action_2}

War Room: {war_room_location}
Bridge Line: {conference_bridge}

Next Update: {next_update_time}

Incident Commander: {incident_commander}
Executive On-Call: {executive_on_call}
""",
            urgency="emergency",
            delivery_methods=["email", "sms", "phone_call"]
        ))
        
        return templates
    
    def _generate_escalation_matrix(self, migration_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Generate escalation matrix for different failure scenarios"""
        return {
            "level_1": {
                "trigger": "Single component failure",
                "response_time_minutes": 5,
                "contacts": ["on_call_engineer", "migration_lead"],
                "actions": ["Investigate issue", "Attempt automated remediation", "Monitor closely"]
            },
            "level_2": {
                "trigger": "Multiple component failures or single critical failure",
                "response_time_minutes": 2,
                "contacts": ["senior_engineer", "team_lead", "devops_lead"],
                "actions": ["Initiate rollback", "Establish war room", "Notify stakeholders"]
            },
            "level_3": {
                "trigger": "System-wide failure or data corruption",
                "response_time_minutes": 1,
                "contacts": ["engineering_manager", "cto", "incident_commander"],
                "actions": ["Emergency rollback", "All hands on deck", "Executive notification"]
            },
            "emergency": {
                "trigger": "Business-critical failure with customer impact",
                "response_time_minutes": 0,
                "contacts": ["ceo", "cto", "head_of_operations"],
                "actions": ["Emergency procedures", "Customer communication", "Media preparation if needed"]
            }
        }
    
    def _generate_validation_checklist(self, migration_plan: Dict[str, Any]) -> List[str]:
        """Generate comprehensive validation checklist"""
        migration_type = migration_plan.get("migration_type", "unknown")
        
        base_checklist = [
            "Verify system is responding to health checks",
            "Confirm error rates are within normal parameters",
            "Validate response times meet SLA requirements",
            "Check all critical business processes are functioning",
            "Verify monitoring and alerting systems are operational",
            "Confirm no data corruption has occurred",
            "Validate security controls are functioning properly",
            "Check backup systems are working correctly",
            "Verify integration points with downstream systems",
            "Confirm user authentication and authorization working"
        ]
        
        if migration_type == "database":
            base_checklist.extend([
                "Validate database schema matches expected state",
                "Confirm referential integrity constraints",
                "Check database performance metrics",
                "Verify data consistency across related tables",
                "Validate indexes and statistics are optimal",
                "Confirm transaction logs are clean",
                "Check database connections and connection pooling"
            ])
        
        elif migration_type == "service":
            base_checklist.extend([
                "Verify service discovery is working correctly",
                "Confirm load balancing is distributing traffic properly",
                "Check service-to-service communication",
                "Validate API endpoints are responding correctly",
                "Confirm feature flags are in correct state",
                "Check resource utilization (CPU, memory, disk)",
                "Verify container orchestration is healthy"
            ])
        
        elif migration_type == "infrastructure":
            base_checklist.extend([
                "Verify network connectivity between components",
                "Confirm DNS resolution is working correctly",
                "Check firewall rules and security groups",
                "Validate load balancer configuration",
                "Confirm SSL/TLS certificates are valid",
                "Check storage systems are accessible",
                "Verify backup and disaster recovery systems"
            ])
        
        return base_checklist
    
    def _generate_post_rollback_procedures(self, migration_plan: Dict[str, Any]) -> List[str]:
        """Generate post-rollback procedures"""
        return [
            "Monitor system stability for 24-48 hours post-rollback",
            "Conduct thorough post-rollback testing of all critical paths",
            "Review and analyze rollback metrics and timing",
            "Document lessons learned and rollback procedure improvements",
            "Schedule post-mortem meeting with all stakeholders",
            "Update rollback procedures based on actual experience",
            "Communicate rollback completion to all stakeholders",
            "Archive rollback logs and artifacts for future reference",
            "Review and update monitoring thresholds if needed",
            "Plan for next migration attempt with improved procedures",
            "Conduct security review to ensure no vulnerabilities introduced",
            "Update disaster recovery procedures if affected by rollback",
            "Review capacity planning based on rollback resource usage",
            "Update documentation with rollback experience and timings"
        ]
    
    def _generate_emergency_contacts(self, migration_plan: Dict[str, Any]) -> List[Dict[str, str]]:
        """Generate emergency contact list"""
        return [
            {
                "role": "Incident Commander",
                "name": "TBD - Assigned during migration",
                "primary_phone": "+1-XXX-XXX-XXXX",
                "email": "[email protected]",
                "backup_contact": "[email protected]"
            },
            {
                "role": "Technical Lead",
                "name": "TBD - Migration technical owner",
                "primary_phone": "+1-XXX-XXX-XXXX",
                "email": "[email protected]",
                "backup_contact": "[email protected]"
            },
            {
                "role": "Business Owner",
                "name": "TBD - Business stakeholder",
                "primary_phone": "+1-XXX-XXX-XXXX",
                "email": "[email protected]",
                "backup_contact": "[email protected]"
            },
            {
                "role": "On-Call Engineer",
                "name": "Current on-call rotation",
                "primary_phone": "+1-XXX-XXX-XXXX",
                "email": "[email protected]",
                "backup_contact": "[email protected]"
            },
            {
                "role": "Executive Escalation",
                "name": "CTO/VP Engineering",
                "primary_phone": "+1-XXX-XXX-XXXX",
                "email": "[email protected]",
                "backup_contact": "[email protected]"
            }
        ]
    
    def _calculate_urgency(self, risk_level: str) -> str:
        """Calculate rollback urgency based on risk level"""
        risk_to_urgency = {
            "low": "low",
            "medium": "medium", 
            "high": "high",
            "critical": "emergency"
        }
        return risk_to_urgency.get(risk_level, "medium")
    
    def _get_rollback_prerequisites(self, phase_name: str, phase_index: int) -> List[str]:
        """Get prerequisites for rollback phase"""
        prerequisites = [
            "Incident commander assigned and briefed",
            "All team members notified of rollback initiation",
            "Monitoring systems confirmed operational",
            "Backup systems verified and accessible"
        ]
        
        if phase_index > 0:
            prerequisites.append("Previous rollback phase completed successfully")
        
        if "cutover" in phase_name.lower():
            prerequisites.extend([
                "Traffic redirection capabilities confirmed",
                "Load balancer configuration backed up",
                "DNS changes prepared for quick execution"
            ])
        
        if "data" in phase_name.lower() or "migration" in phase_name.lower():
            prerequisites.extend([
                "Database backup verified and accessible",
                "Data validation queries prepared",
                "Database administrator on standby"
            ])
        
        return prerequisites
    
    def _get_validation_checkpoints(self, phase_name: str, migration_type: str) -> List[str]:
        """Get validation checkpoints for rollback phase"""
        checkpoints = [
            f"{phase_name} rollback steps completed",
            "System health checks passing",
            "No critical errors in logs",
            "Key metrics within acceptable ranges"
        ]
        
        validation_commands = self.validation_templates.get(migration_type, [])
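        # Truncate long commands for display; add an ellipsis only when text was cut
        checkpoints.extend([
            f"Validation command passed: {cmd[:50]}{'...' if len(cmd) > 50 else ''}"
            for cmd in validation_commands[:3]
        ])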
        
        return checkpoints
    
    def _get_communication_requirements(self, phase_name: str, risk_level: str) -> List[str]:
        """Get communication requirements for rollback phase"""
        base_requirements = [
            "Notify incident commander of phase start/completion",
            "Update rollback status dashboard",
            "Log all actions and decisions"
        ]
        
        if risk_level in ["high", "critical"]:
            base_requirements.extend([
                "Notify all stakeholders of phase progress",
                "Update executive team if rollback extends beyond expected time",
                "Prepare customer communication if needed"
            ])
        
        if "cutover" in phase_name.lower():
            base_requirements.append("Immediate notification when traffic is redirected")
        
        return base_requirements
    
    def generate_human_readable_runbook(self, runbook: RollbackRunbook) -> str:
        """Generate human-readable rollback runbook"""
        output = []
        output.append("=" * 80)
        output.append(f"ROLLBACK RUNBOOK: {runbook.runbook_id}")
        output.append("=" * 80)
        output.append(f"Migration ID: {runbook.migration_id}")
        output.append(f"Created: {runbook.created_at}")
        output.append("")
        
        # Emergency Contacts
        output.append("EMERGENCY CONTACTS")
        output.append("-" * 40)
        for contact in runbook.emergency_contacts:
            output.append(f"{contact['role']}: {contact['name']}")
            output.append(f"  Phone: {contact['primary_phone']}")
            output.append(f"  Email: {contact['email']}")
            output.append(f"  Backup: {contact['backup_contact']}")
            output.append("")
        
        # Escalation Matrix
        output.append("ESCALATION MATRIX")
        output.append("-" * 40)
        for level, details in runbook.escalation_matrix.items():
            output.append(f"{level.upper()}:")
            output.append(f"  Trigger: {details['trigger']}")
            output.append(f"  Response Time: {details['response_time_minutes']} minutes")
            output.append(f"  Contacts: {', '.join(details['contacts'])}")
            output.append(f"  Actions: {', '.join(details['actions'])}")
            output.append("")
        
        # Rollback Trigger Conditions
        output.append("ROLLBACK TRIGGERS")
        output.append("-" * 40)
        for trigger in runbook.trigger_conditions:
            output.append(f"• {trigger.name}")
            output.append(f"  Condition: {trigger.condition}")
            output.append(f"  Auto-Execute: {'Yes' if trigger.auto_execute else 'No'}")
            output.append(f"  Evaluation Window: {trigger.evaluation_window_minutes} minutes")
            output.append(f"  Contacts: {', '.join(trigger.escalation_contacts)}")
            output.append("")
        
        # Rollback Phases
        output.append("ROLLBACK PHASES")
        output.append("-" * 40)
        for i, phase in enumerate(runbook.rollback_phases, 1):
            output.append(f"{i}. {phase.phase_name.upper()}")
            output.append(f"   Description: {phase.description}")
            output.append(f"   Urgency: {phase.urgency_level.upper()}")
            output.append(f"   Duration: {phase.estimated_duration_minutes} minutes")
            output.append(f"   Risk Level: {phase.risk_level.upper()}")
            
            if phase.prerequisites:
                output.append("   Prerequisites:")
                for prereq in phase.prerequisites:
                    output.append(f"     ✓ {prereq}")
            
            output.append("   Steps:")
            for step in sorted(phase.steps, key=lambda x: x.rollback_order):
                output.append(f"     {step.rollback_order}. {step.name}")
                output.append(f"        Duration: {step.estimated_duration_minutes} min")
                output.append(f"        Type: {step.script_type}")
                if step.script_content and step.script_type != "manual":
                    output.append("        Script:")
                    script_lines = step.script_content.split('\n')
                    for line in script_lines[:3]:  # Show first 3 lines
                        output.append(f"          {line}")
                    if len(script_lines) > 3:
                        output.append("          ...")
                output.append(f"        Success Criteria: {', '.join(step.success_criteria)}")
                output.append("")
            
            if phase.validation_checkpoints:
                output.append("   Validation Checkpoints:")
                for checkpoint in phase.validation_checkpoints:
                    output.append(f"     ☐ {checkpoint}")
            output.append("")
        
        # Data Recovery Plan
        output.append("DATA RECOVERY PLAN")
        output.append("-" * 40)
        drp = runbook.data_recovery_plan
        output.append(f"Recovery Method: {drp.recovery_method}")
        output.append(f"Backup Location: {drp.backup_location}")
        output.append(f"Estimated Recovery Time: {drp.estimated_recovery_time_minutes} minutes")
        output.append("Recovery Scripts:")
        for script in drp.recovery_scripts:
            output.append(f"  • {script}")
        output.append("Validation Queries:")
        for query in drp.data_validation_queries:
            output.append(f"  • {query}")
        output.append("")
        
        # Validation Checklist
        output.append("POST-ROLLBACK VALIDATION CHECKLIST")
        output.append("-" * 40)
        for i, item in enumerate(runbook.validation_checklist, 1):
            output.append(f"{i:2d}. ☐ {item}")
        output.append("")
        
        # Post-Rollback Procedures
        output.append("POST-ROLLBACK PROCEDURES")
        output.append("-" * 40)
        for i, procedure in enumerate(runbook.post_rollback_procedures, 1):
            output.append(f"{i:2d}. {procedure}")
        output.append("")
        
        return "\n".join(output)


def main():
    """Main function with command line interface"""
    parser = argparse.ArgumentParser(description="Generate comprehensive rollback runbooks from migration plans")
    parser.add_argument("--input", "-i", required=True, help="Input migration plan file (JSON)")
    parser.add_argument("--output", "-o", help="Output file for rollback runbook (JSON)")
    parser.add_argument("--format", "-f", choices=["json", "text", "both"], default="both", help="Output format")
    
    args = parser.parse_args()
    
    try:
        # Load migration plan
        with open(args.input, 'r', encoding='utf-8') as f:
            migration_plan = json.load(f)
        
        # Validate required fields
        if "migration_id" not in migration_plan and "source" not in migration_plan:
            print("Error: Migration plan must contain migration_id or source field", file=sys.stderr)
            return 1
        
        # Generate rollback runbook
        generator = RollbackGenerator()
        runbook = generator.generate_rollback_runbook(migration_plan)
        
        # Output results
        if args.format in ["json", "both"]:
            runbook_dict = asdict(runbook)
            if args.output:
                with open(args.output, 'w') as f:
                    json.dump(runbook_dict, f, indent=2)
                print(f"Rollback runbook saved to {args.output}")
            else:
                print(json.dumps(runbook_dict, indent=2))
        
        if args.format in ["text", "both"]:
            human_runbook = generator.generate_human_readable_runbook(runbook)
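            # Derive the text path from --output. When --output does not end in
            # ".json" and both formats are requested, append ".txt" so the text
            # runbook never overwrites the JSON file.
            text_output = None
            if args.output:
                if args.output.endswith('.json'):
                    text_output = args.output[:-len('.json')] + '.txt'
                elif args.format == "both":
                    text_output = args.output + '.txt'
                else:
                    text_output = args.output
            if text_output:
                # The runbook contains non-ASCII symbols (•, ✓, ☐), so write UTF-8 explicitly
                with open(text_output, 'w', encoding='utf-8') as f:
                    f.write(human_runbook)
                print(f"Human-readable runbook saved to {text_output}")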
            else:
                print("\n" + "="*80)
                print("HUMAN-READABLE ROLLBACK RUNBOOK")
                print("="*80)
                print(human_runbook)
        
    except FileNotFoundError:
        print(f"Error: Input file '{args.input}' not found", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in input file: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
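
As a usage sketch, the snippet below writes a minimal migration plan to disk and applies the same required-field rule that `main()` enforces before generating a runbook (the plan must contain `migration_id` or `source`). The helper name `has_required_fields`, the file name `plan.json`, and all field values are illustrative assumptions; only the `migration_id`, `source`, and `migration_type` keys are taken from the script above.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def has_required_fields(plan: Dict[str, Any]) -> bool:
    """Mirror main()'s check: a plan needs migration_id or source."""
    return "migration_id" in plan or "source" in plan


# Hypothetical plan; the values are placeholders, not a documented schema.
sample_plan = {
    "migration_id": "example-001",
    "migration_type": "database",  # the script branches on database/service/infrastructure
}

with tempfile.TemporaryDirectory() as tmp:
    plan_path = Path(tmp) / "plan.json"
    plan_path.write_text(json.dumps(sample_plan, indent=2), encoding="utf-8")
    loaded = json.loads(plan_path.read_text(encoding="utf-8"))

print(has_required_fields(loaded))               # plan that carries migration_id
print(has_required_fields({"owner": "team-a"}))  # plan missing both keys
```

A file like this would then be passed to the script with `--input plan.json --format text`. The script's own file name is not shown in this section, so substitute the actual path from the repository.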

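The type-specific branches in `_generate_validation_checklist` can also be read as a lookup from migration type to extra checks. The sketch below shows that table-driven shape with deliberately shortened item lists; `BASE_CHECKS`, `EXTRA_CHECKS`, and `build_checklist` are hypothetical names and are not part of the script.

```python
from typing import Dict, List

# Abbreviated base list; the script's version has ten items.
BASE_CHECKS: List[str] = [
    "Verify system is responding to health checks",
    "Confirm no data corruption has occurred",
]

# One representative item per migration type, copied from the script's lists.
EXTRA_CHECKS: Dict[str, List[str]] = {
    "database": ["Validate database schema matches expected state"],
    "service": ["Verify service discovery is working correctly"],
    "infrastructure": ["Confirm DNS resolution is working correctly"],
}


def build_checklist(migration_type: str) -> List[str]:
    """Return the base checks plus any extras registered for the type."""
    # Unknown types fall back to the base list, matching the if/elif version.
    return BASE_CHECKS + EXTRA_CHECKS.get(migration_type, [])


for kind in ("database", "unknown"):
    print(kind, len(build_checklist(kind)))
```

With this layout, supporting a new migration type means adding one dictionary entry rather than another `elif` branch. Whether that trade-off is worth it depends on how often the lists change.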