
senior-data-engineer

Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 3,132
Hot score: 99
Updated: March 20, 2026
Overall rating: C (4.0)
Composite score: 4.0
Best-practice grade: B (75.6)

Install command

npx @skill-hub/cli install openclaw-skills-senior-data-engineer

Repository

openclaw/skills

Skill path: skills/alirezarezvani/senior-data-engineer


Open repository

Best for

Primary workflow: Analyze Data & AI.

Technical facets: Full Stack, Data / AI.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: openclaw.

This is a mirrored public skill entry; review the repository before installing it into production workflows.

What it helps with

  • Install senior-data-engineer into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/openclaw/skills before adding senior-data-engineer to shared team environments
  • Use senior-data-engineer for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: "senior-data-engineer"
description: Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.
---

# Senior Data Engineer

Production-grade data engineering skill for building scalable, reliable data systems.

## Table of Contents

1. [Trigger Phrases](#trigger-phrases)
2. [Quick Start](#quick-start)
3. [Workflows](#workflows)
   - [Building a Batch ETL Pipeline](#workflow-1-building-a-batch-etl-pipeline)
   - [Implementing Real-Time Streaming](#workflow-2-implementing-real-time-streaming)
   - [Data Quality Framework Setup](#workflow-3-data-quality-framework-setup)
4. [Architecture Decision Framework](#architecture-decision-framework)
5. [Tech Stack](#tech-stack)
6. [Reference Documentation](#reference-documentation)
7. [Troubleshooting](#troubleshooting)

---

## Trigger Phrases

Activate this skill when you see:

**Pipeline Design:**
- "Design a data pipeline for..."
- "Build an ETL/ELT process..."
- "How should I ingest data from..."
- "Set up data extraction from..."

**Architecture:**
- "Should I use batch or streaming?"
- "Lambda vs Kappa architecture"
- "How to handle late-arriving data"
- "Design a data lakehouse"

**Data Modeling:**
- "Create a dimensional model..."
- "Star schema vs snowflake"
- "Implement slowly changing dimensions"
- "Design a data vault"

**Data Quality:**
- "Add data validation to..."
- "Set up data quality checks"
- "Monitor data freshness"
- "Implement data contracts"

**Performance:**
- "Optimize this Spark job"
- "Query is running slow"
- "Reduce pipeline execution time"
- "Tune Airflow DAG"

---

## Quick Start

### Core Tools

```bash
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --destination snowflake \
  --schedule "0 5 * * *"

# Validate data quality
python scripts/data_quality_validator.py validate \
  --input data/sales.parquet \
  --schema schemas/sales.json \
  --checks freshness,completeness,uniqueness

# Optimize ETL performance
python scripts/etl_performance_optimizer.py analyze \
  --query queries/daily_aggregation.sql \
  --engine spark \
  --recommend
```

---

## Workflows
→ See references/workflows.md for details

## Architecture Decision Framework

Use this framework to choose the right approach for your data pipeline.

### Batch vs Streaming

| Criteria | Batch | Streaming |
|----------|-------|-----------|
| **Latency requirement** | Hours to days | Seconds to minutes |
| **Data volume** | Large historical datasets | Continuous event streams |
| **Processing complexity** | Complex transformations, ML | Simple aggregations, filtering |
| **Cost sensitivity** | More cost-effective | Higher infrastructure cost |
| **Error handling** | Easier to reprocess | Requires careful design |

**Decision Tree:**
```
Is real-time insight required?
├── Yes → Use streaming
│   └── Is exactly-once semantics needed?
│       ├── Yes → Kafka + Flink/Spark Structured Streaming
│       └── No → Kafka + consumer groups
└── No → Use batch
    └── Is data volume > 1TB daily?
        ├── Yes → Spark/Databricks
        └── No → dbt + warehouse compute
```
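
A minimal sketch of the "Kafka + Spark Structured Streaming" branch, assuming `pyspark` with the Kafka connector package is available; the broker address, topic, and paths are placeholders. Checkpointing plus an idempotent file sink is what yields the end-to-end exactly-once guarantee:

```python
# Hedged sketch: broker, topic, and paths below are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("events-stream").getOrCreate()

events = (
    spark.readStream
    .format("kafka")  # requires the spark-sql-kafka connector package
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load()
)

# Checkpointing + the (idempotent) file sink provides exactly-once delivery.
query = (
    events.selectExpr("CAST(value AS STRING) AS payload")
    .writeStream
    .format("parquet")
    .option("path", "/data/events")
    .option("checkpointLocation", "/checkpoints/events")
    .start()
)
query.awaitTermination()
```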

### Lambda vs Kappa Architecture

| Aspect | Lambda | Kappa |
|--------|--------|-------|
| **Complexity** | Two codebases (batch + stream) | Single codebase |
| **Maintenance** | Higher (sync batch/stream logic) | Lower |
| **Reprocessing** | Native batch layer | Replay from source |
| **Use case** | ML training + real-time serving | Pure event-driven |

**When to choose Lambda:**
- Need to train ML models on historical data
- Complex batch transformations not feasible in streaming
- Existing batch infrastructure

**When to choose Kappa:**
- Event-sourced architecture
- All processing can be expressed as stream operations
- Starting fresh without legacy systems (see the replay sketch below)
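
Kappa's "replay from source" reprocessing is the operational crux: a backfill means re-reading the topic through the same code path as live traffic. A minimal sketch, assuming `kafka-python`; the topic, broker, and handler are placeholders:

```python
# Hedged sketch: the topic, broker address, and process() are placeholders.
from kafka import KafkaConsumer


def process(payload: bytes) -> None:
    # Placeholder transform: real logic would parse and apply the event.
    print(payload[:80])


consumer = KafkaConsumer(
    "events",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",  # start from the oldest retained offset
    enable_auto_commit=False,
    group_id="reprocess-2024-01",  # a fresh group id forces a full replay
)

for message in consumer:
    process(message.value)  # one codebase serves live and replayed data alike
```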

### Data Warehouse vs Data Lakehouse

| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---------|-------------------------------|---------------------------|
| **Best for** | BI, SQL analytics | ML, unstructured data |
| **Storage cost** | Higher (proprietary format) | Lower (open formats) |
| **Flexibility** | Schema-on-write | Schema-on-read |
| **Performance** | Excellent for SQL | Good, improving |
| **Ecosystem** | Mature BI tools | Growing ML tooling |
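
To make the lakehouse column concrete, a minimal Delta Lake sketch; it assumes `pyspark` with the `delta-spark` extensions configured on the session, and the bucket paths are placeholders:

```python
# Hedged sketch: paths are placeholders; the session is assumed to be
# configured with the Delta Lake extensions (delta-spark package).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lakehouse-demo").getOrCreate()

# Schema-on-read: land raw files first, impose structure downstream.
raw = spark.read.parquet("s3a://lake/raw/sales/")
raw.write.format("delta").mode("append").save("s3a://lake/bronze/sales")

# The same open-format table stays queryable through plain SQL.
spark.sql("SELECT COUNT(*) FROM delta.`s3a://lake/bronze/sales`").show()
```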

---

## Tech Stack

| Category | Technologies |
|----------|--------------|
| **Languages** | Python, SQL, Scala |
| **Orchestration** | Airflow, Prefect, Dagster |
| **Transformation** | dbt, Spark, Flink |
| **Streaming** | Kafka, Kinesis, Pub/Sub |
| **Storage** | S3, GCS, Delta Lake, Iceberg |
| **Warehouses** | Snowflake, BigQuery, Redshift, Databricks |
| **Quality** | Great Expectations, dbt tests, Monte Carlo |
| **Monitoring** | Prometheus, Grafana, Datadog |

---

## Reference Documentation

### 1. Data Pipeline Architecture
See `references/data_pipeline_architecture.md` for:
- Lambda vs Kappa architecture patterns
- Batch processing with Spark and Airflow
- Stream processing with Kafka and Flink
- Exactly-once semantics implementation
- Error handling and dead letter queues

### 2. Data Modeling Patterns
See `references/data_modeling_patterns.md` for:
- Dimensional modeling (Star/Snowflake)
- Slowly Changing Dimensions (SCD Types 1-6; a Type 2 sketch follows below)
- Data Vault modeling
- dbt best practices
- Partitioning and clustering
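
As a taste of the SCD material, a minimal Type 2 sketch; the table and column names (`dim_customer`, `stg_customer`, `email`) are hypothetical, and the SQL rides in a Python string the way the bundled scripts carry theirs:

```python
# Hedged SCD Type 2 sketch with placeholder table/column names; run the two
# statements in order, inside one transaction where the warehouse allows it.
SCD2_SQL = """
-- 1) Retire the current row for customers whose tracked attribute changed.
UPDATE dim_customer d
SET is_current = FALSE,
    valid_to   = CURRENT_TIMESTAMP
FROM stg_customer s
WHERE d.customer_id = s.customer_id
  AND d.is_current
  AND d.email <> s.email;

-- 2) Insert a new current row for changed and brand-new customers.
INSERT INTO dim_customer (customer_id, email, valid_from, valid_to, is_current)
SELECT s.customer_id, s.email, CURRENT_TIMESTAMP, NULL, TRUE
FROM stg_customer s
LEFT JOIN dim_customer d
  ON d.customer_id = s.customer_id AND d.is_current
WHERE d.customer_id IS NULL;  -- step 1 already retired the changed rows
"""
```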

### 3. DataOps Best Practices
See `references/dataops_best_practices.md` for:
- Data testing frameworks
- Data contracts and schema validation
- CI/CD for data pipelines
- Observability and lineage
- Incident response

---

## Troubleshooting
→ See references/troubleshooting.md for details



---

## Referenced Files

> The following files are referenced in this skill and included for context.

### scripts/pipeline_orchestrator.py

```python
#!/usr/bin/env python3
"""
Pipeline Orchestrator

Generate pipeline configurations for Airflow, Prefect, and Dagster.
Supports ETL pattern generation, dependency management, and scheduling.

Usage:
    python pipeline_orchestrator.py generate --type airflow --source postgres --destination snowflake
    python pipeline_orchestrator.py generate --type prefect --config pipeline.yaml
    python pipeline_orchestrator.py visualize --dag dags/my_dag.py
    python pipeline_orchestrator.py validate --dag dags/my_dag.py
"""

import os
import sys
import json
import yaml
import logging
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from abc import ABC, abstractmethod

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# ============================================================================
# Data Classes
# ============================================================================

@dataclass
class SourceConfig:
    """Source system configuration."""
    type: str  # postgres, mysql, s3, kafka, api
    connection_id: str
    schema: Optional[str] = None
    tables: List[str] = field(default_factory=list)
    query: Optional[str] = None
    incremental_column: Optional[str] = None
    incremental_strategy: str = "timestamp"  # timestamp, id, cdc

@dataclass
class DestinationConfig:
    """Destination system configuration."""
    type: str  # snowflake, bigquery, redshift, s3, delta
    connection_id: str
    schema: str = "raw"
    write_mode: str = "append"  # append, overwrite, merge
    partition_by: Optional[str] = None
    cluster_by: List[str] = field(default_factory=list)

@dataclass
class TaskConfig:
    """Individual task configuration."""
    task_id: str
    operator: str
    dependencies: List[str] = field(default_factory=list)
    params: Dict[str, Any] = field(default_factory=dict)
    retries: int = 2
    retry_delay_minutes: int = 5
    timeout_minutes: int = 60
    pool: Optional[str] = None
    priority_weight: int = 1

@dataclass
class PipelineConfig:
    """Complete pipeline configuration."""
    name: str
    description: str
    schedule: str  # cron expression or @daily, @hourly
    owner: str = "data-team"
    tags: List[str] = field(default_factory=list)
    catchup: bool = False
    max_active_runs: int = 1
    default_retries: int = 2
    source: Optional[SourceConfig] = None
    destination: Optional[DestinationConfig] = None
    tasks: List[TaskConfig] = field(default_factory=list)


# ============================================================================
# Pipeline Generators
# ============================================================================

class PipelineGenerator(ABC):
    """Abstract base class for pipeline generators."""

    @abstractmethod
    def generate(self, config: PipelineConfig) -> str:
        """Generate pipeline code from config."""
        pass

    @abstractmethod
    def validate(self, code: str) -> Dict[str, Any]:
        """Validate generated pipeline code."""
        pass


class AirflowGenerator(PipelineGenerator):
    """Generate Airflow DAG code."""

    OPERATOR_IMPORTS = {
        'python': 'from airflow.operators.python import PythonOperator',
        'bash': 'from airflow.operators.bash import BashOperator',
        'postgres': 'from airflow.providers.postgres.operators.postgres import PostgresOperator',
        'snowflake': 'from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator',
        's3': 'from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator',
        's3_to_snowflake': 'from airflow.providers.snowflake.transfers.s3_to_snowflake import S3ToSnowflakeOperator',
        'sensor': 'from airflow.sensors.base import BaseSensorOperator',
        'trigger': 'from airflow.operators.trigger_dagrun import TriggerDagRunOperator',
        'email': 'from airflow.operators.email import EmailOperator',
        'slack': 'from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator',
    }

    def generate(self, config: PipelineConfig) -> str:
        """Generate Airflow DAG from configuration."""

        # Collect required imports
        imports = self._collect_imports(config)

        # Generate DAG code
        code = self._generate_header(imports)
        code += self._generate_default_args(config)
        code += self._generate_dag_definition(config)
        code += self._generate_tasks(config)
        code += self._generate_dependencies(config)

        return code

    def _collect_imports(self, config: PipelineConfig) -> List[str]:
        """Collect required import statements."""
        imports = [
            "from airflow import DAG",
            "from airflow.utils.dates import days_ago",
            "from datetime import datetime, timedelta",
        ]

        operators_used = set()
        for task in config.tasks:
            op_type = task.operator.split('_')[0].lower()
            if op_type in self.OPERATOR_IMPORTS:
                operators_used.add(op_type)

        # Add source/destination specific imports
        if config.source:
            if config.source.type == 'postgres':
                operators_used.add('postgres')
            elif config.source.type == 's3':
                operators_used.add('s3')

        if config.destination:
            if config.destination.type == 'snowflake':
                operators_used.add('snowflake')
                operators_used.add('s3_to_snowflake')

        for op in operators_used:
            if op in self.OPERATOR_IMPORTS:
                imports.append(self.OPERATOR_IMPORTS[op])

        return imports

    def _generate_header(self, imports: List[str]) -> str:
        """Generate file header with imports."""
        header = '''"""
Auto-generated Airflow DAG
Generated by Pipeline Orchestrator
"""

'''
        header += '\n'.join(imports)
        header += '\n\n'
        return header

    def _generate_default_args(self, config: PipelineConfig) -> str:
        """Generate default_args dictionary."""
        return f'''
default_args = {{
    'owner': '{config.owner}',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': {config.default_retries},
    'retry_delay': timedelta(minutes=5),
}}

'''

    def _generate_dag_definition(self, config: PipelineConfig) -> str:
        """Generate DAG definition."""
        tags_str = str(config.tags) if config.tags else "[]"

        return f'''
with DAG(
    dag_id='{config.name}',
    default_args=default_args,
    description='{config.description}',
    schedule_interval='{config.schedule}',
    start_date=days_ago(1),
    catchup={config.catchup},
    max_active_runs={config.max_active_runs},
    tags={tags_str},
) as dag:

'''

    def _generate_tasks(self, config: PipelineConfig) -> str:
        """Generate task definitions."""
        tasks_code = ""

        for task in config.tasks:
            if 'python' in task.operator.lower():
                tasks_code += self._generate_python_task(task)
            elif 'bash' in task.operator.lower():
                tasks_code += self._generate_bash_task(task)
            elif 'sql' in task.operator.lower() or 'postgres' in task.operator.lower():
                tasks_code += self._generate_sql_task(task, config)
            elif 'snowflake' in task.operator.lower():
                tasks_code += self._generate_snowflake_task(task)
            else:
                tasks_code += self._generate_generic_task(task)

        return tasks_code

    def _generate_python_task(self, task: TaskConfig) -> str:
        """Generate PythonOperator task."""
        callable_name = task.params.get('callable', 'process_data')
        return f'''
    def {callable_name}(**kwargs):
        """Task: {task.task_id}"""
        # Add your processing logic here
        execution_date = kwargs.get('ds')
        print(f"Processing data for {{execution_date}}")
        return True

    {task.task_id} = PythonOperator(
        task_id='{task.task_id}',
        python_callable={callable_name},
        retries={task.retries},
        retry_delay=timedelta(minutes={task.retry_delay_minutes}),
        execution_timeout=timedelta(minutes={task.timeout_minutes}),
    )

'''

    def _generate_bash_task(self, task: TaskConfig) -> str:
        """Generate BashOperator task."""
        command = task.params.get('command', 'echo "Hello World"')
        return f'''
    {task.task_id} = BashOperator(
        task_id='{task.task_id}',
        bash_command='{command}',
        retries={task.retries},
        retry_delay=timedelta(minutes={task.retry_delay_minutes}),
        execution_timeout=timedelta(minutes={task.timeout_minutes}),
    )

'''

    def _generate_sql_task(self, task: TaskConfig, config: PipelineConfig) -> str:
        """Generate SQL operator task."""
        sql = task.params.get('sql', 'SELECT 1')
        conn_id = config.source.connection_id if config.source else 'default_conn'

        return f'''
    {task.task_id} = PostgresOperator(
        task_id='{task.task_id}',
        postgres_conn_id='{conn_id}',
        sql="""{sql}""",
        retries={task.retries},
        retry_delay=timedelta(minutes={task.retry_delay_minutes}),
    )

'''

    def _generate_snowflake_task(self, task: TaskConfig) -> str:
        """Generate SnowflakeOperator task."""
        sql = task.params.get('sql', 'SELECT 1')
        return f'''
    {task.task_id} = SnowflakeOperator(
        task_id='{task.task_id}',
        snowflake_conn_id='snowflake_default',
        sql="""{sql}""",
        retries={task.retries},
        retry_delay=timedelta(minutes={task.retry_delay_minutes}),
    )

'''

    def _generate_generic_task(self, task: TaskConfig) -> str:
        """Generate generic task placeholder."""
        return f'''
    # TODO: Implement {task.operator} for {task.task_id}
    {task.task_id} = PythonOperator(
        task_id='{task.task_id}',
        python_callable=lambda: print("{task.task_id}"),
    )

'''

    def _generate_dependencies(self, config: PipelineConfig) -> str:
        """Generate task dependencies."""
        deps_code = "\n    # Task dependencies\n"

        for task in config.tasks:
            if task.dependencies:
                for dep in task.dependencies:
                    deps_code += f"    {dep} >> {task.task_id}\n"

        return deps_code

    def validate(self, code: str) -> Dict[str, Any]:
        """Validate generated DAG code."""
        issues = []
        warnings = []

        # Check for common issues
        if 'default_args' not in code:
            issues.append("Missing default_args definition")

        if 'with DAG' not in code:
            issues.append("Missing DAG context manager")

        if 'schedule_interval' not in code:
            warnings.append("No schedule_interval defined, DAG won't run automatically")

        # Try to parse the code
        try:
            compile(code, '<string>', 'exec')
        except SyntaxError as e:
            issues.append(f"Syntax error: {e}")

        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': warnings
        }


class PrefectGenerator(PipelineGenerator):
    """Generate Prefect flow code."""

    def generate(self, config: PipelineConfig) -> str:
        """Generate Prefect flow from configuration."""

        code = self._generate_header()
        code += self._generate_tasks(config)
        code += self._generate_flow(config)

        return code

    def _generate_header(self) -> str:
        """Generate file header."""
        return '''"""
Auto-generated Prefect Flow
Generated by Pipeline Orchestrator
"""

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd

'''

    def _generate_tasks(self, config: PipelineConfig) -> str:
        """Generate Prefect tasks."""
        tasks_code = ""

        for task_config in config.tasks:
            cache_expiration = task_config.params.get('cache_hours', 1)
            tasks_code += f'''
@task(
    name="{task_config.task_id}",
    retries={task_config.retries},
    retry_delay_seconds={task_config.retry_delay_minutes * 60},
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours={cache_expiration}),
)
def {task_config.task_id}(input_data=None):
    """Task: {task_config.task_id}"""
    logger = get_run_logger()
    logger.info(f"Executing {task_config.task_id}")

    # Add processing logic here
    result = input_data

    return result

'''
        return tasks_code

    def _generate_flow(self, config: PipelineConfig) -> str:
        """Generate Prefect flow."""
        flow_code = f'''
@flow(
    name="{config.name}",
    description="{config.description}",
    version="1.0.0",
)
def {config.name.replace('-', '_')}_flow():
    """Main flow orchestrating all tasks."""
    logger = get_run_logger()
    logger.info("Starting flow: {config.name}")

'''
        # Generate task calls with dependencies
        task_vars = {}
        for i, task_config in enumerate(config.tasks):
            task_name = task_config.task_id
            var_name = f"result_{i}"
            task_vars[task_name] = var_name

            if task_config.dependencies:
                # Get input from first dependency
                dep_var = task_vars.get(task_config.dependencies[0], "None")
                flow_code += f"    {var_name} = {task_name}({dep_var})\n"
            else:
                flow_code += f"    {var_name} = {task_name}()\n"

        flow_code += '''
    logger.info("Flow completed successfully")
    return True


if __name__ == "__main__":
    ''' + f'{config.name.replace("-", "_")}_flow()' + '\n'

        return flow_code

    def validate(self, code: str) -> Dict[str, Any]:
        """Validate Prefect flow code."""
        issues = []

        if '@flow' not in code:
            issues.append("Missing @flow decorator")

        if '@task' not in code:
            issues.append("No tasks defined with @task decorator")

        try:
            compile(code, '<string>', 'exec')
        except SyntaxError as e:
            issues.append(f"Syntax error: {e}")

        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': []
        }


class DagsterGenerator(PipelineGenerator):
    """Generate Dagster job code."""

    def generate(self, config: PipelineConfig) -> str:
        """Generate Dagster job from configuration."""

        code = self._generate_header()
        code += self._generate_ops(config)
        code += self._generate_job(config)

        return code

    def _generate_header(self) -> str:
        """Generate file header."""
        return '''"""
Auto-generated Dagster Job
Generated by Pipeline Orchestrator
"""

from dagster import op, job, In, Out, Output, DynamicOut, graph
from dagster import AssetMaterialization, MetadataValue
import pandas as pd

'''

    def _generate_ops(self, config: PipelineConfig) -> str:
        """Generate Dagster ops."""
        ops_code = ""

        for task_config in config.tasks:
            has_input = len(task_config.dependencies) > 0

            if has_input:
                ops_code += f'''
@op(
    ins={{"input_data": In()}},
    out=Out(),
)
def {task_config.task_id}(context, input_data):
    """Op: {task_config.task_id}"""
    context.log.info(f"Executing {task_config.task_id}")

    # Add processing logic here
    result = input_data

    # Log asset materialization
    yield AssetMaterialization(
        asset_key="{task_config.task_id}",
        metadata={{
            "row_count": MetadataValue.int(len(result) if hasattr(result, '__len__') else 0),
        }}
    )
    yield Output(result)

'''
            else:
                ops_code += f'''
@op(out=Out())
def {task_config.task_id}(context):
    """Op: {task_config.task_id}"""
    context.log.info(f"Executing {task_config.task_id}")

    # Add processing logic here
    result = {{}}

    yield AssetMaterialization(
        asset_key="{task_config.task_id}",
    )
    yield Output(result)

'''
        return ops_code

    def _generate_job(self, config: PipelineConfig) -> str:
        """Generate Dagster job."""
        job_code = f'''
@job(
    name="{config.name}",
    description="{config.description}",
    tags={{
        "owner": "{config.owner}",
        "schedule": "{config.schedule}",
    }},
)
def {config.name.replace('-', '_')}_job():
    """Main job orchestrating all ops."""
'''
        # Build dependency graph
        task_outputs = {}
        for task_config in config.tasks:
            task_name = task_config.task_id

            if task_config.dependencies:
                dep_output = task_outputs.get(task_config.dependencies[0], None)
                if dep_output:
                    job_code += f"    {task_name}_output = {task_name}({dep_output})\n"
                else:
                    job_code += f"    {task_name}_output = {task_name}()\n"
            else:
                job_code += f"    {task_name}_output = {task_name}()\n"

            task_outputs[task_name] = f"{task_name}_output"

        return job_code

    def validate(self, code: str) -> Dict[str, Any]:
        """Validate Dagster job code."""
        issues = []

        if '@job' not in code:
            issues.append("Missing @job decorator")

        if '@op' not in code:
            issues.append("No ops defined with @op decorator")

        try:
            compile(code, '<string>', 'exec')
        except SyntaxError as e:
            issues.append(f"Syntax error: {e}")

        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'warnings': []
        }


# ============================================================================
# ETL Pattern Templates
# ============================================================================

class ETLPatternGenerator:
    """Generate common ETL patterns."""

    @staticmethod
    def generate_extract_load(
        source_type: str,
        destination_type: str,
        tables: List[str],
        mode: str = "incremental"
    ) -> PipelineConfig:
        """Generate extract-load pipeline configuration."""

        tasks = []

        # Extract tasks
        for table in tables:
            extract_task = TaskConfig(
                task_id=f"extract_{table}",
                operator="python_operator",
                params={
                    'callable': f'extract_{table}',
                    'sql': f'SELECT * FROM {table}' + (
                        # Plain string (not an f-string), so double braces keep
                        # the Airflow macro {{ prev_ds }} intact in the output.
                        ' WHERE updated_at > {{ prev_ds }}' if mode == 'incremental' else ''
                    )
                }
            )
            tasks.append(extract_task)

        # Load tasks with dependencies
        for table in tables:
            load_task = TaskConfig(
                task_id=f"load_{table}",
                operator="python_operator",
                dependencies=[f"extract_{table}"],
                params={'callable': f'load_{table}'}
            )
            tasks.append(load_task)

        # Quality check task
        quality_task = TaskConfig(
            task_id="quality_check",
            operator="python_operator",
            dependencies=[f"load_{table}" for table in tables],
            params={'callable': 'run_quality_checks'}
        )
        tasks.append(quality_task)

        return PipelineConfig(
            name=f"el_{source_type}_to_{destination_type}",
            description=f"Extract from {source_type}, load to {destination_type}",
            schedule="0 5 * * *",  # Daily at 5 AM
            tags=["etl", source_type, destination_type],
            source=SourceConfig(
                type=source_type,
                connection_id=f"{source_type}_default",
                tables=tables,
                incremental_strategy="timestamp" if mode == "incremental" else "full"
            ),
            destination=DestinationConfig(
                type=destination_type,
                connection_id=f"{destination_type}_default",
                write_mode="append" if mode == "incremental" else "overwrite"
            ),
            tasks=tasks
        )

    @staticmethod
    def generate_transform_pipeline(
        source_tables: List[str],
        target_table: str,
        dbt_models: List[str]
    ) -> PipelineConfig:
        """Generate transformation pipeline with dbt."""

        tasks = []

        # Sensor for source freshness
        for table in source_tables:
            sensor_task = TaskConfig(
                task_id=f"wait_for_{table}",
                operator="sql_sensor",
                params={
                    'sql': f"SELECT MAX(updated_at) FROM {table} WHERE updated_at > '{{{{ ds }}}}'"
                }
            )
            tasks.append(sensor_task)

        # dbt run task
        dbt_run = TaskConfig(
            task_id="dbt_run",
            operator="bash_operator",
            dependencies=[f"wait_for_{t}" for t in source_tables],
            params={
                'command': f'cd /opt/dbt && dbt run --select {" ".join(dbt_models)}'
            },
            timeout_minutes=120
        )
        tasks.append(dbt_run)

        # dbt test task
        dbt_test = TaskConfig(
            task_id="dbt_test",
            operator="bash_operator",
            dependencies=["dbt_run"],
            params={
                'command': f'cd /opt/dbt && dbt test --select {" ".join(dbt_models)}'
            }
        )
        tasks.append(dbt_test)

        return PipelineConfig(
            name=f"transform_{target_table}",
            description=f"Transform data into {target_table} using dbt",
            schedule="0 6 * * *",  # Daily at 6 AM (after extraction)
            tags=["transform", "dbt"],
            tasks=tasks
        )
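
# Illustrative usage (hypothetical table names): render the incremental
# extract-load pattern above as an Airflow DAG.
#
#     config = ETLPatternGenerator.generate_extract_load(
#         source_type="postgres",
#         destination_type="snowflake",
#         tables=["orders", "customers"],
#         mode="incremental",
#     )
#     print(AirflowGenerator().generate(config))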


# ============================================================================
# CLI Interface
# ============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Pipeline Orchestrator - Generate and manage data pipeline configurations",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Generate Airflow DAG:
    python pipeline_orchestrator.py generate --type airflow --source postgres --destination snowflake --tables orders,customers

  Generate from config file:
    python pipeline_orchestrator.py generate --config pipeline.yaml --type prefect

  Validate existing DAG:
    python pipeline_orchestrator.py validate --dag dags/my_dag.py --type airflow
        """
    )

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Generate command
    gen_parser = subparsers.add_parser('generate', help='Generate pipeline code')
    gen_parser.add_argument('--type', '-t', required=True,
                           choices=['airflow', 'prefect', 'dagster'],
                           help='Pipeline framework type')
    gen_parser.add_argument('--source', '-s', help='Source system type')
    gen_parser.add_argument('--destination', '-d', help='Destination system type')
    gen_parser.add_argument('--tables', help='Comma-separated list of tables')
    gen_parser.add_argument('--config', '-c', help='Configuration YAML file')
    gen_parser.add_argument('--output', '-o', help='Output file path')
    gen_parser.add_argument('--name', '-n', help='Pipeline name')
    gen_parser.add_argument('--schedule', default='0 5 * * *', help='Cron schedule')
    gen_parser.add_argument('--mode', default='incremental',
                           choices=['incremental', 'full'],
                           help='Load mode')

    # Validate command
    val_parser = subparsers.add_parser('validate', help='Validate pipeline code')
    val_parser.add_argument('--dag', required=True, help='DAG file to validate')
    val_parser.add_argument('--type', '-t', required=True,
                           choices=['airflow', 'prefect', 'dagster'])

    # Template command
    tmpl_parser = subparsers.add_parser('template', help='Generate from template')
    tmpl_parser.add_argument('--pattern', '-p', required=True,
                            choices=['extract-load', 'transform', 'cdc'],
                            help='ETL pattern to generate')
    tmpl_parser.add_argument('--type', '-t', required=True,
                            choices=['airflow', 'prefect', 'dagster'])
    tmpl_parser.add_argument('--source', '-s', required=True)
    tmpl_parser.add_argument('--destination', '-d', required=True)
    tmpl_parser.add_argument('--tables', required=True)
    tmpl_parser.add_argument('--output', '-o', help='Output file path')

    args = parser.parse_args()

    if args.command is None:
        parser.print_help()
        sys.exit(1)

    try:
        if args.command == 'generate':
            # Load config if provided
            if args.config:
                with open(args.config) as f:
                    config_data = yaml.safe_load(f)
                # Rebuild nested dataclasses from plain YAML mappings so that
                # attribute access (e.g. config.source.type) works downstream.
                for key, cls in (('source', SourceConfig), ('destination', DestinationConfig)):
                    if isinstance(config_data.get(key), dict):
                        config_data[key] = cls(**config_data[key])
                config_data['tasks'] = [TaskConfig(**t) for t in config_data.get('tasks', [])]
                config = PipelineConfig(**config_data)
            else:
                # Build config from arguments
                tables = args.tables.split(',') if args.tables else []

                config = ETLPatternGenerator.generate_extract_load(
                    source_type=args.source or 'postgres',
                    destination_type=args.destination or 'snowflake',
                    tables=tables,
                    mode=args.mode
                )

                if args.name:
                    config.name = args.name
                config.schedule = args.schedule

            # Generate code
            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            code = generator.generate(config)

            # Validate
            validation = generator.validate(code)
            if not validation['valid']:
                logger.warning(f"Validation issues: {validation['issues']}")

            # Output
            if args.output:
                with open(args.output, 'w') as f:
                    f.write(code)
                logger.info(f"Generated pipeline saved to {args.output}")
            else:
                print(code)

        elif args.command == 'validate':
            with open(args.dag) as f:
                code = f.read()

            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            result = generator.validate(code)

            print(json.dumps(result, indent=2))
            sys.exit(0 if result['valid'] else 1)

        elif args.command == 'template':
            tables = args.tables.split(',')

            if args.pattern == 'extract-load':
                config = ETLPatternGenerator.generate_extract_load(
                    source_type=args.source,
                    destination_type=args.destination,
                    tables=tables
                )
            elif args.pattern == 'transform':
                config = ETLPatternGenerator.generate_transform_pipeline(
                    source_tables=tables,
                    target_table='fct_output',
                    dbt_models=['stg_*', 'fct_*']
                )
            else:
                logger.error(f"Pattern {args.pattern} not yet implemented")
                sys.exit(1)

            generators = {
                'airflow': AirflowGenerator(),
                'prefect': PrefectGenerator(),
                'dagster': DagsterGenerator()
            }

            generator = generators[args.type]
            code = generator.generate(config)

            if args.output:
                with open(args.output, 'w') as f:
                    f.write(code)
                logger.info(f"Generated {args.pattern} pipeline saved to {args.output}")
            else:
                print(code)

        sys.exit(0)

    except Exception as e:
        logger.error(f"Error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()

```

### scripts/data_quality_validator.py

```python
#!/usr/bin/env python3
"""
Data Quality Validator
Comprehensive data quality validation tool for data engineering workflows.

Features:
- Schema validation (types, nullability, constraints)
- Data profiling (statistics, distributions, patterns)
- Great Expectations suite generation
- Data contract validation
- Anomaly detection
- Quality scoring and reporting

Usage:
    python data_quality_validator.py validate data.csv --schema schema.json
    python data_quality_validator.py profile data.csv --output profile.json
    python data_quality_validator.py generate-suite data.csv --output expectations.json
    python data_quality_validator.py contract data.csv --contract contract.yaml
"""

import os
import sys
import json
import csv
import re
import argparse
import logging
import statistics
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field, asdict
from datetime import datetime
from collections import Counter
from abc import ABC, abstractmethod

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class ColumnSchema:
    """Schema definition for a column"""
    name: str
    data_type: str  # string, integer, float, boolean, date, datetime, email, uuid
    nullable: bool = True
    unique: bool = False
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    pattern: Optional[str] = None  # regex pattern
    allowed_values: Optional[List[str]] = None
    description: str = ""


@dataclass
class DataSchema:
    """Complete schema for a dataset"""
    name: str
    version: str
    columns: List[ColumnSchema]
    primary_key: Optional[List[str]] = None
    row_count_min: Optional[int] = None
    row_count_max: Optional[int] = None
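
# A schema file such as the Quick Start's schemas/sales.json is presumably this
# structure serialized as JSON; the field names mirror the dataclasses above,
# and the sample values are illustrative only:
#
#     {
#       "name": "sales",
#       "version": "1.0",
#       "primary_key": ["order_id"],
#       "columns": [
#         {"name": "order_id", "data_type": "integer", "nullable": false, "unique": true},
#         {"name": "email", "data_type": "email"}
#       ]
#     }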


@dataclass
class ValidationResult:
    """Result of a single validation check"""
    check_name: str
    column: Optional[str]
    passed: bool
    expected: Any
    actual: Any
    severity: str = "error"  # error, warning, info
    message: str = ""
    failed_rows: List[int] = field(default_factory=list)


@dataclass
class ColumnProfile:
    """Statistical profile of a column"""
    name: str
    data_type: str
    total_count: int
    null_count: int
    null_percentage: float
    unique_count: int
    unique_percentage: float
    # Numeric stats
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    mean: Optional[float] = None
    median: Optional[float] = None
    std_dev: Optional[float] = None
    percentile_25: Optional[float] = None
    percentile_75: Optional[float] = None
    # String stats
    min_length: Optional[int] = None
    max_length: Optional[int] = None
    avg_length: Optional[float] = None
    # Pattern detection
    detected_pattern: Optional[str] = None
    top_values: List[Tuple[str, int]] = field(default_factory=list)


@dataclass
class DataProfile:
    """Complete profile of a dataset"""
    name: str
    row_count: int
    column_count: int
    columns: List[ColumnProfile]
    duplicate_rows: int
    memory_size_bytes: int
    profile_timestamp: str


@dataclass
class QualityScore:
    """Overall quality score for a dataset"""
    completeness: float  # % of non-null values
    uniqueness: float    # % of unique values where expected
    validity: float      # % passing validation rules
    consistency: float   # % passing cross-column checks
    accuracy: float      # % matching expected patterns
    overall: float       # weighted average
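
# The weighted average is not pinned down in this file; one plausible weighting
# (illustrative assumption, not the tool's definition) would be:
#     overall = (0.3 * completeness + 0.2 * uniqueness + 0.3 * validity
#                + 0.1 * consistency + 0.1 * accuracy)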


# =============================================================================
# Type Detection
# =============================================================================

class TypeDetector:
    """Detect and infer data types from values"""

    PATTERNS = {
        'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
        'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        'phone': r'^\+?[\d\s\-\(\)]{10,}$',
        'url': r'^https?://[^\s]+$',
        'ipv4': r'^(\d{1,3}\.){3}\d{1,3}$',
        'date_iso': r'^\d{4}-\d{2}-\d{2}$',
        'datetime_iso': r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
        'credit_card': r'^\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}$',
    }

    @classmethod
    def detect_type(cls, values: List[str]) -> str:
        """Detect the most likely data type from a sample of values"""
        non_empty = [v for v in values if v and v.strip()]
        if not non_empty:
            return "string"

        # Check for patterns first
        for pattern_name, pattern in cls.PATTERNS.items():
            regex = re.compile(pattern, re.IGNORECASE)
            matches = sum(1 for v in non_empty if regex.match(v.strip()))
            if matches / len(non_empty) > 0.9:
                return pattern_name

        # Check for numeric types
        int_count = 0
        float_count = 0
        bool_count = 0

        for v in non_empty:
            v = v.strip()
            if v.lower() in ('true', 'false', 'yes', 'no', '1', '0'):
                bool_count += 1
            try:
                int(v)
                int_count += 1
            except ValueError:
                try:
                    float(v)
                    float_count += 1
                except ValueError:
                    pass

        if bool_count / len(non_empty) > 0.9:
            return "boolean"
        if int_count / len(non_empty) > 0.9:
            return "integer"
        if (int_count + float_count) / len(non_empty) > 0.9:
            return "float"

        return "string"

    @classmethod
    def detect_pattern(cls, values: List[str]) -> Optional[str]:
        """Try to detect a common pattern in string values"""
        non_empty = [v for v in values if v and v.strip()]
        if not non_empty or len(non_empty) < 10:
            return None

        for pattern_name, pattern in cls.PATTERNS.items():
            regex = re.compile(pattern, re.IGNORECASE)
            matches = sum(1 for v in non_empty if regex.match(v.strip()))
            if matches / len(non_empty) > 0.8:
                return pattern_name

        return None
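
# Illustrative check (made-up values): TypeDetector.detect_type(
#     ["a@b.com", "c@d.org", ""]) returns "email", since every non-empty value
# matches the email pattern; mixed digit strings fall through the pattern
# checks and resolve to "integer", "float", or "string" instead.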


# =============================================================================
# Validators
# =============================================================================

class BaseValidator(ABC):
    """Base class for validators"""

    @abstractmethod
    def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
        pass


class SchemaValidator(BaseValidator):
    """Validate data against a schema"""

    def validate(self, data: List[Dict], schema: DataSchema) -> List[ValidationResult]:
        results = []

        if not data:
            results.append(ValidationResult(
                check_name="data_not_empty",
                column=None,
                passed=False,
                expected="non-empty dataset",
                actual="empty dataset",
                severity="error",
                message="Dataset is empty"
            ))
            return results

        # Validate row count
        row_count = len(data)
        if schema.row_count_min and row_count < schema.row_count_min:
            results.append(ValidationResult(
                check_name="row_count_min",
                column=None,
                passed=False,
                expected=f">= {schema.row_count_min}",
                actual=row_count,
                severity="error",
                message=f"Row count {row_count} is below minimum {schema.row_count_min}"
            ))

        if schema.row_count_max and row_count > schema.row_count_max:
            results.append(ValidationResult(
                check_name="row_count_max",
                column=None,
                passed=False,
                expected=f"<= {schema.row_count_max}",
                actual=row_count,
                severity="warning",
                message=f"Row count {row_count} exceeds maximum {schema.row_count_max}"
            ))

        # Validate each column
        for col_schema in schema.columns:
            col_results = self._validate_column(data, col_schema)
            results.extend(col_results)

        # Validate primary key uniqueness
        if schema.primary_key:
            pk_results = self._validate_primary_key(data, schema.primary_key)
            results.extend(pk_results)

        return results

    def _validate_column(self, data: List[Dict], col_schema: ColumnSchema) -> List[ValidationResult]:
        results = []
        col_name = col_schema.name

        # Check column exists
        if data and col_name not in data[0]:
            results.append(ValidationResult(
                check_name="column_exists",
                column=col_name,
                passed=False,
                expected="column present",
                actual="column missing",
                severity="error",
                message=f"Column '{col_name}' not found in data"
            ))
            return results

        values = [row.get(col_name) for row in data]
        failed_rows = []

        # Null check
        null_count = sum(1 for v in values if v is None or v == '')
        if not col_schema.nullable and null_count > 0:
            failed_rows = [i for i, v in enumerate(values) if v is None or v == '']
            results.append(ValidationResult(
                check_name="not_null",
                column=col_name,
                passed=False,
                expected="no nulls",
                actual=f"{null_count} nulls",
                severity="error",
                message=f"Column '{col_name}' has {null_count} null values but is not nullable",
                failed_rows=failed_rows[:100]  # Limit to first 100
            ))

        non_null_values = [v for v in values if v is not None and v != '']

        # Uniqueness check
        if col_schema.unique and non_null_values:
            unique_count = len(set(non_null_values))
            if unique_count != len(non_null_values):
                duplicate_values = [v for v, count in Counter(non_null_values).items() if count > 1]
                results.append(ValidationResult(
                    check_name="unique",
                    column=col_name,
                    passed=False,
                    expected="all unique",
                    actual=f"{len(non_null_values) - unique_count} duplicates",
                    severity="error",
                    message=f"Column '{col_name}' has duplicate values: {duplicate_values[:5]}"
                ))

        # Type validation (note: failed_rows indices refer to positions within
        # non_null_values, not to original row numbers)
        type_failures = self._validate_type(non_null_values, col_schema.data_type)
        if type_failures:
            results.append(ValidationResult(
                check_name="data_type",
                column=col_name,
                passed=False,
                expected=col_schema.data_type,
                actual=f"{len(type_failures)} invalid values",
                severity="error",
                message=f"Column '{col_name}' has {len(type_failures)} values not matching type {col_schema.data_type}",
                failed_rows=type_failures[:100]
            ))

        # Range validation for numeric columns
        if col_schema.min_value is not None or col_schema.max_value is not None:
            range_failures = self._validate_range(non_null_values, col_schema)
            if range_failures:
                results.append(ValidationResult(
                    check_name="value_range",
                    column=col_name,
                    passed=False,
                    expected=f"[{col_schema.min_value}, {col_schema.max_value}]",
                    actual=f"{len(range_failures)} out of range",
                    severity="error",
                    message=f"Column '{col_name}' has values outside range",
                    failed_rows=range_failures[:100]
                ))

        # Length validation for string columns
        if col_schema.min_length is not None or col_schema.max_length is not None:
            length_failures = self._validate_length(non_null_values, col_schema)
            if length_failures:
                results.append(ValidationResult(
                    check_name="string_length",
                    column=col_name,
                    passed=False,
                    expected=f"length [{col_schema.min_length}, {col_schema.max_length}]",
                    actual=f"{len(length_failures)} out of range",
                    severity="warning",
                    message=f"Column '{col_name}' has values with invalid length",
                    failed_rows=length_failures[:100]
                ))

        # Pattern validation
        if col_schema.pattern:
            pattern_failures = self._validate_pattern(non_null_values, col_schema.pattern)
            if pattern_failures:
                results.append(ValidationResult(
                    check_name="pattern_match",
                    column=col_name,
                    passed=False,
                    expected=f"matches {col_schema.pattern}",
                    actual=f"{len(pattern_failures)} non-matching",
                    severity="error",
                    message=f"Column '{col_name}' has values not matching pattern",
                    failed_rows=pattern_failures[:100]
                ))

        # Allowed values validation
        if col_schema.allowed_values:
            allowed_set = set(col_schema.allowed_values)
            invalid = [i for i, v in enumerate(non_null_values) if str(v) not in allowed_set]
            if invalid:
                results.append(ValidationResult(
                    check_name="allowed_values",
                    column=col_name,
                    passed=False,
                    expected=f"one of {col_schema.allowed_values}",
                    actual=f"{len(invalid)} invalid values",
                    severity="error",
                    message=f"Column '{col_name}' has values not in allowed list",
                    failed_rows=invalid[:100]
                ))

        return results

    def _validate_type(self, values: List[Any], expected_type: str) -> List[int]:
        """Return indices of values that don't match expected type"""
        failures = []

        for i, v in enumerate(values):
            v_str = str(v)
            valid = False

            if expected_type == "integer":
                try:
                    int(v_str)
                    valid = True
                except ValueError:
                    pass
            elif expected_type == "float":
                try:
                    float(v_str)
                    valid = True
                except ValueError:
                    pass
            elif expected_type == "boolean":
                valid = v_str.lower() in ('true', 'false', 'yes', 'no', '1', '0')
            elif expected_type == "email":
                valid = bool(re.match(TypeDetector.PATTERNS['email'], v_str, re.IGNORECASE))
            elif expected_type == "uuid":
                valid = bool(re.match(TypeDetector.PATTERNS['uuid'], v_str, re.IGNORECASE))
            elif expected_type in ("date", "date_iso"):
                valid = bool(re.match(TypeDetector.PATTERNS['date_iso'], v_str))
            elif expected_type in ("datetime", "datetime_iso"):
                valid = bool(re.match(TypeDetector.PATTERNS['datetime_iso'], v_str))
            else:
                valid = True  # string accepts anything

            if not valid:
                failures.append(i)

        return failures

    def _validate_range(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
        """Return indices of values outside the specified range"""
        failures = []
        for i, v in enumerate(values):
            try:
                num = float(v)
                if col_schema.min_value is not None and num < col_schema.min_value:
                    failures.append(i)
                elif col_schema.max_value is not None and num > col_schema.max_value:
                    failures.append(i)
            except (ValueError, TypeError):
                pass
        return failures

    def _validate_length(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
        """Return indices of values with invalid string length"""
        failures = []
        for i, v in enumerate(values):
            length = len(str(v))
            if col_schema.min_length is not None and length < col_schema.min_length:
                failures.append(i)
            elif col_schema.max_length is not None and length > col_schema.max_length:
                failures.append(i)
        return failures

    def _validate_pattern(self, values: List[Any], pattern: str) -> List[int]:
        """Return indices of values not matching the pattern"""
        regex = re.compile(pattern)
        return [i for i, v in enumerate(values) if not regex.match(str(v))]

    def _validate_primary_key(self, data: List[Dict], pk_columns: List[str]) -> List[ValidationResult]:
        """Validate primary key uniqueness"""
        results = []
        pk_values = []

        for row in data:
            pk = tuple(row.get(col) for col in pk_columns)
            pk_values.append(pk)

        pk_counts = Counter(pk_values)
        duplicates = {pk: count for pk, count in pk_counts.items() if count > 1}

        if duplicates:
            results.append(ValidationResult(
                check_name="primary_key_unique",
                column=",".join(pk_columns),
                passed=False,
                expected="all unique",
                actual=f"{len(duplicates)} duplicate keys",
                severity="error",
                message=f"Primary key has {len(duplicates)} duplicate combinations"
            ))

        return results


class AnomalyDetector(BaseValidator):
    """Detect anomalies in data"""

    def __init__(self, z_threshold: float = 3.0, iqr_multiplier: float = 1.5):
        self.z_threshold = z_threshold
        self.iqr_multiplier = iqr_multiplier

    def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
        results = []

        if not data:
            return results

        # Get numeric columns
        numeric_columns = []
        for col in data[0].keys():
            values = [row.get(col) for row in data]
            non_null = [v for v in values if v is not None and v != '']
            try:
                [float(v) for v in non_null[:100]]
                numeric_columns.append(col)
            except (ValueError, TypeError):
                pass

        for col in numeric_columns:
            col_results = self._detect_numeric_anomalies(data, col)
            results.extend(col_results)

        return results

    def _detect_numeric_anomalies(self, data: List[Dict], column: str) -> List[ValidationResult]:
        results = []

        values = []
        for row in data:
            v = row.get(column)
            if v is not None and v != '':
                try:
                    values.append(float(v))
                except (ValueError, TypeError):
                    pass

        if len(values) < 10:
            return results

        # Z-score method
        mean = statistics.mean(values)
        std = statistics.stdev(values) if len(values) > 1 else 0

        if std > 0:
            z_outliers = []
            for i, v in enumerate(values):
                z_score = abs((v - mean) / std)
                if z_score > self.z_threshold:
                    z_outliers.append((i, v, z_score))

            if z_outliers:
                results.append(ValidationResult(
                    check_name="z_score_outlier",
                    column=column,
                    passed=False,  # this branch only runs when outliers were found
                    expected=f"z-score <= {self.z_threshold}",
                    actual=f"{len(z_outliers)} outliers",
                    severity="warning",
                    message=f"Column '{column}' has {len(z_outliers)} statistical outliers (z-score method)",
                    failed_rows=[o[0] for o in z_outliers[:100]]
                ))

        # IQR method
        sorted_values = sorted(values)
        q1_idx = len(sorted_values) // 4
        q3_idx = (3 * len(sorted_values)) // 4
        q1 = sorted_values[q1_idx]
        q3 = sorted_values[q3_idx]
        iqr = q3 - q1

        lower_bound = q1 - self.iqr_multiplier * iqr
        upper_bound = q3 + self.iqr_multiplier * iqr

        iqr_outliers = [(i, v) for i, v in enumerate(values) if v < lower_bound or v > upper_bound]

        if iqr_outliers:
            results.append(ValidationResult(
                check_name="iqr_outlier",
                column=column,
                passed=False,  # this branch only runs when outliers were found
                expected=f"value in [{lower_bound:.2f}, {upper_bound:.2f}]",
                actual=f"{len(iqr_outliers)} outliers",
                severity="warning",
                message=f"Column '{column}' has {len(iqr_outliers)} outliers (IQR method)",
                failed_rows=[o[0] for o in iqr_outliers[:100]]
            ))

        return results
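
# Worked sketch: for the values 1..9 plus 100 (n=10) the sample mean is 14.5
# and the sample stdev is ~30.2, so 100 has a z-score of ~2.8 and is NOT
# flagged at the default z_threshold of 3.0. The index-based quartiles give
# q1=3, q3=8, iqr=5 and bounds [-4.5, 15.5], so the IQR method DOES flag it.
#
#   rows = [{"x": v} for v in [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]]
#   results = AnomalyDetector().validate(rows)   # one iqr_outlier warning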


# =============================================================================
# Data Profiler
# =============================================================================

class DataProfiler:
    """Generate statistical profiles of datasets"""

    def profile(self, data: List[Dict], name: str = "dataset") -> DataProfile:
        """Generate a complete profile of the dataset"""
        if not data:
            return DataProfile(
                name=name,
                row_count=0,
                column_count=0,
                columns=[],
                duplicate_rows=0,
                memory_size_bytes=0,
                profile_timestamp=datetime.now().isoformat()
            )

        columns = list(data[0].keys())
        column_profiles = []

        for col in columns:
            profile = self._profile_column(data, col)
            column_profiles.append(profile)

        # Count duplicates
        row_tuples = [tuple(sorted(row.items())) for row in data]
        duplicate_count = len(row_tuples) - len(set(row_tuples))

        # Estimate memory size
        memory_size = sys.getsizeof(data) + sum(
            sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row.values())
            for row in data
        )

        return DataProfile(
            name=name,
            row_count=len(data),
            column_count=len(columns),
            columns=column_profiles,
            duplicate_rows=duplicate_count,
            memory_size_bytes=memory_size,
            profile_timestamp=datetime.now().isoformat()
        )

    def _profile_column(self, data: List[Dict], column: str) -> ColumnProfile:
        """Generate profile for a single column"""
        values = [row.get(column) for row in data]
        non_null = [v for v in values if v is not None and v != '']

        total_count = len(values)
        null_count = total_count - len(non_null)
        null_pct = (null_count / total_count * 100) if total_count > 0 else 0

        unique_values = set(str(v) for v in non_null)
        unique_count = len(unique_values)
        unique_pct = (unique_count / len(non_null) * 100) if non_null else 0

        # Detect type
        sample = [str(v) for v in non_null[:1000]]
        detected_type = TypeDetector.detect_type(sample)
        detected_pattern = TypeDetector.detect_pattern(sample)

        # Top values
        value_counts = Counter(str(v) for v in non_null)
        top_values = value_counts.most_common(10)

        profile = ColumnProfile(
            name=column,
            data_type=detected_type,
            total_count=total_count,
            null_count=null_count,
            null_percentage=null_pct,
            unique_count=unique_count,
            unique_percentage=unique_pct,
            detected_pattern=detected_pattern,
            top_values=top_values
        )

        # Add numeric stats if applicable
        if detected_type in ('integer', 'float'):
            numeric_values = []
            for v in non_null:
                try:
                    numeric_values.append(float(v))
                except (ValueError, TypeError):
                    pass

            if numeric_values:
                sorted_vals = sorted(numeric_values)
                profile.min_value = min(numeric_values)
                profile.max_value = max(numeric_values)
                profile.mean = statistics.mean(numeric_values)
                profile.median = statistics.median(numeric_values)
                if len(numeric_values) > 1:
                    profile.std_dev = statistics.stdev(numeric_values)
                profile.percentile_25 = sorted_vals[len(sorted_vals) // 4]
                profile.percentile_75 = sorted_vals[(3 * len(sorted_vals)) // 4]

        # Add string stats
        if detected_type == 'string':
            lengths = [len(str(v)) for v in non_null]
            if lengths:
                profile.min_length = min(lengths)
                profile.max_length = max(lengths)
                profile.avg_length = statistics.mean(lengths)

        return profile
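
# Usage sketch: profile any List[Dict] (the shape every loader below returns).
#
#   rows = DataLoader.load("users.csv")
#   profile = DataProfiler().profile(rows, name="users")
#   for col in profile.columns:
#       print(col.name, col.data_type, f"{col.null_percentage:.1f}% null")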


# =============================================================================
# Great Expectations Suite Generator
# =============================================================================

class GreatExpectationsGenerator:
    """Generate Great Expectations validation suites"""

    def generate_suite(self, profile: DataProfile) -> Dict:
        """Generate a Great Expectations suite from a data profile"""
        expectations = []

        for col_profile in profile.columns:
            col_expectations = self._generate_column_expectations(col_profile)
            expectations.extend(col_expectations)

        # Table-level expectations
        expectations.append({
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {
                "min_value": max(1, int(profile.row_count * 0.5)),
                "max_value": int(profile.row_count * 2)
            }
        })

        expectations.append({
            "expectation_type": "expect_table_column_count_to_equal",
            "kwargs": {
                "value": profile.column_count
            }
        })

        suite = {
            "expectation_suite_name": f"{profile.name}_suite",
            "expectations": expectations,
            "meta": {
                "generated_at": datetime.now().isoformat(),
                "generator": "data_quality_validator",
                "source_profile": profile.name
            }
        }

        return suite

    def _generate_column_expectations(self, col_profile: ColumnProfile) -> List[Dict]:
        """Generate expectations for a single column"""
        expectations = []
        col_name = col_profile.name

        # Column exists
        expectations.append({
            "expectation_type": "expect_column_to_exist",
            "kwargs": {"column": col_name}
        })

        # Null percentage
        if col_profile.null_percentage < 1:
            expectations.append({
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": col_name}
            })
        elif col_profile.null_percentage < 50:
            expectations.append({
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {
                    "column": col_name,
                    "mostly": 1 - (col_profile.null_percentage / 100 * 1.5)
                }
            })

        # Uniqueness
        if col_profile.unique_percentage > 99:
            expectations.append({
                "expectation_type": "expect_column_values_to_be_unique",
                "kwargs": {"column": col_name}
            })

        # Type-specific expectations
        if col_profile.data_type == 'integer':
            expectations.append({
                "expectation_type": "expect_column_values_to_be_in_type_list",
                "kwargs": {
                    "column": col_name,
                    "type_list": ["int", "int64", "INTEGER", "BIGINT"]
                }
            })
            if col_profile.min_value is not None:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_between",
                    "kwargs": {
                        "column": col_name,
                        "min_value": col_profile.min_value,
                        "max_value": col_profile.max_value
                    }
                })

        elif col_profile.data_type == 'float':
            expectations.append({
                "expectation_type": "expect_column_values_to_be_in_type_list",
                "kwargs": {
                    "column": col_name,
                    "type_list": ["float", "float64", "FLOAT", "DOUBLE"]
                }
            })
            if col_profile.min_value is not None:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_between",
                    "kwargs": {
                        "column": col_name,
                        "min_value": col_profile.min_value,
                        "max_value": col_profile.max_value
                    }
                })

        elif col_profile.data_type == 'email':
            expectations.append({
                "expectation_type": "expect_column_values_to_match_regex",
                "kwargs": {
                    "column": col_name,
                    "regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
                }
            })

        elif col_profile.data_type in ('date_iso', 'date'):
            expectations.append({
                "expectation_type": "expect_column_values_to_match_strftime_format",
                "kwargs": {
                    "column": col_name,
                    "strftime_format": "%Y-%m-%d"
                }
            })

        # String length expectations
        if col_profile.min_length is not None:
            expectations.append({
                "expectation_type": "expect_column_value_lengths_to_be_between",
                "kwargs": {
                    "column": col_name,
                    "min_value": max(1, col_profile.min_length),
                    "max_value": col_profile.max_length * 2 if col_profile.max_length else None
                }
            })

        # Categorical (low cardinality) columns
        if col_profile.unique_count <= 20 and col_profile.unique_percentage < 10:
            top_values = [v[0] for v in col_profile.top_values if v[1] > col_profile.total_count * 0.01]
            if top_values:
                expectations.append({
                    "expectation_type": "expect_column_values_to_be_in_set",
                    "kwargs": {
                        "column": col_name,
                        "value_set": top_values,
                        "mostly": 0.95
                    }
                })

        return expectations
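
# Usage sketch: the suite is derived purely from an observed profile, so its
# bounds are heuristics: row counts are pinned to [50%, 200%] of the profiled
# count, and columns that are >99% unique get a uniqueness expectation.
#
#   profile = DataProfiler().profile(rows, name="orders")
#   suite = GreatExpectationsGenerator().generate_suite(profile)
#   Path("orders_suite.json").write_text(json.dumps(suite, indent=2))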


# =============================================================================
# Quality Score Calculator
# =============================================================================

class QualityScoreCalculator:
    """Calculate overall data quality scores"""

    def calculate(self, profile: DataProfile, validation_results: List[ValidationResult]) -> QualityScore:
        """Calculate quality score from profile and validation results"""
        # Completeness: average non-null percentage
        completeness = (100 - statistics.mean([c.null_percentage for c in profile.columns])) if profile.columns else 0

        # Uniqueness: average unique percentage for columns expected to be unique
        unique_cols = [c for c in profile.columns if c.unique_percentage > 90]
        uniqueness = statistics.mean([c.unique_percentage for c in unique_cols]) if unique_cols else 100

        # Validity: percentage of passed checks
        total_checks = len(validation_results)
        passed_checks = sum(1 for r in validation_results if r.passed)
        validity = (passed_checks / total_checks * 100) if total_checks > 0 else 100

        # Consistency: percentage of non-error results
        error_checks = sum(1 for r in validation_results if not r.passed and r.severity == "error")
        consistency = ((total_checks - error_checks) / total_checks * 100) if total_checks > 0 else 100

        # Accuracy: based on pattern matching and type detection
        pattern_detected = sum(1 for c in profile.columns if c.detected_pattern)
        accuracy = min(100, 50 + (pattern_detected / len(profile.columns) * 50)) if profile.columns else 50

        # Overall: weighted average
        overall = (
            completeness * 0.25 +
            uniqueness * 0.15 +
            validity * 0.30 +
            consistency * 0.20 +
            accuracy * 0.10
        )

        return QualityScore(
            completeness=round(completeness, 2),
            uniqueness=round(uniqueness, 2),
            validity=round(validity, 2),
            consistency=round(consistency, 2),
            accuracy=round(accuracy, 2),
            overall=round(overall, 2)
        )
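
# Worked sketch of the weighted overall score: with completeness=90,
# uniqueness=100, validity=80, consistency=100 and accuracy=70,
#   overall = 90*0.25 + 100*0.15 + 80*0.30 + 100*0.20 + 70*0.10
#           = 22.5 + 15.0 + 24.0 + 20.0 + 7.0 = 88.5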


# =============================================================================
# Data Contract Validator
# =============================================================================

class DataContractValidator:
    """Validate data against a data contract"""

    def load_contract(self, contract_path: str) -> Dict:
        """Load a data contract from file"""
        with open(contract_path, 'r') as f:
            content = f.read()

        # Support both YAML and JSON
        if contract_path.endswith('.yaml') or contract_path.endswith('.yml'):
            # Simple YAML parsing (for basic contracts)
            contract = self._parse_simple_yaml(content)
        else:
            contract = json.loads(content)

        return contract

    def _parse_simple_yaml(self, content: str) -> Dict:
        """Parse simple YAML-like format"""
        result = {}
        current_section = result
        section_stack = [(result, -1)]

        for line in content.split('\n'):
            if not line.strip() or line.strip().startswith('#'):
                continue

            # Calculate indentation
            indent = len(line) - len(line.lstrip())
            line = line.strip()

            # Pop sections with greater or equal indentation
            while section_stack and section_stack[-1][1] >= indent:
                section_stack.pop()

            current_section = section_stack[-1][0]

            if ':' in line:
                key, value = line.split(':', 1)
                key = key.strip()
                value = value.strip()

                if value:
                    # Handle lists
                    if value.startswith('[') and value.endswith(']'):
                        current_section[key] = [v.strip().strip('"\'') for v in value[1:-1].split(',')]
                    elif value.lower() in ('true', 'false'):
                        current_section[key] = value.lower() == 'true'
                    elif value.isdigit():
                        current_section[key] = int(value)
                    else:
                        current_section[key] = value.strip('"\'')
                else:
                    current_section[key] = {}
                    section_stack.append((current_section[key], indent))
            elif line.startswith('- '):
                # List item
                if not isinstance(current_section, list):
                    # Convert to list
                    parent = section_stack[-2][0] if len(section_stack) > 1 else result
                    for k, v in parent.items():
                        if v is current_section:
                            parent[k] = [current_section] if current_section else []
                            current_section = parent[k]
                            section_stack[-1] = (current_section, section_stack[-1][1])
                            break
                current_section.append(line[2:].strip())

        return result

    def validate_contract(self, data: List[Dict], contract: Dict) -> List[ValidationResult]:
        """Validate data against contract"""
        results = []

        # Validate schema section
        if 'schema' in contract:
            schema_def = contract['schema']
            columns = schema_def.get('columns', schema_def.get('fields', []))

            for col_def in columns:
                col_name = col_def.get('name', col_def.get('column', ''))
                if not col_name:
                    continue

                # Check column exists
                if data and col_name not in data[0]:
                    results.append(ValidationResult(
                        check_name="contract_column_exists",
                        column=col_name,
                        passed=False,
                        expected="column present",
                        actual="column missing",
                        severity="error",
                        message=f"Contract requires column '{col_name}' but it's missing"
                    ))
                    continue

                # Check data type
                expected_type = col_def.get('type', col_def.get('data_type', 'string'))
                values = [row.get(col_name) for row in data]
                non_null = [str(v) for v in values if v is not None and v != '']

                if non_null:
                    detected_type = TypeDetector.detect_type(non_null[:1000])
                    type_compatible = self._types_compatible(detected_type, expected_type)

                    if not type_compatible:
                        results.append(ValidationResult(
                            check_name="contract_data_type",
                            column=col_name,
                            passed=False,
                            expected=expected_type,
                            actual=detected_type,
                            severity="error",
                            message=f"Contract expects type '{expected_type}' but detected '{detected_type}'"
                        ))

                # Check nullable
                if not col_def.get('nullable', True):
                    null_count = sum(1 for v in values if v is None or v == '')
                    if null_count > 0:
                        results.append(ValidationResult(
                            check_name="contract_not_null",
                            column=col_name,
                            passed=False,
                            expected="no nulls",
                            actual=f"{null_count} nulls",
                            severity="error",
                            message=f"Contract requires non-null but found {null_count} nulls"
                        ))

        # Validate SLA section
        if 'sla' in contract:
            sla = contract['sla']

            # Row count bounds
            min_rows = sla.get('min_rows', sla.get('minimum_records'))
            max_rows = sla.get('max_rows', sla.get('maximum_records'))

            row_count = len(data)
            if min_rows is not None and row_count < min_rows:
                results.append(ValidationResult(
                    check_name="contract_min_rows",
                    column=None,
                    passed=False,
                    expected=f">= {min_rows} rows",
                    actual=f"{row_count} rows",
                    severity="error",
                    message=f"Contract requires at least {min_rows} rows"
                ))

            if max_rows is not None and row_count > max_rows:
                results.append(ValidationResult(
                    check_name="contract_max_rows",
                    column=None,
                    passed=False,
                    expected=f"<= {max_rows} rows",
                    actual=f"{row_count} rows",
                    severity="warning",
                    message=f"Contract allows at most {max_rows} rows"
                ))

        return results

    def _types_compatible(self, detected: str, expected: str) -> bool:
        """Check if detected type is compatible with expected type"""
        expected = expected.lower()
        detected = detected.lower()

        type_groups = {
            'numeric': ['integer', 'int', 'float', 'double', 'decimal', 'number'],
            'string': ['string', 'varchar', 'char', 'text'],
            'boolean': ['boolean', 'bool'],
            'date': ['date', 'date_iso'],
            'datetime': ['datetime', 'datetime_iso', 'timestamp'],
        }

        for group, types in type_groups.items():
            if expected in types and detected in types:
                return True

        return detected == expected
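
# Usage sketch: contracts are most reliably written as JSON (load_contract also
# parses a minimal YAML subset, but its "- " list items stay plain strings).
# A minimal hypothetical contract.json:
#
#   {
#     "schema": {"columns": [{"name": "id", "type": "integer",
#                             "nullable": false}]},
#     "sla": {"min_rows": 100}
#   }
#
#   validator = DataContractValidator()
#   results = validator.validate_contract(data, validator.load_contract("contract.json"))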


# =============================================================================
# Report Generator
# =============================================================================

class ReportGenerator:
    """Generate validation reports"""

    def generate_text_report(self,
                            profile: DataProfile,
                            results: List[ValidationResult],
                            score: QualityScore) -> str:
        """Generate a text report"""
        lines = []
        lines.append("=" * 80)
        lines.append("DATA QUALITY VALIDATION REPORT")
        lines.append("=" * 80)
        lines.append(f"\nDataset: {profile.name}")
        lines.append(f"Generated: {datetime.now().isoformat()}")
        lines.append(f"Rows: {profile.row_count:,}")
        lines.append(f"Columns: {profile.column_count}")
        lines.append(f"Duplicate Rows: {profile.duplicate_rows:,}")

        # Quality Score
        lines.append("\n" + "-" * 40)
        lines.append("QUALITY SCORES")
        lines.append("-" * 40)
        lines.append(f"  Overall:      {score.overall:>6.1f}% {'✓' if score.overall >= 80 else '✗'}")
        lines.append(f"  Completeness: {score.completeness:>6.1f}%")
        lines.append(f"  Uniqueness:   {score.uniqueness:>6.1f}%")
        lines.append(f"  Validity:     {score.validity:>6.1f}%")
        lines.append(f"  Consistency:  {score.consistency:>6.1f}%")
        lines.append(f"  Accuracy:     {score.accuracy:>6.1f}%")

        # Validation Results Summary
        passed = sum(1 for r in results if r.passed)
        failed = len(results) - passed
        errors = sum(1 for r in results if not r.passed and r.severity == "error")
        warnings = sum(1 for r in results if not r.passed and r.severity == "warning")

        lines.append("\n" + "-" * 40)
        lines.append("VALIDATION SUMMARY")
        lines.append("-" * 40)
        lines.append(f"  Total Checks: {len(results)}")
        lines.append(f"  Passed:       {passed} ✓")
        lines.append(f"  Failed:       {failed} ✗")
        lines.append(f"    Errors:     {errors}")
        lines.append(f"    Warnings:   {warnings}")

        # Failed checks details
        if failed > 0:
            lines.append("\n" + "-" * 40)
            lines.append("FAILED CHECKS")
            lines.append("-" * 40)

            for r in results:
                if not r.passed:
                    severity_icon = "❌" if r.severity == "error" else "⚠️"
                    col_str = f"[{r.column}]" if r.column else ""
                    lines.append(f"\n{severity_icon} {r.check_name} {col_str}")
                    lines.append(f"   Expected: {r.expected}")
                    lines.append(f"   Actual:   {r.actual}")
                    if r.message:
                        lines.append(f"   Message:  {r.message}")

        # Column profiles
        lines.append("\n" + "-" * 40)
        lines.append("COLUMN PROFILES")
        lines.append("-" * 40)

        for col in profile.columns:
            lines.append(f"\n  {col.name}")
            lines.append(f"    Type: {col.data_type}")
            lines.append(f"    Nulls: {col.null_count:,} ({col.null_percentage:.1f}%)")
            lines.append(f"    Unique: {col.unique_count:,} ({col.unique_percentage:.1f}%)")

            if col.min_value is not None:
                lines.append(f"    Range: [{col.min_value:.2f}, {col.max_value:.2f}]")
                lines.append(f"    Mean: {col.mean:.2f}, Median: {col.median:.2f}")

            if col.min_length is not None:
                lines.append(f"    Length: [{col.min_length}, {col.max_length}] (avg: {col.avg_length:.1f})")

            if col.detected_pattern:
                lines.append(f"    Pattern: {col.detected_pattern}")

            if col.top_values:
                top_3 = col.top_values[:3]
                lines.append(f"    Top values: {', '.join(f'{v[0]} ({v[1]})' for v in top_3)}")

        lines.append("\n" + "=" * 80)

        return "\n".join(lines)

    def generate_json_report(self,
                            profile: DataProfile,
                            results: List[ValidationResult],
                            score: QualityScore) -> Dict:
        """Generate a JSON report"""
        return {
            "report_type": "data_quality_validation",
            "generated_at": datetime.now().isoformat(),
            "dataset": {
                "name": profile.name,
                "row_count": profile.row_count,
                "column_count": profile.column_count,
                "duplicate_rows": profile.duplicate_rows,
                "memory_bytes": profile.memory_size_bytes
            },
            "quality_score": asdict(score),
            "validation_summary": {
                "total_checks": len(results),
                "passed": sum(1 for r in results if r.passed),
                "failed": sum(1 for r in results if not r.passed),
                "errors": sum(1 for r in results if not r.passed and r.severity == "error"),
                "warnings": sum(1 for r in results if not r.passed and r.severity == "warning")
            },
            "validation_results": [
                {
                    "check": r.check_name,
                    "column": r.column,
                    "passed": r.passed,
                    "severity": r.severity,
                    "expected": str(r.expected),
                    "actual": str(r.actual),
                    "message": r.message
                }
                for r in results
            ],
            "column_profiles": [asdict(c) for c in profile.columns]
        }


# =============================================================================
# Data Loader
# =============================================================================

class DataLoader:
    """Load data from various formats"""

    @staticmethod
    def load(file_path: str) -> List[Dict]:
        """Load data from file"""
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        suffix = path.suffix.lower()

        if suffix == '.csv':
            return DataLoader._load_csv(file_path)
        elif suffix == '.json':
            return DataLoader._load_json(file_path)
        elif suffix == '.jsonl':
            return DataLoader._load_jsonl(file_path)
        else:
            raise ValueError(f"Unsupported file format: {suffix}")

    @staticmethod
    def _load_csv(file_path: str) -> List[Dict]:
        """Load CSV file"""
        data = []
        with open(file_path, 'r', newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                data.append(dict(row))
        return data

    @staticmethod
    def _load_json(file_path: str) -> List[Dict]:
        """Load JSON file"""
        with open(file_path, 'r', encoding='utf-8') as f:
            content = json.load(f)

        if isinstance(content, list):
            return content
        elif isinstance(content, dict):
            # Check for common data keys
            for key in ['data', 'records', 'rows', 'items']:
                if key in content and isinstance(content[key], list):
                    return content[key]
            return [content]
        else:
            raise ValueError("JSON must contain array or object with data key")

    @staticmethod
    def _load_jsonl(file_path: str) -> List[Dict]:
        """Load JSON Lines file"""
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    data.append(json.loads(line))
        return data
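
# Usage sketch: every supported format is normalized to List[Dict], so the
# validators and profilers above stay format-agnostic.
#
#   rows = DataLoader.load("events.jsonl")
#   print(f"loaded {len(rows)} rows")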


# =============================================================================
# Schema Loader
# =============================================================================

class SchemaLoader:
    """Load schema definitions"""

    @staticmethod
    def load(file_path: str) -> DataSchema:
        """Load schema from JSON file"""
        with open(file_path, 'r', encoding='utf-8') as f:
            schema_dict = json.load(f)

        columns = []
        for col_def in schema_dict.get('columns', []):
            columns.append(ColumnSchema(
                name=col_def['name'],
                data_type=col_def.get('type', col_def.get('data_type', 'string')),
                nullable=col_def.get('nullable', True),
                unique=col_def.get('unique', False),
                min_value=col_def.get('min_value'),
                max_value=col_def.get('max_value'),
                min_length=col_def.get('min_length'),
                max_length=col_def.get('max_length'),
                pattern=col_def.get('pattern'),
                allowed_values=col_def.get('allowed_values'),
                description=col_def.get('description', '')
            ))

        return DataSchema(
            name=schema_dict.get('name', 'unknown'),
            version=schema_dict.get('version', '1.0'),
            columns=columns,
            primary_key=schema_dict.get('primary_key'),
            row_count_min=schema_dict.get('row_count_min'),
            row_count_max=schema_dict.get('row_count_max')
        )
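
# Sketch of the schema JSON this loader expects (keys match the .get calls
# above; per column only "name" is required, everything else is optional):
#
#   {
#     "name": "users", "version": "1.0",
#     "primary_key": ["id"],
#     "columns": [
#       {"name": "id", "type": "integer", "nullable": false, "unique": true},
#       {"name": "email", "type": "email", "max_length": 254}
#     ]
#   }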


# =============================================================================
# CLI Interface
# =============================================================================

def cmd_validate(args):
    """Run validation against schema"""
    logger.info(f"Loading data from {args.input}")
    data = DataLoader.load(args.input)

    results = []

    if args.schema:
        logger.info(f"Loading schema from {args.schema}")
        schema = SchemaLoader.load(args.schema)

        validator = SchemaValidator()
        results = validator.validate(data, schema)

    if args.detect_anomalies:
        logger.info("Running anomaly detection")
        anomaly_detector = AnomalyDetector()
        anomaly_results = anomaly_detector.validate(data)
        results.extend(anomaly_results)

    # Profile data
    profiler = DataProfiler()
    profile = profiler.profile(data, name=Path(args.input).stem)

    # Calculate score
    score_calc = QualityScoreCalculator()
    score = score_calc.calculate(profile, results)

    # Generate report
    reporter = ReportGenerator()

    if args.json:
        report = reporter.generate_json_report(profile, results, score)
        output = json.dumps(report, indent=2)
    else:
        output = reporter.generate_text_report(profile, results, score)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Report saved to {args.output}")
    else:
        print(output)

    # Exit with error if validation failed
    errors = sum(1 for r in results if not r.passed and r.severity == "error")
    if errors > 0:
        sys.exit(1)


def cmd_profile(args):
    """Generate data profile"""
    logger.info(f"Loading data from {args.input}")
    data = DataLoader.load(args.input)

    profiler = DataProfiler()
    profile = profiler.profile(data, name=Path(args.input).stem)

    if args.json or args.output:
        output = json.dumps(asdict(profile), indent=2, default=str)
    else:
        # Text output
        lines = []
        lines.append(f"Dataset: {profile.name}")
        lines.append(f"Rows: {profile.row_count:,}")
        lines.append(f"Columns: {profile.column_count}")
        lines.append(f"Duplicate rows: {profile.duplicate_rows:,}")
        lines.append(f"\nColumn Profiles:")

        for col in profile.columns:
            lines.append(f"\n  {col.name} ({col.data_type})")
            lines.append(f"    Nulls: {col.null_percentage:.1f}%")
            lines.append(f"    Unique: {col.unique_percentage:.1f}%")
            if col.mean is not None:
                lines.append(f"    Stats: min={col.min_value}, max={col.max_value}, mean={col.mean:.2f}")

        output = "\n".join(lines)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Profile saved to {args.output}")
    else:
        print(output)


def cmd_generate_suite(args):
    """Generate Great Expectations suite"""
    logger.info(f"Loading data from {args.input}")
    data = DataLoader.load(args.input)

    # Profile first
    profiler = DataProfiler()
    profile = profiler.profile(data, name=Path(args.input).stem)

    # Generate suite
    generator = GreatExpectationsGenerator()
    suite = generator.generate_suite(profile)

    output = json.dumps(suite, indent=2)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Expectation suite saved to {args.output}")
    else:
        print(output)


def cmd_contract(args):
    """Validate against data contract"""
    logger.info(f"Loading data from {args.input}")
    data = DataLoader.load(args.input)

    logger.info(f"Loading contract from {args.contract}")
    contract_validator = DataContractValidator()
    contract = contract_validator.load_contract(args.contract)

    results = contract_validator.validate_contract(data, contract)

    # Profile data
    profiler = DataProfiler()
    profile = profiler.profile(data, name=Path(args.input).stem)

    # Calculate score
    score_calc = QualityScoreCalculator()
    score = score_calc.calculate(profile, results)

    # Generate report
    reporter = ReportGenerator()

    if args.json:
        report = reporter.generate_json_report(profile, results, score)
        output = json.dumps(report, indent=2)
    else:
        output = reporter.generate_text_report(profile, results, score)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Report saved to {args.output}")
    else:
        print(output)

    # Exit with error if contract validation failed
    errors = sum(1 for r in results if not r.passed and r.severity == "error")
    if errors > 0:
        sys.exit(1)


def cmd_schema(args):
    """Generate schema from data"""
    logger.info(f"Loading data from {args.input}")
    data = DataLoader.load(args.input)

    if not data:
        logger.error("Empty dataset")
        sys.exit(1)

    # Profile to detect types
    profiler = DataProfiler()
    profile = profiler.profile(data, name=Path(args.input).stem)

    # Generate schema
    schema = {
        "name": profile.name,
        "version": "1.0",
        "columns": []
    }

    for col in profile.columns:
        col_schema = {
            "name": col.name,
            "type": col.data_type,
            "nullable": col.null_percentage > 0,
            "description": ""
        }

        if col.unique_percentage > 99:
            col_schema["unique"] = True

        if col.min_value is not None:
            col_schema["min_value"] = col.min_value
            col_schema["max_value"] = col.max_value

        if col.min_length is not None:
            col_schema["min_length"] = col.min_length
            col_schema["max_length"] = col.max_length

        if col.detected_pattern:
            col_schema["pattern"] = col.detected_pattern

        # Add allowed values for low-cardinality columns
        if col.unique_count <= 20 and col.unique_percentage < 10:
            col_schema["allowed_values"] = [v[0] for v in col.top_values]

        schema["columns"].append(col_schema)

    output = json.dumps(schema, indent=2)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Schema saved to {args.output}")
    else:
        print(output)


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="Data Quality Validator - Comprehensive data quality validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Validate data against schema
  python data_quality_validator.py validate data.csv --schema schema.json

  # Profile data
  python data_quality_validator.py profile data.csv --output profile.json

  # Generate Great Expectations suite
  python data_quality_validator.py generate-suite data.csv --output expectations.json

  # Validate against data contract
  python data_quality_validator.py contract data.csv --contract contract.yaml

  # Generate schema from data
  python data_quality_validator.py schema data.csv --output schema.json
        """
    )

    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Validate command
    validate_parser = subparsers.add_parser('validate', help='Validate data against schema')
    validate_parser.add_argument('input', help='Input data file (CSV, JSON, JSONL)')
    validate_parser.add_argument('--schema', '-s', help='Schema file (JSON)')
    validate_parser.add_argument('--output', '-o', help='Output report file')
    validate_parser.add_argument('--json', action='store_true', help='Output as JSON')
    validate_parser.add_argument('--detect-anomalies', action='store_true', help='Detect statistical anomalies')
    validate_parser.set_defaults(func=cmd_validate)

    # Profile command
    profile_parser = subparsers.add_parser('profile', help='Generate data profile')
    profile_parser.add_argument('input', help='Input data file')
    profile_parser.add_argument('--output', '-o', help='Output profile file')
    profile_parser.add_argument('--json', action='store_true', help='Output as JSON')
    profile_parser.set_defaults(func=cmd_profile)

    # Generate suite command
    suite_parser = subparsers.add_parser('generate-suite', help='Generate Great Expectations suite')
    suite_parser.add_argument('input', help='Input data file')
    suite_parser.add_argument('--output', '-o', help='Output expectations file')
    suite_parser.set_defaults(func=cmd_generate_suite)

    # Contract command
    contract_parser = subparsers.add_parser('contract', help='Validate against data contract')
    contract_parser.add_argument('input', help='Input data file')
    contract_parser.add_argument('--contract', '-c', required=True, help='Data contract file (YAML or JSON)')
    contract_parser.add_argument('--output', '-o', help='Output report file')
    contract_parser.add_argument('--json', action='store_true', help='Output as JSON')
    contract_parser.set_defaults(func=cmd_contract)

    # Schema command
    schema_parser = subparsers.add_parser('schema', help='Generate schema from data')
    schema_parser.add_argument('input', help='Input data file')
    schema_parser.add_argument('--output', '-o', help='Output schema file')
    schema_parser.set_defaults(func=cmd_schema)

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if not args.command:
        parser.print_help()
        sys.exit(1)

    try:
        args.func(args)
    except Exception as e:
        logger.error(f"Error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()

```
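
For quick programmatic checks the same pieces compose without the CLI. A minimal sketch, assuming the script is importable as `data_quality_validator` (the module name used in its usage examples) and that `data.csv` and `schema.json` are placeholder paths:

```python
# Programmatic sketch of the pipeline cmd_validate wires together.
from data_quality_validator import (
    DataLoader, SchemaLoader, SchemaValidator,
    DataProfiler, QualityScoreCalculator, ReportGenerator,
)

data = DataLoader.load("data.csv")                  # CSV/JSON/JSONL -> List[Dict]
schema = SchemaLoader.load("schema.json")           # JSON schema -> DataSchema
results = SchemaValidator().validate(data, schema)  # List[ValidationResult]

profile = DataProfiler().profile(data, name="data")
score = QualityScoreCalculator().calculate(profile, results)
print(ReportGenerator().generate_text_report(profile, results, score))
```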

### scripts/etl_performance_optimizer.py

```python
#!/usr/bin/env python3
"""
ETL Performance Optimizer
Comprehensive ETL/ELT performance analysis and optimization tool.

Features:
- SQL query analysis and optimization recommendations
- Spark job configuration analysis
- Data skew detection and mitigation
- Partition strategy recommendations
- Join optimization suggestions
- Memory and shuffle analysis
- Cost estimation for cloud warehouses

Usage:
    python etl_performance_optimizer.py analyze-sql query.sql
    python etl_performance_optimizer.py analyze-spark spark-history.json
    python etl_performance_optimizer.py optimize-partition data_stats.json
    python etl_performance_optimizer.py estimate-cost query.sql --warehouse snowflake
"""

import os
import sys
import json
import re
import argparse
import logging
import math
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field, asdict
from datetime import datetime
from collections import defaultdict
from abc import ABC, abstractmethod

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class SQLQueryInfo:
    """Parsed information about a SQL query"""
    query_type: str  # SELECT, INSERT, UPDATE, DELETE, MERGE, CREATE
    tables: List[str]
    columns: List[str]
    joins: List[Dict[str, str]]
    where_conditions: List[str]
    group_by: List[str]
    order_by: List[str]
    aggregations: List[str]
    subqueries: int
    distinct: bool
    limit: Optional[int]
    ctes: List[str]
    window_functions: List[str]
    estimated_complexity: str  # low, medium, high, very_high


@dataclass
class OptimizationRecommendation:
    """A single optimization recommendation"""
    category: str  # index, partition, join, filter, aggregation, memory, shuffle
    severity: str  # critical, high, medium, low
    title: str
    description: str
    current_issue: str
    recommendation: str
    expected_improvement: str
    implementation: str
    priority: int = 1


@dataclass
class SparkJobMetrics:
    """Metrics from a Spark job"""
    job_id: str
    duration_ms: int
    stages: int
    tasks: int
    shuffle_read_bytes: int
    shuffle_write_bytes: int
    input_bytes: int
    output_bytes: int
    peak_memory_bytes: int
    gc_time_ms: int
    failed_tasks: int
    speculative_tasks: int
    skew_ratio: float  # max_task_time / median_task_time


@dataclass
class PartitionStrategy:
    """Recommended partition strategy"""
    column: str
    partition_type: str  # range, hash, list
    num_partitions: Optional[int]
    partition_size_mb: float
    reasoning: str
    implementation: str


@dataclass
class CostEstimate:
    """Cost estimate for a query"""
    warehouse: str
    compute_cost: float
    storage_cost: float
    data_transfer_cost: float
    total_cost: float
    currency: str = "USD"
    assumptions: List[str] = field(default_factory=list)


# =============================================================================
# SQL Parser
# =============================================================================

class SQLParser:
    """Parse and analyze SQL queries"""

    # Common SQL patterns
    PATTERNS = {
        'select': re.compile(r'\bSELECT\b', re.IGNORECASE),
        'from': re.compile(r'\bFROM\b', re.IGNORECASE),
        'join': re.compile(r'\b(INNER|LEFT|RIGHT|FULL|CROSS)?\s*JOIN\b', re.IGNORECASE),
        'where': re.compile(r'\bWHERE\b', re.IGNORECASE),
        'group_by': re.compile(r'\bGROUP\s+BY\b', re.IGNORECASE),
        'order_by': re.compile(r'\bORDER\s+BY\b', re.IGNORECASE),
        'having': re.compile(r'\bHAVING\b', re.IGNORECASE),
        'distinct': re.compile(r'\bDISTINCT\b', re.IGNORECASE),
        'limit': re.compile(r'\bLIMIT\s+(\d+)', re.IGNORECASE),
        'cte': re.compile(r'\bWITH\b', re.IGNORECASE),
        'subquery': re.compile(r'\(\s*SELECT\b', re.IGNORECASE),
        'window': re.compile(r'\bOVER\s*\(', re.IGNORECASE),
        'aggregation': re.compile(r'\b(COUNT|SUM|AVG|MIN|MAX|STDDEV|VARIANCE)\s*\(', re.IGNORECASE),
        'insert': re.compile(r'\bINSERT\s+INTO\b', re.IGNORECASE),
        'update': re.compile(r'\bUPDATE\b', re.IGNORECASE),
        'delete': re.compile(r'\bDELETE\s+FROM\b', re.IGNORECASE),
        'merge': re.compile(r'\bMERGE\s+INTO\b', re.IGNORECASE),
        'create': re.compile(r'\bCREATE\s+(TABLE|VIEW|INDEX)\b', re.IGNORECASE),
    }

    def parse(self, sql: str) -> SQLQueryInfo:
        """Parse a SQL query and extract information"""
        # Clean up the query
        sql = self._clean_sql(sql)

        # Determine query type
        query_type = self._detect_query_type(sql)

        # Extract tables
        tables = self._extract_tables(sql)

        # Extract columns (for SELECT queries)
        columns = self._extract_columns(sql) if query_type == 'SELECT' else []

        # Extract joins
        joins = self._extract_joins(sql)

        # Extract WHERE conditions
        where_conditions = self._extract_where_conditions(sql)

        # Extract GROUP BY
        group_by = self._extract_group_by(sql)

        # Extract ORDER BY
        order_by = self._extract_order_by(sql)

        # Extract aggregations
        aggregations = self._extract_aggregations(sql)

        # Count subqueries
        subqueries = len(self.PATTERNS['subquery'].findall(sql))

        # Check for DISTINCT
        distinct = bool(self.PATTERNS['distinct'].search(sql))

        # Extract LIMIT
        limit_match = self.PATTERNS['limit'].search(sql)
        limit = int(limit_match.group(1)) if limit_match else None

        # Extract CTEs
        ctes = self._extract_ctes(sql)

        # Extract window functions
        window_functions = self._extract_window_functions(sql)

        # Estimate complexity
        complexity = self._estimate_complexity(
            tables, joins, subqueries, aggregations, window_functions
        )

        return SQLQueryInfo(
            query_type=query_type,
            tables=tables,
            columns=columns,
            joins=joins,
            where_conditions=where_conditions,
            group_by=group_by,
            order_by=order_by,
            aggregations=aggregations,
            subqueries=subqueries,
            distinct=distinct,
            limit=limit,
            ctes=ctes,
            window_functions=window_functions,
            estimated_complexity=complexity
        )

    def _clean_sql(self, sql: str) -> str:
        """Clean and normalize SQL"""
        # Remove comments
        sql = re.sub(r'--.*$', '', sql, flags=re.MULTILINE)
        sql = re.sub(r'/\*.*?\*/', '', sql, flags=re.DOTALL)
        # Normalize whitespace
        sql = ' '.join(sql.split())
        return sql

    def _detect_query_type(self, sql: str) -> str:
        """Detect the type of SQL query"""
        sql_upper = sql.upper().strip()

        # Note: a statement starting with WITH is assumed to be a SELECT;
        # CTE-fronted INSERT/MERGE statements are not distinguished here.
        if sql_upper.startswith('WITH') or sql_upper.startswith('SELECT'):
            return 'SELECT'
        elif self.PATTERNS['insert'].search(sql):
            return 'INSERT'
        elif self.PATTERNS['update'].search(sql):
            return 'UPDATE'
        elif self.PATTERNS['delete'].search(sql):
            return 'DELETE'
        elif self.PATTERNS['merge'].search(sql):
            return 'MERGE'
        elif self.PATTERNS['create'].search(sql):
            return 'CREATE'
        else:
            return 'UNKNOWN'

    def _extract_tables(self, sql: str) -> List[str]:
        """Extract table names from SQL"""
        tables = []

        # FROM clause tables
        from_pattern = re.compile(
            r'\bFROM\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)',
            re.IGNORECASE
        )
        tables.extend(from_pattern.findall(sql))

        # JOIN clause tables
        join_pattern = re.compile(
            r'\bJOIN\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)',
            re.IGNORECASE
        )
        tables.extend(join_pattern.findall(sql))

        # INSERT INTO table
        insert_pattern = re.compile(
            r'\bINSERT\s+INTO\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)',
            re.IGNORECASE
        )
        tables.extend(insert_pattern.findall(sql))

        # UPDATE table
        update_pattern = re.compile(
            r'\bUPDATE\s+([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)',
            re.IGNORECASE
        )
        tables.extend(update_pattern.findall(sql))

        return list(set(tables))

    def _extract_columns(self, sql: str) -> List[str]:
        """Extract column references from SELECT clause"""
        # Find SELECT ... FROM
        match = re.search(r'\bSELECT\s+(.*?)\s+FROM\b', sql, re.IGNORECASE | re.DOTALL)
        if not match:
            return []

        select_clause = match.group(1)

        # Handle SELECT *
        if '*' in select_clause and 'COUNT(*)' not in select_clause.upper():
            return ['*']

        # Extract column names (simplified)
        columns = []
        for part in select_clause.split(','):
            part = part.strip()
            # Handle aliases
            alias_match = re.search(r'\bAS\s+(\w+)\s*$', part, re.IGNORECASE)
            if alias_match:
                columns.append(alias_match.group(1))
            else:
                # Get the last identifier
                col_match = re.search(r'([a-zA-Z_][a-zA-Z0-9_]*)(?:\s*$|\s+AS\b)', part, re.IGNORECASE)
                if col_match:
                    columns.append(col_match.group(1))

        return columns

    def _extract_joins(self, sql: str) -> List[Dict[str, str]]:
        """Extract join information"""
        joins = []

        join_pattern = re.compile(
            r'\b(INNER|LEFT(?:\s+OUTER)?|RIGHT(?:\s+OUTER)?|FULL(?:\s+OUTER)?|CROSS)?\s*JOIN\s+'
            r'([a-zA-Z_][a-zA-Z0-9_.]*)\s*(?:AS\s+)?(\w+)?\s*'
            r'(?:ON\s+(.+?))?(?=\s+(?:INNER|LEFT|RIGHT|FULL|CROSS|WHERE|GROUP|ORDER|HAVING|LIMIT)|$)',
            re.IGNORECASE | re.DOTALL
        )

        for match in join_pattern.finditer(sql):
            join_type = match.group(1) or 'INNER'
            table = match.group(2)
            alias = match.group(3)
            condition = match.group(4)

            joins.append({
                'type': join_type.strip().upper(),
                'table': table,
                'alias': alias,
                'condition': condition.strip() if condition else None
            })

        return joins

    def _extract_where_conditions(self, sql: str) -> List[str]:
        """Extract WHERE clause conditions"""
        # Find WHERE ... (GROUP BY | ORDER BY | HAVING | LIMIT | end)
        match = re.search(
            r'\bWHERE\s+(.*?)(?=\s+(?:GROUP\s+BY|ORDER\s+BY|HAVING|LIMIT)|$)',
            sql, re.IGNORECASE | re.DOTALL
        )
        if not match:
            return []

        where_clause = match.group(1).strip()

        # Split by AND/OR (simplified)
        conditions = re.split(r'\s+AND\s+|\s+OR\s+', where_clause, flags=re.IGNORECASE)
        return [c.strip() for c in conditions if c.strip()]

    def _extract_group_by(self, sql: str) -> List[str]:
        """Extract GROUP BY columns"""
        match = re.search(
            r'\bGROUP\s+BY\s+(.*?)(?=\s+(?:HAVING|ORDER\s+BY|LIMIT)|$)',
            sql, re.IGNORECASE | re.DOTALL
        )
        if not match:
            return []

        group_clause = match.group(1).strip()
        columns = [c.strip() for c in group_clause.split(',')]
        return columns

    def _extract_order_by(self, sql: str) -> List[str]:
        """Extract ORDER BY columns"""
        match = re.search(
            r'\bORDER\s+BY\s+(.*?)(?=\s+LIMIT|$)',
            sql, re.IGNORECASE | re.DOTALL
        )
        if not match:
            return []

        order_clause = match.group(1).strip()
        columns = [c.strip() for c in order_clause.split(',')]
        return columns

    def _extract_aggregations(self, sql: str) -> List[str]:
        """Extract aggregation functions used"""
        agg_pattern = re.compile(
            r'\b(COUNT|SUM|AVG|MIN|MAX|STDDEV|VARIANCE|MEDIAN|PERCENTILE_CONT|PERCENTILE_DISC)\s*\(',
            re.IGNORECASE
        )
        return list(set(m.upper() for m in agg_pattern.findall(sql)))

    def _extract_ctes(self, sql: str) -> List[str]:
        """Extract CTE names"""
        cte_pattern = re.compile(
            r'\bWITH\s+(\w+)\s+AS\s*\(|,\s*(\w+)\s+AS\s*\(',
            re.IGNORECASE
        )
        ctes = []
        for match in cte_pattern.finditer(sql):
            cte_name = match.group(1) or match.group(2)
            if cte_name:
                ctes.append(cte_name)
        return ctes

    def _extract_window_functions(self, sql: str) -> List[str]:
        """Extract window function patterns"""
        window_pattern = re.compile(
            r'\b(\w+)\s*\([^)]*\)\s*OVER\s*\(',
            re.IGNORECASE
        )
        return list(set(m.upper() for m in window_pattern.findall(sql)))

    def _estimate_complexity(self, tables: List[str], joins: List[Dict],
                            subqueries: int, aggregations: List[str],
                            window_functions: List[str]) -> str:
        """Estimate query complexity"""
        score = 0

        # Table count
        score += len(tables) * 10

        # Join count and types
        for join in joins:
            if join['type'] == 'CROSS' or join['type'].startswith('FULL'):
                score += 30
            elif join['type'].startswith(('LEFT', 'RIGHT')):
                score += 20
            else:
                score += 15

        # Subqueries
        score += subqueries * 25

        # Aggregations
        score += len(aggregations) * 5

        # Window functions
        score += len(window_functions) * 15

        if score < 30:
            return 'low'
        elif score < 60:
            return 'medium'
        elif score < 100:
            return 'high'
        else:
            return 'very_high'
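
# Worked sketch: for the (hypothetical) query
#   SELECT c.name, COUNT(*) FROM orders o
#   JOIN customers c ON o.customer_id = c.id
#   WHERE o.created_at >= '2024-01-01' GROUP BY c.name
# the parser finds 2 tables (+20), one inner join (+15) and one aggregation
# (+5), so the score is 40 and the complexity is 'medium' (30 <= score < 60).
#
#   info = SQLParser().parse(sql)
#   info.estimated_complexity   # 'medium'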


# =============================================================================
# SQL Optimizer
# =============================================================================

class SQLOptimizer:
    """Analyze SQL queries and provide optimization recommendations"""

    def analyze(self, query_info: SQLQueryInfo, sql: str) -> List[OptimizationRecommendation]:
        """Analyze a SQL query and generate optimization recommendations"""
        recommendations = []

        # Check for SELECT *
        if '*' in query_info.columns:
            recommendations.append(self._recommend_explicit_columns())

        # Check for missing WHERE clause on large tables
        if not query_info.where_conditions and query_info.tables:
            recommendations.append(self._recommend_add_filters())

        # Check for inefficient joins
        join_recs = self._analyze_joins(query_info)
        recommendations.extend(join_recs)

        # Check for DISTINCT usage
        if query_info.distinct:
            recommendations.append(self._recommend_distinct_alternative())

        # Check for ORDER BY without LIMIT
        if query_info.order_by and not query_info.limit:
            recommendations.append(self._recommend_add_limit())

        # Check for subquery optimization
        if query_info.subqueries > 0:
            recommendations.append(self._recommend_cte_conversion())

        # Check for index opportunities
        index_recs = self._analyze_index_opportunities(query_info)
        recommendations.extend(index_recs)

        # Check for partition pruning
        partition_recs = self._analyze_partition_pruning(query_info, sql)
        recommendations.extend(partition_recs)

        # Check for aggregation optimization
        if query_info.aggregations and query_info.group_by:
            agg_recs = self._analyze_aggregation(query_info)
            recommendations.extend(agg_recs)

        # Sort by priority
        recommendations.sort(key=lambda r: r.priority)

        return recommendations

    def _recommend_explicit_columns(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Avoid SELECT *",
            description="Using SELECT * retrieves all columns, increasing I/O and memory usage.",
            current_issue="Query uses SELECT * which fetches unnecessary columns",
            recommendation="Specify only the columns you need",
            expected_improvement="10-50% reduction in data scanned depending on table width",
            implementation="Replace SELECT * with SELECT col1, col2, col3 ...",
            priority=2
        )

    def _recommend_add_filters(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="filter",
            severity="high",
            title="Add WHERE Clause Filters",
            description="Query scans entire tables without filtering, causing full table scans.",
            current_issue="No WHERE clause filters found - full table scan required",
            recommendation="Add appropriate WHERE conditions to filter data early",
            expected_improvement="Up to 90%+ reduction in data processed if highly selective",
            implementation="Add WHERE column = value or WHERE date_column >= '2024-01-01'",
            priority=1
        )

    def _analyze_joins(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Analyze joins for optimization opportunities"""
        recommendations = []

        for join in query_info.joins:
            # Check for CROSS JOIN
            if join['type'] == 'CROSS':
                recommendations.append(OptimizationRecommendation(
                    category="join",
                    severity="critical",
                    title="Avoid CROSS JOIN",
                    description="CROSS JOIN creates a Cartesian product, which can explode data volume.",
                    current_issue=f"CROSS JOIN with table {join['table']} detected",
                    recommendation="Replace with appropriate INNER/LEFT JOIN with ON condition",
                    expected_improvement="Exponential reduction in intermediate data",
                    implementation=f"Convert CROSS JOIN {join['table']} to INNER JOIN {join['table']} ON ...",
                    priority=1
                ))

            # Check for missing join condition
            if not join.get('condition'):
                recommendations.append(OptimizationRecommendation(
                    category="join",
                    severity="high",
                    title="Missing Join Condition",
                    description="Join without explicit ON condition may cause Cartesian product.",
                    current_issue=f"JOIN with {join['table']} has no explicit ON condition",
                    recommendation="Add explicit ON condition to the join",
                    expected_improvement="Prevents accidental Cartesian products",
                    implementation=f"Add ON {join['table']}.id = other_table.foreign_key",
                    priority=1
                ))

        # Check for many joins
        if len(query_info.joins) > 5:
            recommendations.append(OptimizationRecommendation(
                category="join",
                severity="medium",
                title="High Number of Joins",
                description="Many joins can lead to complex execution plans and performance issues.",
                current_issue=f"{len(query_info.joins)} joins detected in single query",
                recommendation="Consider breaking into smaller queries or pre-aggregating",
                expected_improvement="Better plan optimization and memory usage",
                implementation="Use CTEs to materialize intermediate results, or denormalize frequently joined data",
                priority=3
            ))

        return recommendations

    def _recommend_distinct_alternative(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Consider Alternatives to DISTINCT",
            description="DISTINCT requires sorting/hashing all rows which can be expensive.",
            current_issue="DISTINCT used - may indicate data quality or join issues",
            recommendation="Review if DISTINCT is necessary or if joins produce duplicates",
            expected_improvement="Eliminates expensive deduplication step if not needed",
            implementation="Review join conditions, or use GROUP BY if aggregating anyway",
            priority=3
        )

    def _recommend_add_limit(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="low",
            title="Add LIMIT to ORDER BY",
            description="ORDER BY without LIMIT sorts entire result set unnecessarily.",
            current_issue="ORDER BY present without LIMIT clause",
            recommendation="Add LIMIT if only top N rows are needed",
            expected_improvement="Significant reduction in sorting overhead for large results",
            implementation="Add LIMIT 100 (or appropriate number) after ORDER BY",
            priority=4
        )

    def _recommend_cte_conversion(self) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="query_structure",
            severity="medium",
            title="Convert Subqueries to CTEs",
            description="Subqueries can be harder to optimize and maintain than CTEs.",
            current_issue="Subqueries detected in the query",
            recommendation="Convert correlated subqueries to CTEs or JOINs",
            expected_improvement="Better query plan optimization and readability",
            implementation="WITH subquery_name AS (SELECT ...) SELECT ... FROM main_table JOIN subquery_name",
            priority=3
        )

    def _analyze_index_opportunities(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Identify potential index opportunities"""
        recommendations = []

        # Columns in WHERE clause are index candidates
        where_columns = set()
        for condition in query_info.where_conditions:
            # Extract column names from conditions
            col_pattern = re.compile(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*(?:=|>|<|>=|<=|<>|!=|LIKE|IN|BETWEEN)', re.IGNORECASE)
            where_columns.update(col_pattern.findall(condition))

        if where_columns:
            recommendations.append(OptimizationRecommendation(
                category="index",
                severity="medium",
                title="Consider Indexes on Filter Columns",
                description="Columns used in WHERE clauses benefit from indexes.",
                current_issue=f"Filter columns detected: {', '.join(where_columns)}",
                recommendation="Create indexes on frequently filtered columns",
                expected_improvement="Orders of magnitude faster for selective queries",
                implementation=f"CREATE INDEX idx_name ON table ({', '.join(list(where_columns)[:3])})",
                priority=2
            ))

        # JOIN columns are index candidates
        join_columns = set()
        for join in query_info.joins:
            if join.get('condition'):
                col_pattern = re.compile(r'\.([a-zA-Z_][a-zA-Z0-9_]*)\s*=', re.IGNORECASE)
                join_columns.update(col_pattern.findall(join['condition']))

        if join_columns:
            recommendations.append(OptimizationRecommendation(
                category="index",
                severity="high",
                title="Index Join Columns",
                description="Join columns without indexes cause expensive full table scans.",
                current_issue=f"Join columns detected: {', '.join(join_columns)}",
                recommendation="Ensure indexes exist on join key columns",
                expected_improvement="Dramatic improvement in join performance",
                implementation=f"CREATE INDEX idx_join ON table ({list(join_columns)[0]})",
                priority=1
            ))

        return recommendations

    def _analyze_partition_pruning(self, query_info: SQLQueryInfo, sql: str) -> List[OptimizationRecommendation]:
        """Check for partition pruning opportunities"""
        recommendations = []

        # Look for date/time columns in WHERE clause
        date_pattern = re.compile(
            r'\b(date|time|timestamp|created|updated|modified)_?\w*\s*(?:=|>|<|>=|<=|BETWEEN)',
            re.IGNORECASE
        )

        if date_pattern.search(sql):
            recommendations.append(OptimizationRecommendation(
                category="partition",
                severity="medium",
                title="Leverage Partition Pruning",
                description="Date-based filters can leverage partitioned tables for massive speedups.",
                current_issue="Date/time filter detected - ensure table is partitioned",
                recommendation="Partition table by date column and ensure filter format matches",
                expected_improvement="90%+ reduction in data scanned for time-bounded queries",
                implementation="CREATE TABLE ... PARTITION BY RANGE (date_column) or use dynamic partitioning",
                priority=2
            ))

        return recommendations

    def _analyze_aggregation(self, query_info: SQLQueryInfo) -> List[OptimizationRecommendation]:
        """Analyze aggregation patterns"""
        recommendations = []

        # Warn when grouping by many columns (high output cardinality)
        if len(query_info.group_by) > 3:
            recommendations.append(OptimizationRecommendation(
                category="aggregation",
                severity="medium",
                title="Many GROUP BY Columns",
                description="Grouping by many columns increases memory usage and reduces aggregation benefit.",
                current_issue=f"GROUP BY with {len(query_info.group_by)} columns detected",
                recommendation="Review whether all GROUP BY columns are necessary",
                expected_improvement="Reduced memory and faster aggregation",
                implementation="Remove non-essential GROUP BY columns or pre-aggregate",
                priority=3
            ))

        # COUNT DISTINCT optimization
        if 'COUNT' in query_info.aggregations and query_info.distinct:
            recommendations.append(OptimizationRecommendation(
                category="aggregation",
                severity="medium",
                title="Optimize COUNT DISTINCT",
                description="COUNT DISTINCT can be expensive for high cardinality columns.",
                current_issue="COUNT DISTINCT pattern detected",
                recommendation="Consider HyperLogLog approximation for very large datasets",
                expected_improvement="Massive speedup with ~2% error tolerance",
                implementation="Use APPROX_COUNT_DISTINCT() if available in your warehouse",
                priority=3
            ))

        return recommendations


# =============================================================================
# Spark Job Analyzer
# =============================================================================

class SparkJobAnalyzer:
    """Analyze Spark job metrics and provide optimization recommendations"""

    def analyze(self, metrics: SparkJobMetrics) -> List[OptimizationRecommendation]:
        """Analyze Spark job metrics"""
        recommendations = []

        # Check for data skew
        if metrics.skew_ratio > 5:
            recommendations.append(self._recommend_skew_mitigation(metrics))

        # Check for excessive shuffle
        shuffle_ratio = metrics.shuffle_write_bytes / max(metrics.input_bytes, 1)
        if shuffle_ratio > 1.5:
            recommendations.append(self._recommend_reduce_shuffle(metrics, shuffle_ratio))

        # Check for GC overhead
        gc_ratio = metrics.gc_time_ms / max(metrics.duration_ms, 1)
        if gc_ratio > 0.1:
            recommendations.append(self._recommend_memory_tuning(metrics, gc_ratio))

        # Check for failed tasks
        if metrics.failed_tasks > 0:
            fail_ratio = metrics.failed_tasks / max(metrics.tasks, 1)
            recommendations.append(self._recommend_failure_handling(metrics, fail_ratio))

        # Check for speculative execution overhead
        if metrics.speculative_tasks > metrics.tasks * 0.1:
            recommendations.append(self._recommend_reduce_speculation(metrics))

        # Check task count
        if metrics.tasks > 10000:
            recommendations.append(self._recommend_reduce_tasks(metrics))
        elif metrics.tasks < 10 and metrics.input_bytes > 1e9:
            recommendations.append(self._recommend_increase_parallelism(metrics))

        return recommendations
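
    # Threshold example (hypothetical job): input_bytes=10e9 with
    # shuffle_write_bytes=25e9 gives shuffle_ratio=2.5 (> 1.5), and
    # gc_time_ms at 12% of duration_ms gives gc_ratio=0.12 (> 0.1),
    # so both the shuffle and memory recommendations fire.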

    def _recommend_skew_mitigation(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="skew",
            severity="critical",
            title="Severe Data Skew Detected",
            description=f"Skew ratio of {metrics.skew_ratio:.1f}x indicates uneven data distribution.",
            current_issue=f"Task execution time varies by {metrics.skew_ratio:.1f}x, causing stragglers",
            recommendation="Apply skew handling techniques to rebalance data",
            expected_improvement="Up to 80% reduction in job time by eliminating stragglers",
            implementation="""Options:
1. Salting: Add random prefix to skewed keys
   df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
2. Broadcast join for small tables:
   df1.join(broadcast(df2), "key")
3. Adaptive Query Execution (Spark 3.0+):
   spark.conf.set("spark.sql.adaptive.enabled", "true")
   spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")""",
            priority=1
        )

    def _recommend_reduce_shuffle(self, metrics: SparkJobMetrics, ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="shuffle",
            severity="high",
            title="Excessive Shuffle Data",
            description=f"Shuffle writes {ratio:.1f}x the input data size.",
            current_issue=f"Shuffle write: {metrics.shuffle_write_bytes / 1e9:.2f} GB vs input: {metrics.input_bytes / 1e9:.2f} GB",
            recommendation="Reduce shuffle through partitioning and early aggregation",
            expected_improvement="Significant network I/O and storage reduction",
            implementation="""Options:
1. Pre-aggregate before shuffle:
   df.groupBy("key").agg(sum("value")).repartition("key")
2. Use map-side combining:
   df.reduceByKey((a, b) => a + b)
3. Optimize partition count:
   spark.conf.set("spark.sql.shuffle.partitions", optimal_count)
4. Use bucketing for repeated joins:
   df.write.bucketBy(200, "key").saveAsTable("bucketed_table")""",
            priority=1
        )

    def _recommend_memory_tuning(self, metrics: SparkJobMetrics, gc_ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="memory",
            severity="high",
            title="High GC Overhead",
            description=f"GC time is {gc_ratio * 100:.1f}% of total execution time.",
            current_issue=f"GC time: {metrics.gc_time_ms / 1000:.1f}s out of {metrics.duration_ms / 1000:.1f}s total",
            recommendation="Tune memory settings to reduce garbage collection",
            expected_improvement="20-50% faster execution with proper memory config",
            implementation="""Memory tuning options:
1. Increase executor memory:
   --executor-memory 8g
2. Adjust memory fractions:
   spark.memory.fraction=0.6
   spark.memory.storageFraction=0.5
3. Use off-heap memory:
   spark.memory.offHeap.enabled=true
   spark.memory.offHeap.size=4g
4. Reduce cached data:
   df.unpersist() when no longer needed
5. Use Kryo serialization:
   spark.serializer=org.apache.spark.serializer.KryoSerializer""",
            priority=2
        )

    def _recommend_failure_handling(self, metrics: SparkJobMetrics, fail_ratio: float) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="reliability",
            severity="high" if fail_ratio > 0.1 else "medium",
            title="Task Failures Detected",
            description=f"{metrics.failed_tasks} tasks failed ({fail_ratio * 100:.1f}% failure rate).",
            current_issue="Task failures increase job time and resource usage due to retries",
            recommendation="Investigate failure causes and add resilience",
            expected_improvement="Reduced retries and more predictable job times",
            implementation="""Failure handling options:
1. Check executor logs for OOM:
   spark.executor.memoryOverhead=2g
2. Handle data issues:
   df.filter(col("value").isNotNull())
3. Increase task retries:
   spark.task.maxFailures=4
4. Add checkpointing for long jobs:
   df.checkpoint()
5. Check for network timeouts:
   spark.network.timeout=300s""",
            priority=1
        )

    def _recommend_reduce_speculation(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="execution",
            severity="medium",
            title="High Speculative Execution",
            description=f"{metrics.speculative_tasks} speculative tasks launched.",
            current_issue="Excessive speculation wastes resources and indicates underlying issues",
            recommendation="Address root cause of slow tasks instead of speculation",
            expected_improvement="Better resource utilization",
            implementation="""Options:
1. Disable speculation if not needed:
   spark.speculation=false
2. Or tune speculation settings:
   spark.speculation.multiplier=1.5
   spark.speculation.quantile=0.9
3. Fix underlying skew/memory issues first""",
            priority=3
        )

    def _recommend_reduce_tasks(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        return OptimizationRecommendation(
            category="parallelism",
            severity="medium",
            title="Too Many Tasks",
            description=f"{metrics.tasks} tasks may cause excessive scheduling overhead.",
            current_issue="Very high task count increases driver overhead",
            recommendation="Reduce partition count for better efficiency",
            expected_improvement="Reduced scheduling overhead and driver memory usage",
            implementation=f"""
1. Reduce shuffle partitions:
   spark.sql.shuffle.partitions={max(200, metrics.tasks // 10)}
2. Coalesce partitions:
   df.coalesce({max(200, metrics.tasks // 10)})
3. Use adaptive partitioning (Spark 3.0+):
   spark.sql.adaptive.enabled=true""",
            priority=3
        )

    def _recommend_increase_parallelism(self, metrics: SparkJobMetrics) -> OptimizationRecommendation:
        recommended_partitions = max(200, int(metrics.input_bytes / (128 * 1e6)))  # 128MB per partition
        return OptimizationRecommendation(
            category="parallelism",
            severity="high",
            title="Low Parallelism",
            description=f"Only {metrics.tasks} tasks for {metrics.input_bytes / 1e9:.2f} GB of data.",
            current_issue="Under-utilization of cluster resources",
            recommendation="Increase parallelism to better utilize cluster",
            expected_improvement="Linear speedup with added parallelism",
            implementation=f"""
1. Increase shuffle partitions:
   spark.sql.shuffle.partitions={recommended_partitions}
2. Repartition input:
   df.repartition({recommended_partitions})
3. Adjust default parallelism:
   spark.default.parallelism={recommended_partitions}""",
            priority=2
        )


# =============================================================================
# Partition Strategy Advisor
# =============================================================================

class PartitionAdvisor:
    """Recommend partitioning strategies based on data characteristics"""

    def recommend(self, data_stats: Dict) -> List[PartitionStrategy]:
        """Generate partition recommendations from data statistics"""
        recommendations = []

        columns = data_stats.get('columns', {})
        total_size_bytes = data_stats.get('total_size_bytes', 0)
        row_count = data_stats.get('row_count', 0)

        for col_name, col_stats in columns.items():
            strategy = self._evaluate_column(col_name, col_stats, total_size_bytes, row_count)
            if strategy:
                recommendations.append(strategy)

        # Sort by estimated partition size (smallest first) as a rough
        # proxy for partition effectiveness
        recommendations.sort(key=lambda s: s.partition_size_mb)

        return recommendations[:3]  # Top 3 recommendations

    def _evaluate_column(self, col_name: str, col_stats: Dict,
                        total_size_bytes: int, row_count: int) -> Optional[PartitionStrategy]:
        """Evaluate a column for partitioning potential"""
        cardinality = col_stats.get('cardinality', 0)
        data_type = col_stats.get('data_type', 'string')
        null_percentage = col_stats.get('null_percentage', 0)

        # Skip high-null columns
        if null_percentage > 20:
            return None

        # Date/timestamp columns are ideal for range partitioning
        if data_type in ('date', 'timestamp', 'datetime'):
            return self._recommend_date_partition(col_name, col_stats, total_size_bytes, row_count)

        # Low cardinality columns are good for list partitioning
        if cardinality and cardinality <= 100:
            return self._recommend_list_partition(col_name, col_stats, total_size_bytes, cardinality)

        # Medium cardinality columns can use hash partitioning
        if cardinality and 100 < cardinality <= 10000:
            return self._recommend_hash_partition(col_name, col_stats, total_size_bytes)

        return None

    def _recommend_date_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int, row_count: int) -> PartitionStrategy:
        # Estimate daily partition size (assume 365 days of data)
        estimated_days = 365
        partition_size_mb = (total_size_bytes / estimated_days) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="range",
            num_partitions=None,  # Dynamic based on date range
            partition_size_mb=partition_size_mb,
            reasoning=f"Date column '{col_name}' is ideal for range partitioning. "
                     f"Estimated daily partition size: {partition_size_mb:.1f} MB",
            implementation=f"""
-- BigQuery
CREATE TABLE table_name
PARTITION BY DATE({col_name})
AS SELECT * FROM source_table;

-- Snowflake
CREATE TABLE table_name
CLUSTER BY (DATE_TRUNC('DAY', {col_name}));

-- Spark/Hive
df.write.partitionBy("{col_name}").parquet("path")

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY RANGE ({col_name});"""
        )

    def _recommend_list_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int, cardinality: int) -> PartitionStrategy:
        partition_size_mb = (total_size_bytes / cardinality) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="list",
            num_partitions=cardinality,
            partition_size_mb=partition_size_mb,
            reasoning=f"Column '{col_name}' has {cardinality} distinct values - ideal for list partitioning. "
                     f"Estimated partition size: {partition_size_mb:.1f} MB",
            implementation=f"""
-- Spark/Hive
df.write.partitionBy("{col_name}").parquet("path")

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY LIST ({col_name});

-- Note: List partitioning works best with stable, low-cardinality values"""
        )

    def _recommend_hash_partition(self, col_name: str, col_stats: Dict,
                                  total_size_bytes: int) -> PartitionStrategy:
        # Target ~128MB partitions
        target_partition_size = 128 * 1024 * 1024
        num_partitions = max(1, int(total_size_bytes / target_partition_size))

        # Round to power of 2 for better distribution
        num_partitions = 2 ** int(math.log2(num_partitions) + 0.5)
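        # e.g. total_size_bytes of 10 GiB yields 80 raw partitions;
        # int(log2(80) + 0.5) = int(6.82) = 6, so we round to 2**6 = 64.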
        partition_size_mb = (total_size_bytes / num_partitions) / (1024 * 1024)

        return PartitionStrategy(
            column=col_name,
            partition_type="hash",
            num_partitions=num_partitions,
            partition_size_mb=partition_size_mb,
            reasoning=f"Column '{col_name}' has medium cardinality - hash partitioning provides even distribution. "
                     f"Recommended {num_partitions} partitions (~{partition_size_mb:.1f} MB each)",
            implementation=f"""
-- Spark
df.repartition({num_partitions}, col("{col_name}"))

-- PostgreSQL
CREATE TABLE table_name (...)
PARTITION BY HASH ({col_name});

-- Snowflake (clustering)
ALTER TABLE table_name CLUSTER BY ({col_name});"""
        )


# =============================================================================
# Cost Estimator
# =============================================================================

class CostEstimator:
    """Estimate query costs for cloud data warehouses"""

    # Pricing (approximate, varies by region and contract)
    PRICING = {
        'snowflake': {
            'compute_per_credit': 2.00,  # USD per credit
            'credits_per_hour': {
                'x-small': 1,
                'small': 2,
                'medium': 4,
                'large': 8,
                'x-large': 16,
            },
            'storage_per_tb_month': 23.00,
        },
        'bigquery': {
            'on_demand_per_tb': 5.00,  # USD per TB scanned
            'storage_per_tb_month': 20.00,
            'streaming_insert_per_gb': 0.01,
        },
        'redshift': {
            'dc2_large_per_hour': 0.25,
            'ra3_xlplus_per_hour': 1.086,
            'storage_per_gb_month': 0.024,
        },
        'databricks': {
            'dbu_per_hour_sql': 0.22,
            'dbu_per_hour_jobs': 0.15,
        }
    }

    def estimate(self, query_info: SQLQueryInfo, warehouse: str,
                data_stats: Optional[Dict] = None) -> CostEstimate:
        """Estimate query cost"""
        warehouse = warehouse.lower()

        if warehouse not in self.PRICING:
            raise ValueError(f"Unknown warehouse: {warehouse}. Supported: {list(self.PRICING.keys())}")

        # Estimate data scanned
        data_scanned_bytes = self._estimate_data_scanned(query_info, data_stats)
        data_scanned_tb = data_scanned_bytes / (1024 ** 4)

        if warehouse == 'bigquery':
            return self._estimate_bigquery(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'snowflake':
            return self._estimate_snowflake(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'redshift':
            return self._estimate_redshift(query_info, data_scanned_tb, data_stats)
        elif warehouse == 'databricks':
            return self._estimate_databricks(query_info, data_scanned_tb, data_stats)

    def _estimate_data_scanned(self, query_info: SQLQueryInfo,
                               data_stats: Optional[Dict]) -> int:
        """Estimate bytes of data that will be scanned"""
        if data_stats and 'total_size_bytes' in data_stats:
            base_size = data_stats['total_size_bytes']
        else:
            # Default assumption: 1GB per table
            base_size = len(query_info.tables) * 1e9

        # Adjust for filters
        filter_factor = 1.0
        if query_info.where_conditions:
            # Assume each filter reduces data by 50% (very rough)
            filter_factor = 0.5 ** min(len(query_info.where_conditions), 3)

        # Adjust for column projection
        if '*' not in query_info.columns and query_info.columns:
            # Assume selecting specific columns reduces scan by 50%
            filter_factor *= 0.5

        return int(base_size * filter_factor)
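
    # Worked example (hypothetical query, no stats file): 2 tables assume a
    # 2 GB base; two WHERE conditions scale it by 0.5**2 = 0.25 and explicit
    # column projection by another 0.5, giving ~0.25 GB estimated scanned.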

    def _estimate_bigquery(self, query_info: SQLQueryInfo,
                          data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['bigquery']

        compute_cost = data_scanned_tb * pricing['on_demand_per_tb']

        # Minimum billing of 10MB
        if data_scanned_tb < 10 / (1024 ** 2):
            compute_cost = 10 / (1024 ** 2) * pricing['on_demand_per_tb']

        return CostEstimate(
            warehouse='BigQuery',
            compute_cost=compute_cost,
            storage_cost=0,  # Storage cost separate
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                f"Estimated {data_scanned_tb * 1024:.2f} GB data scanned",
                "Using on-demand pricing ($5/TB)",
                "Assumes no slot reservations",
                "Actual cost depends on partitioning and clustering"
            ]
        )
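
    # e.g. a query estimated to scan 0.5 TB bills 0.5 * $5.00 = $2.50
    # under the on-demand model above.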

    def _estimate_snowflake(self, query_info: SQLQueryInfo,
                           data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['snowflake']

        # Estimate warehouse size and time
        complexity_to_size = {
            'low': 'x-small',
            'medium': 'small',
            'high': 'medium',
            'very_high': 'large'
        }
        warehouse_size = complexity_to_size.get(query_info.estimated_complexity, 'small')
        credits_per_hour = pricing['credits_per_hour'][warehouse_size]

        # Estimate runtime (very rough)
        estimated_seconds = max(1, data_scanned_tb * 1024 * 10)  # 10 seconds per GB
        estimated_hours = estimated_seconds / 3600

        credits_used = credits_per_hour * estimated_hours
        compute_cost = credits_used * pricing['compute_per_credit']

        # Minimum 1 minute billing
        min_cost = (credits_per_hour / 60) * pricing['compute_per_credit']
        compute_cost = max(compute_cost, min_cost)

        return CostEstimate(
            warehouse='Snowflake',
            compute_cost=compute_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                f"Warehouse size: {warehouse_size}",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                f"Credits used: {credits_used:.4f}",
                "Minimum 1-minute billing applies",
                "Actual cost depends on warehouse auto-suspend settings"
            ]
        )
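
    # e.g. a 'medium'-complexity query scanning 0.01 TB maps to a small
    # warehouse (2 credits/hr); ~102s runtime -> ~0.057 credits -> ~$0.11,
    # above the 1-minute minimum of ~$0.067.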

    def _estimate_redshift(self, query_info: SQLQueryInfo,
                          data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['redshift']

        # Assume ra3.xlplus node type
        hourly_rate = pricing['ra3_xlplus_per_hour']

        # Estimate runtime
        estimated_seconds = max(1, data_scanned_tb * 1024 * 15)  # 15 seconds per GB
        estimated_hours = estimated_seconds / 3600

        compute_cost = hourly_rate * estimated_hours

        return CostEstimate(
            warehouse='Redshift',
            compute_cost=compute_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=compute_cost,
            assumptions=[
                f"Using RA3.xlplus node type",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                "Assumes dedicated cluster (not serverless)",
                "Actual cost depends on cluster configuration"
            ]
        )

    def _estimate_databricks(self, query_info: SQLQueryInfo,
                            data_scanned_tb: float, data_stats: Optional[Dict]) -> CostEstimate:
        pricing = self.PRICING['databricks']

        # Estimate DBUs
        estimated_seconds = max(1, data_scanned_tb * 1024 * 12)
        estimated_hours = estimated_seconds / 3600

        dbu_cost = pricing['dbu_per_hour_sql'] * estimated_hours

        return CostEstimate(
            warehouse='Databricks',
            compute_cost=dbu_cost,
            storage_cost=0,
            data_transfer_cost=0,
            total_cost=dbu_cost,
            assumptions=[
                f"Using SQL warehouse",
                f"Estimated runtime: {estimated_seconds:.1f} seconds",
                "DBU rate may vary by workspace tier",
                "Does not include underlying cloud costs"
            ]
        )


# =============================================================================
# Report Generator
# =============================================================================

class ReportGenerator:
    """Generate optimization reports"""

    def generate_text_report(self, query_info: SQLQueryInfo,
                            recommendations: List[OptimizationRecommendation],
                            cost_estimate: Optional[CostEstimate] = None) -> str:
        """Generate a text report"""
        lines = []
        lines.append("=" * 80)
        lines.append("ETL PERFORMANCE OPTIMIZATION REPORT")
        lines.append("=" * 80)
        lines.append(f"\nGenerated: {datetime.now().isoformat()}")

        # Query summary
        lines.append("\n" + "-" * 40)
        lines.append("QUERY ANALYSIS")
        lines.append("-" * 40)
        lines.append(f"Query Type: {query_info.query_type}")
        lines.append(f"Tables: {', '.join(query_info.tables) or 'None'}")
        lines.append(f"Joins: {len(query_info.joins)}")
        lines.append(f"Subqueries: {query_info.subqueries}")
        lines.append(f"Aggregations: {', '.join(query_info.aggregations) or 'None'}")
        lines.append(f"Window Functions: {', '.join(query_info.window_functions) or 'None'}")
        lines.append(f"Complexity: {query_info.estimated_complexity.upper()}")

        # Cost estimate
        if cost_estimate:
            lines.append("\n" + "-" * 40)
            lines.append("COST ESTIMATE")
            lines.append("-" * 40)
            lines.append(f"Warehouse: {cost_estimate.warehouse}")
            lines.append(f"Estimated Cost: ${cost_estimate.total_cost:.4f} {cost_estimate.currency}")
            lines.append("Assumptions:")
            for assumption in cost_estimate.assumptions:
                lines.append(f"  - {assumption}")

        # Recommendations
        if recommendations:
            lines.append("\n" + "-" * 40)
            lines.append(f"OPTIMIZATION RECOMMENDATIONS ({len(recommendations)} found)")
            lines.append("-" * 40)

            for i, rec in enumerate(recommendations, 1):
                severity_icon = {
                    'critical': '🔴',
                    'high': '🟠',
                    'medium': '🟡',
                    'low': '🟢'
                }.get(rec.severity, '⚪')

                lines.append(f"\n{i}. {severity_icon} [{rec.severity.upper()}] {rec.title}")
                lines.append(f"   Category: {rec.category}")
                lines.append(f"   Issue: {rec.current_issue}")
                lines.append(f"   Recommendation: {rec.recommendation}")
                lines.append(f"   Expected Improvement: {rec.expected_improvement}")
                lines.append(f"\n   Implementation:")
                for impl_line in rec.implementation.strip().split('\n'):
                    lines.append(f"   {impl_line}")
        else:
            lines.append("\n✅ No optimization issues detected")

        lines.append("\n" + "=" * 80)

        return "\n".join(lines)

    def generate_json_report(self, query_info: SQLQueryInfo,
                            recommendations: List[OptimizationRecommendation],
                            cost_estimate: Optional[CostEstimate] = None) -> Dict:
        """Generate a JSON report"""
        return {
            "report_type": "etl_performance_optimization",
            "generated_at": datetime.now().isoformat(),
            "query_analysis": {
                "query_type": query_info.query_type,
                "tables": query_info.tables,
                "joins": query_info.joins,
                "subqueries": query_info.subqueries,
                "aggregations": query_info.aggregations,
                "window_functions": query_info.window_functions,
                "complexity": query_info.estimated_complexity
            },
            "cost_estimate": asdict(cost_estimate) if cost_estimate else None,
            "recommendations": [asdict(r) for r in recommendations],
            "summary": {
                "total_recommendations": len(recommendations),
                "critical": sum(1 for r in recommendations if r.severity == "critical"),
                "high": sum(1 for r in recommendations if r.severity == "high"),
                "medium": sum(1 for r in recommendations if r.severity == "medium"),
                "low": sum(1 for r in recommendations if r.severity == "low")
            }
        }


# =============================================================================
# CLI Commands
# =============================================================================

def cmd_analyze_sql(args):
    """Analyze SQL query for optimization opportunities"""
    # Load SQL
    sql_path = Path(args.input)
    if sql_path.exists():
        with open(sql_path, 'r') as f:
            sql = f.read()
    else:
        sql = args.input  # Treat as inline SQL

    # Parse and analyze
    parser = SQLParser()
    query_info = parser.parse(sql)

    optimizer = SQLOptimizer()
    recommendations = optimizer.analyze(query_info, sql)

    # Cost estimate if warehouse specified
    cost_estimate = None
    if args.warehouse:
        estimator = CostEstimator()
        data_stats = None
        if args.stats:
            with open(args.stats, 'r') as f:
                data_stats = json.load(f)
        cost_estimate = estimator.estimate(query_info, args.warehouse, data_stats)

    # Generate report
    reporter = ReportGenerator()

    if args.json:
        report = reporter.generate_json_report(query_info, recommendations, cost_estimate)
        output = json.dumps(report, indent=2)
    else:
        output = reporter.generate_text_report(query_info, recommendations, cost_estimate)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Report saved to {args.output}")
    else:
        print(output)


def cmd_analyze_spark(args):
    """Analyze Spark job metrics"""
    with open(args.input, 'r') as f:
        metrics_data = json.load(f)

    # Handle both single job and array of jobs
    if isinstance(metrics_data, list):
        jobs = metrics_data
    else:
        jobs = [metrics_data]

    all_recommendations = []
    analyzer = SparkJobAnalyzer()

    for job_data in jobs:
        metrics = SparkJobMetrics(
            job_id=job_data.get('jobId', 'unknown'),
            duration_ms=job_data.get('duration', 0),
            stages=job_data.get('numStages', 0),
            tasks=job_data.get('numTasks', 0),
            shuffle_read_bytes=job_data.get('shuffleReadBytes', 0),
            shuffle_write_bytes=job_data.get('shuffleWriteBytes', 0),
            input_bytes=job_data.get('inputBytes', 0),
            output_bytes=job_data.get('outputBytes', 0),
            peak_memory_bytes=job_data.get('peakMemoryBytes', 0),
            gc_time_ms=job_data.get('gcTime', 0),
            failed_tasks=job_data.get('failedTasks', 0),
            speculative_tasks=job_data.get('speculativeTasks', 0),
            skew_ratio=job_data.get('skewRatio', 1.0)
        )

        recommendations = analyzer.analyze(metrics)
        all_recommendations.extend(recommendations)

    # Deduplicate similar recommendations
    unique_recs = []
    seen_titles = set()
    for rec in all_recommendations:
        if rec.title not in seen_titles:
            unique_recs.append(rec)
            seen_titles.add(rec.title)

    # Output
    if args.json:
        output = json.dumps([asdict(r) for r in unique_recs], indent=2)
    else:
        lines = []
        lines.append("=" * 60)
        lines.append("SPARK JOB OPTIMIZATION REPORT")
        lines.append("=" * 60)
        lines.append(f"\nJobs Analyzed: {len(jobs)}")
        lines.append(f"Recommendations: {len(unique_recs)}")

        for i, rec in enumerate(unique_recs, 1):
            lines.append(f"\n{i}. [{rec.severity.upper()}] {rec.title}")
            lines.append(f"   {rec.description}")
            lines.append(f"   Implementation: {rec.implementation[:200]}...")

        output = "\n".join(lines)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
    else:
        print(output)


def cmd_optimize_partition(args):
    """Recommend partition strategies"""
    with open(args.input, 'r') as f:
        data_stats = json.load(f)

    advisor = PartitionAdvisor()
    strategies = advisor.recommend(data_stats)

    if args.json:
        output = json.dumps([asdict(s) for s in strategies], indent=2)
    else:
        lines = []
        lines.append("=" * 60)
        lines.append("PARTITION STRATEGY RECOMMENDATIONS")
        lines.append("=" * 60)

        if not strategies:
            lines.append("\nNo partition recommendations based on provided data statistics.")
        else:
            for i, strategy in enumerate(strategies, 1):
                lines.append(f"\n{i}. Partition by: {strategy.column}")
                lines.append(f"   Type: {strategy.partition_type}")
                if strategy.num_partitions:
                    lines.append(f"   Partitions: {strategy.num_partitions}")
                lines.append(f"   Estimated size: {strategy.partition_size_mb:.1f} MB per partition")
                lines.append(f"   Reasoning: {strategy.reasoning}")
                lines.append(f"\n   Implementation:")
                for impl_line in strategy.implementation.strip().split('\n'):
                    lines.append(f"   {impl_line}")

        output = "\n".join(lines)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
    else:
        print(output)


def cmd_estimate_cost(args):
    """Estimate query cost"""
    # Load SQL
    sql_path = Path(args.input)
    if sql_path.exists():
        with open(sql_path, 'r') as f:
            sql = f.read()
    else:
        sql = args.input

    # Parse
    parser = SQLParser()
    query_info = parser.parse(sql)

    # Load data stats if provided
    data_stats = None
    if args.stats:
        with open(args.stats, 'r') as f:
            data_stats = json.load(f)

    # Estimate cost
    estimator = CostEstimator()
    cost = estimator.estimate(query_info, args.warehouse, data_stats)

    if args.json:
        output = json.dumps(asdict(cost), indent=2)
    else:
        lines = []
        lines.append(f"Cost Estimate for {cost.warehouse}")
        lines.append("=" * 40)
        lines.append(f"Compute Cost:      ${cost.compute_cost:.4f}")
        lines.append(f"Storage Cost:      ${cost.storage_cost:.4f}")
        lines.append(f"Data Transfer:     ${cost.data_transfer_cost:.4f}")
        lines.append("-" * 40)
        lines.append(f"Total:             ${cost.total_cost:.4f} {cost.currency}")
        lines.append("\nAssumptions:")
        for assumption in cost.assumptions:
            lines.append(f"  - {assumption}")

        output = "\n".join(lines)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
    else:
        print(output)


def cmd_generate_template(args):
    """Generate template files"""
    templates = {
        'data_stats': {
            "total_size_bytes": 10737418240,
            "row_count": 10000000,
            "columns": {
                "id": {
                    "data_type": "integer",
                    "cardinality": 10000000,
                    "null_percentage": 0
                },
                "created_at": {
                    "data_type": "timestamp",
                    "cardinality": 1000000,
                    "null_percentage": 0
                },
                "category": {
                    "data_type": "string",
                    "cardinality": 50,
                    "null_percentage": 2
                },
                "amount": {
                    "data_type": "float",
                    "cardinality": 100000,
                    "null_percentage": 5
                }
            }
        },
        'spark_metrics': {
            "jobId": "job_12345",
            "duration": 300000,
            "numStages": 5,
            "numTasks": 200,
            "shuffleReadBytes": 5368709120,
            "shuffleWriteBytes": 2147483648,
            "inputBytes": 10737418240,
            "outputBytes": 1073741824,
            "peakMemoryBytes": 4294967296,
            "gcTime": 15000,
            "failedTasks": 2,
            "speculativeTasks": 5,
            "skewRatio": 3.5
        }
    }

    if args.template not in templates:
        logger.error(f"Unknown template: {args.template}. Available: {list(templates.keys())}")
        sys.exit(1)

    output = json.dumps(templates[args.template], indent=2)

    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
        logger.info(f"Template saved to {args.output}")
    else:
        print(output)


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="ETL Performance Optimizer - Analyze and optimize data pipelines",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Analyze SQL query
  python etl_performance_optimizer.py analyze-sql query.sql

  # Analyze with cost estimate
  python etl_performance_optimizer.py analyze-sql query.sql --warehouse bigquery

  # Analyze Spark job metrics
  python etl_performance_optimizer.py analyze-spark spark-history.json

  # Get partition recommendations
  python etl_performance_optimizer.py optimize-partition data_stats.json

  # Estimate query cost
  python etl_performance_optimizer.py estimate-cost query.sql --warehouse snowflake

  # Generate template files
  python etl_performance_optimizer.py template data_stats --output stats.json
        """
    )

    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Analyze SQL command
    sql_parser = subparsers.add_parser('analyze-sql', help='Analyze SQL query')
    sql_parser.add_argument('input', help='SQL file or inline query')
    sql_parser.add_argument('--warehouse', '-w', choices=['bigquery', 'snowflake', 'redshift', 'databricks'],
                          help='Warehouse for cost estimation')
    sql_parser.add_argument('--stats', '-s', help='Data statistics JSON file')
    sql_parser.add_argument('--output', '-o', help='Output file')
    sql_parser.add_argument('--json', action='store_true', help='Output as JSON')
    sql_parser.set_defaults(func=cmd_analyze_sql)

    # Analyze Spark command
    spark_parser = subparsers.add_parser('analyze-spark', help='Analyze Spark job metrics')
    spark_parser.add_argument('input', help='Spark metrics JSON file')
    spark_parser.add_argument('--output', '-o', help='Output file')
    spark_parser.add_argument('--json', action='store_true', help='Output as JSON')
    spark_parser.set_defaults(func=cmd_analyze_spark)

    # Optimize partition command
    partition_parser = subparsers.add_parser('optimize-partition', help='Recommend partition strategies')
    partition_parser.add_argument('input', help='Data statistics JSON file')
    partition_parser.add_argument('--output', '-o', help='Output file')
    partition_parser.add_argument('--json', action='store_true', help='Output as JSON')
    partition_parser.set_defaults(func=cmd_optimize_partition)

    # Estimate cost command
    cost_parser = subparsers.add_parser('estimate-cost', help='Estimate query cost')
    cost_parser.add_argument('input', help='SQL file or inline query')
    cost_parser.add_argument('--warehouse', '-w', required=True,
                           choices=['bigquery', 'snowflake', 'redshift', 'databricks'],
                           help='Target warehouse')
    cost_parser.add_argument('--stats', '-s', help='Data statistics JSON file')
    cost_parser.add_argument('--output', '-o', help='Output file')
    cost_parser.add_argument('--json', action='store_true', help='Output as JSON')
    cost_parser.set_defaults(func=cmd_estimate_cost)

    # Template command
    template_parser = subparsers.add_parser('template', help='Generate template files')
    template_parser.add_argument('template', choices=['data_stats', 'spark_metrics'],
                                help='Template type')
    template_parser.add_argument('--output', '-o', help='Output file')
    template_parser.set_defaults(func=cmd_generate_template)

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if not args.command:
        parser.print_help()
        sys.exit(1)

    try:
        args.func(args)
    except Exception as e:
        logger.error(f"Error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main()

```

### references/data_pipeline_architecture.md

```markdown
# Data Pipeline Architecture

Comprehensive guide to designing and implementing production data pipelines.

## Table of Contents

1. [Architecture Patterns](#architecture-patterns)
2. [Batch Processing](#batch-processing)
3. [Stream Processing](#stream-processing)
4. [Exactly-Once Semantics](#exactly-once-semantics)
5. [Error Handling](#error-handling)
6. [Data Ingestion Patterns](#data-ingestion-patterns)
7. [Orchestration](#orchestration)

---

## Architecture Patterns

### Lambda Architecture

The Lambda architecture combines batch and stream processing for comprehensive data handling.

```
                    ┌─────────────────────────────────────┐
                    │           Data Sources              │
                    └─────────────────┬───────────────────┘
                                      │
                    ┌─────────────────▼───────────────────┐
                    │         Message Queue (Kafka)        │
                    └───────┬─────────────────┬───────────┘
                            │                 │
              ┌─────────────▼─────┐   ┌───────▼─────────────┐
              │    Batch Layer    │   │    Speed Layer      │
              │   (Spark/Airflow) │   │   (Flink/Spark SS)  │
              └─────────────┬─────┘   └───────┬─────────────┘
                            │                 │
              ┌─────────────▼─────┐   ┌───────▼─────────────┐
              │   Master Dataset  │   │   Real-time Views   │
              │   (Data Lake)     │   │   (Redis/Druid)     │
              └─────────────┬─────┘   └───────┬─────────────┘
                            │                 │
                    ┌───────▼─────────────────▼───────┐
                    │          Serving Layer           │
                    │   (Merged Batch + Real-time)     │
                    └─────────────────────────────────┘
```

**Components:**

1. **Batch Layer**
   - Processes complete historical data
   - Creates precomputed batch views
   - Handles complex transformations, ML training
   - Reprocessable from raw data

2. **Speed Layer**
   - Processes real-time data stream
   - Creates real-time views for recent data
   - Low latency, simpler transformations
   - Compensates for batch layer delay

3. **Serving Layer**
   - Merges batch and real-time views
   - Responds to queries
   - Provides unified interface

**Implementation Example:**

```python
# Batch layer: Daily aggregation with Spark
# (count/sum/max here are the pyspark functions, shadowing the builtins)
from pyspark.sql.functions import col, count, from_json, max, sum, window

def batch_daily_aggregation(spark, date):
    """Process full day of data for batch views."""
    raw_df = spark.read.parquet(f"s3://data-lake/raw/events/date={date}")

    aggregated = raw_df.groupBy("user_id", "event_type") \
        .agg(
            count("*").alias("event_count"),
            sum("revenue").alias("total_revenue"),
            max("timestamp").alias("last_event")
        )

    aggregated.write \
        .mode("overwrite") \
        .partitionBy("event_type") \
        .parquet(f"s3://data-lake/batch-views/daily_agg/date={date}")

# Speed layer: Real-time aggregation with Spark Structured Streaming
def speed_realtime_aggregation(spark):
    """Process streaming data for real-time views."""
    stream_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "events") \
        .load()

    # event_schema: a StructType for the event payload, assumed defined elsewhere
    parsed = stream_df.select(
        from_json(col("value").cast("string"), event_schema).alias("data")
    ).select("data.*")

    aggregated = parsed \
        .withWatermark("timestamp", "5 minutes") \
        .groupBy(
            window("timestamp", "1 minute"),
            "user_id",
            "event_type"
        ) \
        .agg(count("*").alias("event_count"))

    # the "redis" sink assumes a third-party connector (e.g. spark-redis)
    query = aggregated.writeStream \
        .format("redis") \
        .option("host", "redis") \
        .outputMode("update") \
        .start()

    return query
```

### Kappa Architecture

Kappa simplifies Lambda by using only stream processing with replay capability.

```
                    ┌─────────────────────────────────────┐
                    │           Data Sources              │
                    └─────────────────┬───────────────────┘
                                      │
                    ┌─────────────────▼───────────────────┐
                    │    Immutable Log (Kafka/Kinesis)     │
                    │         (Long retention)             │
                    └─────────────────┬───────────────────┘
                                      │
                    ┌─────────────────▼───────────────────┐
                    │        Stream Processor              │
                    │      (Flink/Spark Streaming)         │
                    └─────────────────┬───────────────────┘
                                      │
                    ┌─────────────────▼───────────────────┐
                    │         Serving Layer                │
                    │    (Database/Data Warehouse)         │
                    └─────────────────────────────────────┘
```

**Key Principles:**

1. **Single Processing Path**: All data processed as streams
2. **Immutable Log**: Kafka/Kinesis as source of truth with long retention
3. **Reprocessing via Replay**: Re-run stream processor from beginning when needed

**Reprocessing Strategy:**

```python
# Reprocessing in Kappa architecture (sketch: the flink_job wrapper methods
# used below are illustrative, not a real Flink API)
import uuid
from datetime import datetime

from kafka import KafkaConsumer, TopicPartition  # kafka-python client

class KappaReprocessor:
    """Handle reprocessing by replaying from Kafka."""

    def __init__(self, kafka_config, flink_job):
        self.kafka = kafka_config
        self.job = flink_job

    def reprocess(self, from_timestamp: str):
        """Reprocess all data from a specific timestamp."""

        # 1. Start new consumer group reading from timestamp
        new_consumer_group = f"reprocess-{uuid.uuid4()}"

        # 2. Configure stream processor with new group
        self.job.set_config({
            "group.id": new_consumer_group,
            "auto.offset.reset": "none"  # We'll set offset manually
        })

        # 3. Seek to timestamp
        offsets = self._get_offsets_for_timestamp(from_timestamp)
        self.job.seek_to_offsets(offsets)

        # 4. Write to new output table/topic
        output_table = f"events_reprocessed_{datetime.now().strftime('%Y%m%d')}"
        self.job.set_output(output_table)

        # 5. Run until caught up
        self.job.run_until_caught_up()

        # 6. Swap output tables atomically
        self._atomic_table_swap("events", output_table)

    def _get_offsets_for_timestamp(self, timestamp):
        """Get Kafka offsets for a specific ISO-8601 timestamp."""
        # offsets_for_times expects epoch milliseconds
        timestamp_ms = int(datetime.fromisoformat(timestamp).timestamp() * 1000)
        consumer = KafkaConsumer(bootstrap_servers=self.kafka["brokers"])
        partitions = consumer.partitions_for_topic("events")

        offsets = {}
        for partition in partitions:
            tp = TopicPartition("events", partition)
            result = consumer.offsets_for_times({tp: timestamp_ms})
            if result[tp] is not None:  # None when no message at/after the timestamp
                offsets[tp] = result[tp].offset

        return offsets
```
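
A minimal invocation sketch under the assumptions above (the `kafka_config` wiring and `my_flink_job` are placeholders; the ISO-8601 string is what `_get_offsets_for_timestamp` parses):

```python
# Hypothetical usage: replay everything from midnight UTC on 2024-01-01
reprocessor = KappaReprocessor(
    kafka_config={"brokers": "kafka:9092"},
    flink_job=my_flink_job,  # placeholder stream-job wrapper
)
reprocessor.reprocess(from_timestamp="2024-01-01T00:00:00+00:00")
```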

### Medallion Architecture (Bronze/Silver/Gold)

Common in data lakehouses (Databricks, Delta Lake).

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Bronze    │────▶│   Silver    │────▶│    Gold     │
│  (Raw Data) │     │ (Cleansed)  │     │ (Analytics) │
└─────────────┘     └─────────────┘     └─────────────┘
     │                    │                    │
     ▼                    ▼                    ▼
 Landing zone        Validated,          Aggregated,
 Append-only         deduplicated,       business-ready
 Schema evolution    standardized        Star schema
```

**Implementation with Delta Lake:**

```python
from pyspark.sql.functions import (
    col, count, countDistinct, current_timestamp, input_file_name,
    to_date, when, max as max_, sum as sum_
)
from delta.tables import DeltaTable

# Bronze: Raw ingestion
def ingest_to_bronze(spark, source_path, bronze_path):
    """Ingest raw data to bronze layer."""
    df = spark.read.format("json").load(source_path)

    # Add metadata
    df = df.withColumn("_ingested_at", current_timestamp()) \
           .withColumn("_source_file", input_file_name())

    df.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(bronze_path)

# Silver: Cleansing and validation
def bronze_to_silver(spark, bronze_path, silver_path):
    """Transform bronze to silver with cleansing."""
    bronze_df = spark.read.format("delta").load(bronze_path)

    # Read last processed version (get_last_processed_version is a helper
    # you maintain, e.g. backed by a watermark table)
    last_version = get_last_processed_version(silver_path, "bronze")

    # Get only new records. NOTE: _commit_version is only available when
    # reading with Delta Change Data Feed enabled; shown here for brevity.
    new_records = bronze_df.filter(col("_commit_version") > last_version)

    # Cleanse and validate
    silver_df = new_records \
        .filter(col("user_id").isNotNull()) \
        .filter(col("event_type").isin(["click", "view", "purchase"])) \
        .withColumn("event_date", to_date("timestamp")) \
        .dropDuplicates(["event_id"])

    # Merge to silver (upsert)
    silver_table = DeltaTable.forPath(spark, silver_path)

    silver_table.alias("target") \
        .merge(
            silver_df.alias("source"),
            "target.event_id = source.event_id"
        ) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Gold: Business aggregations
def silver_to_gold(spark, silver_path, gold_path):
    """Create business-ready aggregations in gold layer."""
    silver_df = spark.read.format("delta").load(silver_path)

    # Daily user metrics
    daily_metrics = silver_df \
        .groupBy("user_id", "event_date") \
        .agg(
            count("*").alias("total_events"),
            countDistinct("session_id").alias("sessions"),
            sum_(when(col("event_type") == "purchase", col("revenue")).otherwise(0)).alias("revenue"),
            max_("timestamp").alias("last_activity")
        )

    # Write as gold table
    daily_metrics.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("event_date") \
        .save(gold_path + "/daily_user_metrics")
```

---

## Batch Processing

### Apache Spark Best Practices

#### Memory Management

```python
# Optimal Spark configuration for batch jobs
spark = SparkSession.builder \
    .appName("BatchETL") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
```

**Memory Tuning Guidelines:**

| Data Size | Executors | Memory/Executor | Cores/Executor |
|-----------|-----------|-----------------|----------------|
| < 10 GB   | 2-4       | 4-8 GB          | 2-4            |
| 10-100 GB | 10-20     | 8-16 GB         | 4-8            |
| 100+ GB   | 50+       | 16-32 GB        | 4-8            |
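
To make the tiers actionable, here is a minimal sketch (assuming static executor allocation; the values are the table's starting points, not tuned advice for any specific cluster):

```python
from pyspark.sql import SparkSession

def build_batch_session(app_name: str, data_size_gb: float) -> SparkSession:
    """Pick executor settings from the data-size tiers in the table above."""
    if data_size_gb < 10:
        executors, memory, cores = 4, "8g", 4
    elif data_size_gb < 100:
        executors, memory, cores = 20, "16g", 8
    else:
        executors, memory, cores = 50, "32g", 8

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.executor.instances", str(executors))
        .config("spark.executor.memory", memory)
        .config("spark.executor.cores", str(cores))
        .getOrCreate()
    )
```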

#### Partition Optimization

```python
# Repartition vs Coalesce
# Repartition: Full shuffle, use for increasing partitions
df_repartitioned = df.repartition(100, "date")  # Partition by column

# Coalesce: No shuffle, use for decreasing partitions
df_coalesced = df.coalesce(10)  # Reduce partitions without shuffle

# Optimal partition size: 128-256 MB each
# Calculate partitions:
# num_partitions = total_data_size_mb / 200

# Check current partitions
print(f"Current partitions: {df.rdd.getNumPartitions()}")

# Repartition for optimal join performance
large_df = large_df.repartition(200, "join_key")
small_df = small_df.repartition(200, "join_key")
result = large_df.join(small_df, "join_key")
```

#### Join Optimization

```python
# Broadcast join for small tables (< 10MB by default)
from pyspark.sql.functions import broadcast

# Explicit broadcast hint
result = large_df.join(broadcast(small_df), "key")

# Increase broadcast threshold if needed
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")

# Sort-merge join for large tables
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Bucket tables for frequent joins
df.write \
    .bucketBy(100, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("bucketed_orders")
```

#### Caching Strategy

```python
# Cache when:
# 1. DataFrame is used multiple times
# 2. After expensive transformations
# 3. Before iterative operations

# Use MEMORY_AND_DISK for large datasets
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)

# Cache only the columns you need (assign the result; caching is lazy)
slim_df = df.select("id", "value").cache()

# Unpersist when done
df.unpersist()

# Check storage
spark.catalog.clearCache()  # Clear all caches
```

### Airflow DAG Patterns

#### Idempotent Tasks

```python
# Always design idempotent tasks
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import timedelta

@dag(
    schedule_interval="@daily",
    start_date=days_ago(7),
    catchup=True,
    default_args={
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
    }
)
def idempotent_etl():

    @task
    def extract(execution_date=None):
        """Idempotent extraction - same date always returns same data."""
        date_str = execution_date.strftime("%Y-%m-%d")

        # Query for specific date only
        query = f"""
            SELECT * FROM source_table
            WHERE DATE(created_at) = '{date_str}'
        """
        return query_database(query)

    @task
    def transform(data):
        """Pure function - no side effects."""
        return [transform_record(r) for r in data]

    @task
    def load(data, execution_date=None):
        """Idempotent load - delete before insert or use MERGE."""
        date_str = execution_date.strftime("%Y-%m-%d")

        # Option 1: Delete and reinsert
        execute_sql(f"DELETE FROM target WHERE date = '{date_str}'")
        insert_data(data)

        # Option 2: Use MERGE/UPSERT
        # MERGE INTO target USING source ON target.id = source.id
        # WHEN MATCHED THEN UPDATE
        # WHEN NOT MATCHED THEN INSERT

    raw = extract()
    transformed = transform(raw)
    load(transformed)

dag = idempotent_etl()
```

#### Backfill Pattern

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_date(ds, **kwargs):
    """Process a single date - supports backfill."""
    logical_date = datetime.strptime(ds, "%Y-%m-%d")

    # Always process specific date, not "latest"
    data = extract_for_date(logical_date)
    transformed = transform(data)

    # Use partition/date-specific target
    load_to_partition(transformed, partition=ds)

with DAG(
    "backfillable_etl",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=True,  # Enable backfill
    max_active_runs=3,  # Limit parallel backfills
) as dag:

    process = PythonOperator(
        task_id="process",
        python_callable=process_date,
        # Context (ds, etc.) is injected automatically in Airflow 2;
        # provide_context=True was only needed on Airflow 1.x.
    )

# Backfill command:
# airflow dags backfill -s 2024-01-01 -e 2024-01-31 backfillable_etl
```

---

## Stream Processing

### Apache Kafka Architecture

#### Topic Design

```bash
# Create topic with proper configuration:
#   retention.ms=604800000  -> 7 days
#   retention.bytes         -> 100 GB
#   min.insync.replicas=2   -> durability
#   segment.bytes           -> 1 GB segments
# (inline comments after "\" continuations would break the command)
kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic user-events \
    --partitions 24 \
    --replication-factor 3 \
    --config retention.ms=604800000 \
    --config retention.bytes=107374182400 \
    --config cleanup.policy=delete \
    --config min.insync.replicas=2 \
    --config segment.bytes=1073741824
```

**Partition Count Guidelines:**

| Throughput | Partitions | Notes |
|------------|------------|-------|
| < 10K msg/s | 6-12 | Single consumer can handle |
| 10K-100K msg/s | 24-48 | Multiple consumers needed |
| > 100K msg/s | 100+ | Scale consumers with partitions |
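
As a rough sizing sketch (the per-partition write and per-consumer read rates below are illustrative assumptions; benchmark your own cluster before relying on them):

```python
def estimate_partitions(target_mb_per_s: float,
                        per_partition_write_mb_s: float = 10.0,
                        per_consumer_read_mb_s: float = 20.0) -> int:
    """Partition count bounded by both producer and consumer throughput."""
    by_producers = target_mb_per_s / per_partition_write_mb_s
    by_consumers = target_mb_per_s / per_consumer_read_mb_s
    # 1.5x headroom: adding partitions later breaks key-based ordering,
    # so it pays to over-provision up front.
    return max(6, round(max(by_producers, by_consumers) * 1.5))

print(estimate_partitions(200))  # e.g. 200 MB/s -> 30 partitions
```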

**Partition Key Selection:**

```python
# Good partition keys: Even distribution, related data together
# For user events: user_id (events for same user on same partition)
# For orders: order_id (if no ordering needed) or customer_id (if needed)

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

def send_event(event):
    # Use user_id as key for user-based partitioning
    producer.send(
        topic='user-events',
        key=event['user_id'],  # Partition key
        value=event
    )
```

### Spark Structured Streaming

#### Watermarks and Late Data

```python
from pyspark.sql.functions import window, col, from_json

# Read stream
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Add watermark for late data handling
# Data arriving more than 10 minutes late will be dropped
windowed_counts = events \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes", "1 minute"),  # 5-min windows, 1-min slide
        "event_type"
    ) \
    .count()

# Write with append mode (only final results for complete windows)
query = windowed_counts.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/windowed_counts") \
    .start()
```

**Watermark Behavior:**

```
Timeline: ─────────────────────────────────────────▶
Event:        E1      E2      E3      E5      E4 (late)
Event time:   10:00   10:02   10:05   10:15   10:03
Arrives at:   10:00   10:02   10:05   10:15   10:16

Watermark = max event time seen - threshold

After E5:  watermark = 10:15 - 10 min = 10:05

E4 arrives at 10:16 carrying event_time = 10:03.
Since its event_time (10:03) is behind the watermark (10:05),
E4 is dropped as too late.
```

#### Stateful Operations

```python
# NOTE: flatMapGroupsWithState is the Scala/Java API; PySpark (3.4+) exposes
# the equivalent as applyInPandasWithState. The sketch below is Scala-style
# pseudocode written in Python for readability.
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# Session windows using flatMapGroupsWithState
def session_aggregation(key, events, state):
    """Aggregate events into sessions with 30-minute timeout."""

    # Get or initialize state
    if state.exists:
        session = state.get
    else:
        session = {"start": None, "events": [], "total": 0}

    # Process new events
    for event in events:
        if session["start"] is None:
            session["start"] = event.timestamp
        session["events"].append(event)
        session["total"] += event.value

    # Set timeout (session expires after 30 min of inactivity)
    state.setTimeoutDuration("30 minutes")

    # Check if session should close
    if state.hasTimedOut():
        # Emit completed session
        output = {
            "user_id": key,
            "session_start": session["start"],
            "event_count": len(session["events"]),
            "total_value": session["total"]
        }
        state.remove()
        yield output
    else:
        # Update state
        state.update(session)

# Apply stateful operation
sessions = events \
    .groupByKey(lambda e: e.user_id) \
    .flatMapGroupsWithState(
        session_aggregation,
        outputMode="append",
        stateTimeout=GroupStateTimeout.ProcessingTimeTimeout()
    )
```

---

## Exactly-Once Semantics

### Producer Idempotence

```python
import json

from kafka import KafkaProducer

# Enable idempotent producer.
# NOTE: requires a client version that supports idempotence
# (recent kafka-python, or confluent-kafka via 'enable.idempotence').
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    acks='all',                    # Wait for all replicas
    enable_idempotence=True,       # Exactly-once per partition
    max_in_flight_requests_per_connection=5,  # Max allowed with idempotence
    retries=2147483647,            # Effectively infinite retries
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Producer will deduplicate based on sequence numbers
for i in range(100):
    producer.send('topic', {'id': i, 'data': 'value'})

producer.flush()
```

### Transactional Processing

```python
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka.structs import OffsetAndMetadata

# Transactional producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    transactional_id='my-transactional-id',  # Enable transactions
    enable_idempotence=True,
    acks='all'
)

producer.init_transactions()

def process_with_transactions(consumer, producer):
    """Read-process-write with exactly-once semantics."""

    try:
        producer.begin_transaction()

        # Read
        records = consumer.poll(timeout_ms=1000)

        for tp, messages in records.items():
            for message in messages:
                # Process
                result = transform(message.value)

                # Write to output topic
                producer.send('output-topic', result)

        # Commit offsets and transaction atomically.
        # Offsets must be a {TopicPartition: OffsetAndMetadata} map, and the
        # group id must match the consumer's configured group. (The exact
        # OffsetAndMetadata fields vary across kafka-python versions.)
        offsets = {
            tp: OffsetAndMetadata(consumer.position(tp), None)
            for tp in consumer.assignment()
        }
        producer.send_offsets_to_transaction(offsets, 'my-consumer-group')
        producer.commit_transaction()

    except KafkaError as e:
        producer.abort_transaction()
        raise
```

### Spark Exactly-Once to External Systems

```python
# Use foreachBatch with idempotent writes
# (execute_sql is a placeholder for your warehouse/DB client)
from pyspark.sql.functions import lit

def write_to_database_idempotent(batch_df, batch_id):
    """Write batch with exactly-once semantics."""

    # Add batch_id for deduplication
    batch_with_id = batch_df.withColumn("batch_id", lit(batch_id))

    # Use MERGE for idempotent writes
    batch_with_id.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost/db") \
        .option("dbtable", "staging_events") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

    # Merge staging to final (idempotent).
    # NOTE: "UPDATE SET * / INSERT *" is Delta/Databricks shorthand; plain
    # PostgreSQL needs explicit column lists in the MERGE.
    execute_sql("""
        MERGE INTO events AS target
        USING staging_events AS source
        ON target.event_id = source.event_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

    # Clean staging
    execute_sql("TRUNCATE staging_events")

query = events.writeStream \
    .foreachBatch(write_to_database_idempotent) \
    .option("checkpointLocation", "/checkpoints/to-postgres") \
    .start()
```

---

## Error Handling

### Dead Letter Queue (DLQ)

```python
import json
from datetime import datetime

from kafka import KafkaProducer

MAX_RETRIES = 3

class DeadLetterQueue:
    """Handle failed records with dead letter queue pattern."""

    def __init__(self, dlq_topic: str, producer: KafkaProducer):
        self.dlq_topic = dlq_topic
        self.producer = producer

    def send_to_dlq(self, record, error: Exception, context: dict):
        """Send failed record to DLQ with error metadata."""

        dlq_record = {
            "original_record": record,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.utcnow().isoformat(),
            "context": context,
            "retry_count": context.get("retry_count", 0)
        }

        self.producer.send(
            self.dlq_topic,
            value=json.dumps(dlq_record).encode('utf-8')
        )

def process_with_dlq(consumer, processor, dlq):
    """Process records with DLQ for failures."""

    for message in consumer:
        try:
            result = processor.process(message.value)
            # Success - commit offset
            consumer.commit()

        except ValidationError as e:
            # Non-retryable - send to DLQ immediately
            dlq.send_to_dlq(
                message.value,
                e,
                {"topic": message.topic, "partition": message.partition}
            )
            consumer.commit()  # Don't retry

        except TemporaryError as e:
            # Retryable - don't commit, let consumer retry.
            # After MAX_RETRIES, send to DLQ.
            # kafka-python exposes headers as a list of (key, bytes) tuples.
            headers = dict(message.headers or [])
            retry_count = int(headers.get("retry_count", b"0"))
            if retry_count >= MAX_RETRIES:
                dlq.send_to_dlq(message.value, e, {"retry_count": retry_count})
                consumer.commit()
            else:
                raise  # Will be retried
```

### Circuit Breaker

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
import threading

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitBreaker:
    """Circuit breaker for external service calls."""

    failure_threshold: int = 5
    recovery_timeout: timedelta = timedelta(seconds=30)
    success_threshold: int = 3

    def __post_init__(self):
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""

        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
            self._record_success()
            return result

        except Exception as e:
            self._record_failure()
            raise

    def _record_success(self):
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    self.success_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0

    def _record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.OPEN
                self.success_count = 0
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _should_attempt_reset(self):
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time >= self.recovery_timeout

# Usage
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=timedelta(seconds=60))

def call_external_api(data):
    return circuit.call(external_api.process, data)
```

---

## Data Ingestion Patterns

### Change Data Capture (CDC)

Debezium connector configuration registered with Kafka Connect (`connector-config.json`):

```json
{
    "name": "postgres-cdc-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "password",
        "database.dbname": "source_db",
        "database.server.name": "source",
        "table.include.list": "public.orders,public.customers",
        "plugin.name": "pgoutput",
        "publication.name": "dbz_publication",
        "slot.name": "debezium_slot",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
```

**Processing CDC Events:**

```python
def process_cdc_event(event):
    """Process Debezium CDC event."""

    operation = event.get("op")

    if operation == "c":  # Create (INSERT)
        after = event.get("after")
        return {"action": "insert", "data": after}

    elif operation == "u":  # Update
        before = event.get("before")
        after = event.get("after")
        return {"action": "update", "before": before, "after": after}

    elif operation == "d":  # Delete
        before = event.get("before")
        return {"action": "delete", "data": before}

    elif operation == "r":  # Read (snapshot)
        after = event.get("after")
        return {"action": "snapshot", "data": after}
```
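
A sketch of driving this parser from the Debezium topic; the topic name follows Debezium's `<server>.<schema>.<table>` convention, and `apply_action` stands in for a sink-side upsert/delete helper you would write:

```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "source.public.orders",            # <server>.<schema>.<table>
    bootstrap_servers=["localhost:9092"],
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
    group_id="cdc-orders-sink",
    enable_auto_commit=False,
)

for message in consumer:
    if message.value is None:
        continue                        # Tombstone after a delete
    action = process_cdc_event(message.value)
    if action is not None:
        apply_action(action)            # Hypothetical target-store writer
    consumer.commit()
```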

### Bulk Ingestion

```python
# Efficient bulk loading to data warehouse
import uuid

import boto3

class BulkIngester:
    """Bulk ingest data to Snowflake via S3."""

    def __init__(self, s3_bucket: str, snowflake_conn):
        self.s3 = boto3.client('s3')
        self.bucket = s3_bucket
        self.snowflake = snowflake_conn

    def ingest_dataframe(self, df, table_name: str, mode: str = "append"):
        """Bulk ingest DataFrame to Snowflake."""

        # 1. Write to S3 as Parquet (compressed, columnar)
        s3_path = f"s3://{self.bucket}/staging/{table_name}/{uuid.uuid4()}"
        df.write.parquet(s3_path)

        # 2. Create external stage if not exists
        self.snowflake.execute(f"""
            CREATE STAGE IF NOT EXISTS {table_name}_stage
            URL = '{s3_path}'
            CREDENTIALS = (AWS_KEY_ID='...' AWS_SECRET_KEY='...')
            FILE_FORMAT = (TYPE = 'PARQUET')
        """)

        # 3. COPY INTO (much faster than INSERT)
        if mode == "overwrite":
            self.snowflake.execute(f"TRUNCATE TABLE {table_name}")

        self.snowflake.execute(f"""
            COPY INTO {table_name}
            FROM @{table_name}_stage
            FILE_FORMAT = (TYPE = 'PARQUET')
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            ON_ERROR = 'CONTINUE'
        """)

        # 4. Cleanup staging files
        self._cleanup_s3(s3_path)
```

---

## Orchestration

### Dependency Management

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
from datetime import timedelta

with DAG("complex_pipeline") as dag:

    # Wait for upstream DAG
    wait_for_source = ExternalTaskSensor(
        task_id="wait_for_source_etl",
        external_dag_id="source_etl_dag",
        external_task_id="final_task",
        execution_delta=timedelta(hours=0),
        timeout=3600,
        mode="poke",
        poke_interval=60,
    )

    # Parallel extraction group
    with TaskGroup("extract") as extract_group:
        extract_orders = PythonOperator(
            task_id="extract_orders",
            python_callable=extract_orders_func,
        )
        extract_customers = PythonOperator(
            task_id="extract_customers",
            python_callable=extract_customers_func,
        )
        extract_products = PythonOperator(
            task_id="extract_products",
            python_callable=extract_products_func,
        )

    # Sequential transformation
    with TaskGroup("transform") as transform_group:
        join_data = PythonOperator(
            task_id="join_data",
            python_callable=join_func,
        )
        aggregate = PythonOperator(
            task_id="aggregate",
            python_callable=aggregate_func,
        )
        join_data >> aggregate

    # Load
    load = PythonOperator(
        task_id="load",
        python_callable=load_func,
    )

    # Define dependencies
    wait_for_source >> extract_group >> transform_group >> load
```

### Dynamic DAG Generation

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import yaml

def create_etl_dag(config: dict) -> DAG:
    """Factory function to create ETL DAGs from config."""

    dag = DAG(
        dag_id=f"etl_{config['source']}_{config['destination']}",
        schedule_interval=config.get('schedule', '@daily'),
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'auto-generated'],
    )

    with dag:
        extract = PythonOperator(
            task_id='extract',
            python_callable=create_extract_func(config['source']),
        )

        transform = PythonOperator(
            task_id='transform',
            python_callable=create_transform_func(config['transformations']),
        )

        load = PythonOperator(
            task_id='load',
            python_callable=create_load_func(config['destination']),
        )

        extract >> transform >> load

    return dag

# Load configurations
with open('/config/etl_pipelines.yaml') as f:
    configs = yaml.safe_load(f)

# Generate DAGs
for config in configs['pipelines']:
    dag_id = f"etl_{config['source']}_{config['destination']}"
    globals()[dag_id] = create_etl_dag(config)
```
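
One possible shape for `/config/etl_pipelines.yaml` that the factory above would consume; the sources, destinations, and transformation names are placeholders:

```yaml
pipelines:
  - source: salesforce
    destination: warehouse
    schedule: "@hourly"
    transformations: [dedupe, normalize_dates]

  - source: stripe
    destination: warehouse
    # schedule omitted -> factory default of @daily
    transformations: [flatten_json]
```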

```

### references/data_modeling_patterns.md

```markdown
# Data Modeling Patterns

Comprehensive guide to data modeling for analytics and data warehousing.

## Table of Contents

1. [Dimensional Modeling](#dimensional-modeling)
2. [Slowly Changing Dimensions](#slowly-changing-dimensions)
3. [Data Vault Modeling](#data-vault-modeling)
4. [dbt Best Practices](#dbt-best-practices)
5. [Partitioning and Clustering](#partitioning-and-clustering)
6. [Schema Evolution](#schema-evolution)

---

## Dimensional Modeling

### Star Schema

The most common pattern for analytical data models. One fact table surrounded by dimension tables.

```
                    ┌─────────────┐
                    │ dim_product │
                    └──────┬──────┘
                           │
┌─────────────┐    ┌───────▼───────┐    ┌─────────────┐
│ dim_customer│◄───│   fct_sales   │───►│  dim_date   │
└─────────────┘    └───────┬───────┘    └─────────────┘
                           │
                    ┌──────▼──────┐
                    │  dim_store  │
                    └─────────────┘
```

**Fact Table (fct_sales):**

```sql
CREATE TABLE fct_sales (
    sale_id BIGINT PRIMARY KEY,

    -- Foreign keys to dimensions
    customer_key INT REFERENCES dim_customer(customer_key),
    product_key INT REFERENCES dim_product(product_key),
    store_key INT REFERENCES dim_store(store_key),
    date_key INT REFERENCES dim_date(date_key),

    -- Degenerate dimension (no separate table)
    order_number VARCHAR(50),

    -- Measures (facts)
    quantity INT,
    unit_price DECIMAL(10,2),
    discount_amount DECIMAL(10,2),
    net_amount DECIMAL(10,2),
    tax_amount DECIMAL(10,2),
    total_amount DECIMAL(10,2),

    -- Audit columns
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Partition by date for query performance.
-- NOTE: in most engines (e.g. PostgreSQL) declarative partitioning must be
-- specified at CREATE TABLE time; syntax varies by warehouse:
-- CREATE TABLE fct_sales (...) PARTITION BY RANGE (date_key);
```

**Dimension Table (dim_customer):**

```sql
CREATE TABLE dim_customer (
    customer_key INT PRIMARY KEY,  -- Surrogate key
    customer_id VARCHAR(50),       -- Natural/business key

    -- Attributes
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    email VARCHAR(255),
    phone VARCHAR(50),

    -- Hierarchies
    city VARCHAR(100),
    state VARCHAR(100),
    country VARCHAR(100),
    region VARCHAR(50),

    -- SCD tracking
    effective_date DATE,
    expiration_date DATE,
    is_current BOOLEAN,

    -- Audit
    created_at TIMESTAMP,
    updated_at TIMESTAMP
);
```

**Date Dimension:**

```sql
CREATE TABLE dim_date (
    date_key INT PRIMARY KEY,      -- YYYYMMDD format
    full_date DATE,

    -- Day attributes
    day_of_week INT,
    day_of_month INT,
    day_of_year INT,
    day_name VARCHAR(10),
    is_weekend BOOLEAN,
    is_holiday BOOLEAN,

    -- Week attributes
    week_of_year INT,
    week_start_date DATE,
    week_end_date DATE,

    -- Month attributes
    month_number INT,
    month_name VARCHAR(10),
    month_start_date DATE,
    month_end_date DATE,

    -- Quarter attributes
    quarter_number INT,
    quarter_name VARCHAR(10),

    -- Year attributes
    year_number INT,
    fiscal_year INT,
    fiscal_quarter INT,

    -- Relative flags
    is_current_day BOOLEAN,
    is_current_week BOOLEAN,
    is_current_month BOOLEAN,
    is_current_quarter BOOLEAN,
    is_current_year BOOLEAN
);

-- Generate date dimension
INSERT INTO dim_date
SELECT
    TO_CHAR(d, 'YYYYMMDD')::INT as date_key,
    d as full_date,
    EXTRACT(DOW FROM d) as day_of_week,
    EXTRACT(DAY FROM d) as day_of_month,
    EXTRACT(DOY FROM d) as day_of_year,
    TO_CHAR(d, 'Day') as day_name,
    EXTRACT(DOW FROM d) IN (0, 6) as is_weekend,
    FALSE as is_holiday,  -- Update from holiday calendar
    EXTRACT(WEEK FROM d) as week_of_year,
    DATE_TRUNC('week', d) as week_start_date,
    DATE_TRUNC('week', d) + INTERVAL '6 days' as week_end_date,
    EXTRACT(MONTH FROM d) as month_number,
    TO_CHAR(d, 'Month') as month_name,
    DATE_TRUNC('month', d) as month_start_date,
    (DATE_TRUNC('month', d) + INTERVAL '1 month' - INTERVAL '1 day')::DATE as month_end_date,
    EXTRACT(QUARTER FROM d) as quarter_number,
    'Q' || EXTRACT(QUARTER FROM d) as quarter_name,
    EXTRACT(YEAR FROM d) as year_number,
    -- Fiscal year (assuming July start)
    CASE WHEN EXTRACT(MONTH FROM d) >= 7 THEN EXTRACT(YEAR FROM d) + 1
         ELSE EXTRACT(YEAR FROM d) END as fiscal_year,
    CASE WHEN EXTRACT(MONTH FROM d) >= 7 THEN CEIL((EXTRACT(MONTH FROM d) - 6) / 3.0)
         ELSE CEIL((EXTRACT(MONTH FROM d) + 6) / 3.0) END as fiscal_quarter,
    d = CURRENT_DATE as is_current_day,
    d >= DATE_TRUNC('week', CURRENT_DATE) AND d < DATE_TRUNC('week', CURRENT_DATE) + INTERVAL '7 days' as is_current_week,
    DATE_TRUNC('month', d) = DATE_TRUNC('month', CURRENT_DATE) as is_current_month,
    DATE_TRUNC('quarter', d) = DATE_TRUNC('quarter', CURRENT_DATE) as is_current_quarter,
    EXTRACT(YEAR FROM d) = EXTRACT(YEAR FROM CURRENT_DATE) as is_current_year
FROM generate_series('2020-01-01'::DATE, '2030-12-31'::DATE, '1 day'::INTERVAL) d;
```

### Snowflake Schema

Normalized dimensions for reduced storage and update anomalies.

```
┌─────────────┐    ┌───────────────┐    ┌─────────────┐    ┌─────────────┐
│ dim_customer│◄───│   fct_sales   │───►│ dim_product │───►│ dim_category│
└──────┬──────┘    └───────┬───────┘    └──────┬──────┘    └─────────────┘
       │                   │                   │
┌──────▼──────┐    ┌───────▼───────┐    ┌──────▼──────┐
│dim_geography│    │   dim_date    │    │  dim_brand  │
└─────────────┘    └───────────────┘    └─────────────┘
```

**When to use Snowflake vs Star:**

| Criteria | Star Schema | Snowflake Schema |
|----------|-------------|------------------|
| Query complexity | Simple JOINs | More JOINs required |
| Query performance | Faster (fewer JOINs) | Slower |
| Storage | Higher (denormalized) | Lower (normalized) |
| ETL complexity | Higher | Lower |
| Dimension updates | Multiple places | Single place |
| Best for | BI/reporting | Storage-constrained |
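
The difference in practice, sketched as the same question against both schemas (column and key names like `category_key` are illustrative):

```sql
-- Star: category is denormalized onto dim_product (one JOIN)
SELECT p.category, SUM(s.total_amount) AS revenue
FROM fct_sales s
JOIN dim_product p ON s.product_key = p.product_key
GROUP BY p.category;

-- Snowflake: category is normalized into its own table (extra JOIN)
SELECT c.category_name, SUM(s.total_amount) AS revenue
FROM fct_sales s
JOIN dim_product p ON s.product_key = p.product_key
JOIN dim_category c ON p.category_key = c.category_key
GROUP BY c.category_name;
```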

### One Big Table (OBT)

Fully denormalized single table - gaining popularity with modern columnar warehouses.

```sql
CREATE TABLE obt_sales AS
SELECT
    -- Fact measures
    s.sale_id,
    s.quantity,
    s.unit_price,
    s.total_amount,

    -- Customer attributes (denormalized)
    c.customer_id,
    c.first_name,
    c.last_name,
    c.email,
    c.city,
    c.state,
    c.country,

    -- Product attributes (denormalized)
    p.product_id,
    p.product_name,
    p.category,
    p.subcategory,
    p.brand,

    -- Date attributes (denormalized)
    d.full_date as sale_date,
    d.year_number,
    d.quarter_number,
    d.month_name,
    d.week_of_year,
    d.is_weekend

FROM fct_sales s
JOIN dim_customer c ON s.customer_key = c.customer_key AND c.is_current
JOIN dim_product p ON s.product_key = p.product_key AND p.is_current
JOIN dim_date d ON s.date_key = d.date_key;
```

**OBT Tradeoffs:**

| Pros | Cons |
|------|------|
| Simple queries (no JOINs) | Storage bloat |
| Fast for analytics | Harder to maintain |
| Great with columnar storage | Stale data risk |
| Self-documenting | Update anomalies |
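
The "simple queries" upside in practice, using the `obt_sales` table built above:

```sql
-- Monthly revenue by category straight off the wide table, no JOINs
SELECT
    year_number,
    month_name,
    category,
    SUM(total_amount) AS revenue
FROM obt_sales
GROUP BY year_number, month_name, category;
```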

---

## Slowly Changing Dimensions

### Type 0: Fixed Dimension

No changes allowed - original value preserved forever.

```sql
-- Type 0: Never update these fields
CREATE TABLE dim_customer_type0 (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),
    original_signup_date DATE,  -- Never changes
    original_source VARCHAR(50) -- Never changes
);
```

### Type 1: Overwrite

Simply overwrite old value with new. No history preserved.

```sql
-- Type 1: Update in place
UPDATE dim_customer
SET
    email = 'new.email@example.com',
    updated_at = CURRENT_TIMESTAMP
WHERE customer_id = 'CUST001';

-- dbt implementation (Type 1)
-- models/dim_customer_type1.sql
{{
    config(
        materialized='table',
        unique_key='customer_id'
    )
}}

SELECT
    customer_id,
    first_name,
    last_name,
    email,  -- Current value only
    phone,
    address,
    CURRENT_TIMESTAMP as updated_at
FROM {{ source('raw', 'customers') }}
```

### Type 2: Add New Row

Create new record with new values. Full history preserved.

```sql
-- Type 2 dimension structure
CREATE TABLE dim_customer_scd2 (
    customer_key SERIAL PRIMARY KEY,    -- Surrogate key
    customer_id VARCHAR(50),            -- Natural key
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    email VARCHAR(255),
    city VARCHAR(100),
    state VARCHAR(100),

    -- SCD2 tracking columns
    effective_start_date TIMESTAMP,
    effective_end_date TIMESTAMP,
    is_current BOOLEAN,

    -- Hash for change detection
    row_hash VARCHAR(64)
);

-- SCD2 merge logic
MERGE INTO dim_customer_scd2 AS target
USING (
    SELECT
        customer_id,
        first_name,
        last_name,
        email,
        city,
        state,
        MD5(CONCAT(first_name, last_name, email, city, state)) as row_hash
    FROM staging_customers
) AS source
ON target.customer_id = source.customer_id AND target.is_current = TRUE

-- Close existing record if changed
WHEN MATCHED AND target.row_hash != source.row_hash THEN
    UPDATE SET
        effective_end_date = CURRENT_TIMESTAMP,
        is_current = FALSE

-- Insert new record for new and changed customers.
-- NOTE: illustrative syntax. ANSI MERGE cannot close and re-insert the same
-- source row in one pass, so engines typically require the source to be
-- expanded with a UNION, or a separate INSERT to run after the MERGE.
WHEN NOT MATCHED THEN
    INSERT (customer_id, first_name, last_name, email, city, state,
            effective_start_date, effective_end_date, is_current, row_hash)
    VALUES (source.customer_id, source.first_name, source.last_name, source.email,
            source.city, source.state, CURRENT_TIMESTAMP, '9999-12-31', TRUE, source.row_hash);
```

**dbt SCD2 Implementation:**

```sql
-- models/dim_customer_scd2.sql
-- NOTE: dbt's native SCD2 mechanism is a snapshot (strategy='check' with
-- check_cols belongs in a snapshot block, not an incremental model config).
-- The incremental model below hand-rolls the same idea and only inserts new
-- versions; closing superseded rows needs a separate update step or post-hook.
{{
    config(
        materialized='incremental',
        unique_key='customer_key'
    )
}}

WITH source_data AS (
    SELECT
        customer_id,
        first_name,
        last_name,
        email,
        city,
        state,
        MD5(CONCAT_WS('|', first_name, last_name, email, city, state)) as row_hash,
        CURRENT_TIMESTAMP as extracted_at
    FROM {{ source('raw', 'customers') }}
)

{% if is_incremental() %}
-- Get current records that have changed
-- (the leading comma lives inside the if-block so the non-incremental
-- compile does not end the CTE list with a dangling comma)
, changed_records AS (
    SELECT
        s.*,
        t.customer_key as existing_key
    FROM source_data s
    LEFT JOIN {{ this }} t
        ON s.customer_id = t.customer_id
        AND t.is_current = TRUE
    WHERE t.customer_key IS NULL  -- New record
       OR t.row_hash != s.row_hash  -- Changed record
)
{% endif %}

SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id', 'extracted_at']) }} as customer_key,
    customer_id,
    first_name,
    last_name,
    email,
    city,
    state,
    extracted_at as effective_start_date,
    CAST('9999-12-31' AS TIMESTAMP) as effective_end_date,
    TRUE as is_current,
    row_hash
{% if is_incremental() %}
FROM changed_records
{% else %}
FROM source_data
{% endif %}
```
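
What the preserved history buys you is the as-of join: facts resolve against the dimension version that was current when they occurred. A sketch, assuming an `fct_orders` table with an `order_created_at` timestamp:

```sql
-- Which city was each customer in when the order was placed?
SELECT
    o.order_id,
    c.customer_id,
    c.city AS city_at_order_time
FROM fct_orders o
JOIN dim_customer_scd2 c
    ON o.customer_id = c.customer_id
   AND o.order_created_at >= c.effective_start_date
   AND o.order_created_at <  c.effective_end_date;
```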

### Type 3: Add New Column

Add column for previous value. Limited history (usually just prior value).

```sql
-- Type 3: Previous value column
CREATE TABLE dim_customer_scd3 (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),
    city VARCHAR(100),
    previous_city VARCHAR(100),  -- Previous value
    city_change_date DATE,
    state VARCHAR(100),
    previous_state VARCHAR(100),
    state_change_date DATE
);

-- Update Type 3
UPDATE dim_customer_scd3
SET
    previous_city = city,
    city = 'New York',
    city_change_date = CURRENT_DATE
WHERE customer_id = 'CUST001';
```

### Type 4: Mini-Dimension

Separate rapidly changing attributes into a mini-dimension.

```sql
-- Main customer dimension (slowly changing)
CREATE TABLE dim_customer (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    email VARCHAR(255)
);

-- Mini-dimension for rapidly changing attributes
CREATE TABLE dim_customer_profile (
    profile_key INT PRIMARY KEY,
    age_band VARCHAR(20),      -- '18-24', '25-34', etc.
    income_band VARCHAR(20),   -- 'Low', 'Medium', 'High'
    loyalty_tier VARCHAR(20)   -- 'Bronze', 'Silver', 'Gold'
);

-- Fact table references both
CREATE TABLE fct_sales (
    sale_id BIGINT PRIMARY KEY,
    customer_key INT REFERENCES dim_customer,
    profile_key INT REFERENCES dim_customer_profile,  -- Current profile at time of sale
    ...
);
```
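
Because the fact row freezes the `profile_key` that was current at sale time, historical analysis by a fast-changing attribute needs no SCD2 machinery (the `net_amount` measure is assumed from the fact table above):

```sql
SELECT
    p.loyalty_tier,
    COUNT(*)          AS sales,
    SUM(f.net_amount) AS revenue
FROM fct_sales f
JOIN dim_customer_profile p ON f.profile_key = p.profile_key
GROUP BY p.loyalty_tier;
```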

### Type 6: Hybrid (1 + 2 + 3)

Combines Types 1, 2, and 3 for maximum flexibility.

```sql
-- Type 6: Combined approach
CREATE TABLE dim_customer_scd6 (
    customer_key INT PRIMARY KEY,
    customer_id VARCHAR(50),

    -- Current values (Type 1 - always updated)
    current_city VARCHAR(100),
    current_state VARCHAR(100),

    -- Historical values (Type 2 - row versioned)
    historical_city VARCHAR(100),
    historical_state VARCHAR(100),

    -- Previous values (Type 3)
    previous_city VARCHAR(100),

    -- SCD2 tracking
    effective_start_date TIMESTAMP,
    effective_end_date TIMESTAMP,
    is_current BOOLEAN
);
```
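
A sketch of the maintenance flow when an attribute changes (say CUST001 moves): first close the current row and insert a new version exactly as in Type 2, then propagate the Type 1 column across every historical row so old versions always carry the latest value alongside their history:

```sql
-- After the Type 2 close-and-insert, sync the Type 1 column on all versions
UPDATE dim_customer_scd6
SET current_city = 'New York'
WHERE customer_id = 'CUST001';
```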

---

## Data Vault Modeling

### Core Concepts

Data Vault provides:
- Full historization
- Parallel loading
- Flexibility for changing business rules
- Auditability

```
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Hub_Customer │◄───│Link_Customer │───►│  Hub_Order   │
│              │    │    _Order    │    │              │
└──────┬───────┘    └──────────────┘    └──────┬───────┘
       │                                       │
       ▼                                       ▼
┌──────────────┐                        ┌──────────────┐
│ Sat_Customer │                        │  Sat_Order   │
│   _Details   │                        │   _Details   │
└──────────────┘                        └──────────────┘
```

### Hub Tables

Business keys and surrogate keys only.

```sql
-- Hub: Business entity identifier
CREATE TABLE hub_customer (
    hub_customer_key VARCHAR(64) PRIMARY KEY,  -- Hash of business key
    customer_id VARCHAR(50),                    -- Business key
    load_date TIMESTAMP,
    record_source VARCHAR(100)
);

-- Hub loading (idempotent insert)
INSERT INTO hub_customer (hub_customer_key, customer_id, load_date, record_source)
SELECT
    MD5(customer_id) as hub_customer_key,
    customer_id,
    CURRENT_TIMESTAMP as load_date,
    'SOURCE_CRM' as record_source
FROM staging_customers s
WHERE NOT EXISTS (
    SELECT 1 FROM hub_customer h
    WHERE h.customer_id = s.customer_id
);
```

### Satellite Tables

Descriptive attributes with full history.

```sql
-- Satellite: Attributes with history
CREATE TABLE sat_customer_details (
    hub_customer_key VARCHAR(64),
    load_date TIMESTAMP,
    load_end_date TIMESTAMP,

    -- Descriptive attributes
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    email VARCHAR(255),
    phone VARCHAR(50),

    -- Change detection
    hash_diff VARCHAR(64),
    record_source VARCHAR(100),

    PRIMARY KEY (hub_customer_key, load_date),
    FOREIGN KEY (hub_customer_key) REFERENCES hub_customer
);

-- Satellite loading (delta detection)
INSERT INTO sat_customer_details
SELECT
    MD5(s.customer_id) as hub_customer_key,
    CURRENT_TIMESTAMP as load_date,
    NULL as load_end_date,
    s.first_name,
    s.last_name,
    s.email,
    s.phone,
    MD5(CONCAT_WS('|', s.first_name, s.last_name, s.email, s.phone)) as hash_diff,
    'SOURCE_CRM' as record_source
FROM staging_customers s
LEFT JOIN sat_customer_details sat
    ON MD5(s.customer_id) = sat.hub_customer_key
    AND sat.load_end_date IS NULL
WHERE sat.hub_customer_key IS NULL  -- New customer
   OR sat.hash_diff != MD5(CONCAT_WS('|', s.first_name, s.last_name, s.email, s.phone));  -- Changed

-- Close previous satellite records, but only for keys that received a new
-- version above (closing rows for unchanged customers would leave them with
-- no current record). Relies on CURRENT_TIMESTAMP being stable within the
-- transaction, as in PostgreSQL.
UPDATE sat_customer_details
SET load_end_date = CURRENT_TIMESTAMP
WHERE hub_customer_key IN (
    SELECT hub_customer_key FROM sat_customer_details
    WHERE load_date = CURRENT_TIMESTAMP
)
AND load_end_date IS NULL
AND load_date < CURRENT_TIMESTAMP;
```

### Link Tables

Relationships between hubs.

```sql
-- Link: Relationship between entities
CREATE TABLE link_customer_order (
    link_customer_order_key VARCHAR(64) PRIMARY KEY,
    hub_customer_key VARCHAR(64),
    hub_order_key VARCHAR(64),
    load_date TIMESTAMP,
    record_source VARCHAR(100),

    FOREIGN KEY (hub_customer_key) REFERENCES hub_customer,
    FOREIGN KEY (hub_order_key) REFERENCES hub_order
);

-- Link loading
INSERT INTO link_customer_order
SELECT
    MD5(CONCAT(s.customer_id, '|', s.order_id)) as link_customer_order_key,
    MD5(s.customer_id) as hub_customer_key,
    MD5(s.order_id) as hub_order_key,
    CURRENT_TIMESTAMP as load_date,
    'SOURCE_ORDERS' as record_source
FROM staging_orders s
WHERE NOT EXISTS (
    SELECT 1 FROM link_customer_order l
    WHERE l.hub_customer_key = MD5(s.customer_id)
      AND l.hub_order_key = MD5(s.order_id)
);
```
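
With hubs, satellites, and links loaded, downstream marts are typically rebuilt as views; a minimal current-state sketch over the tables above:

```sql
-- Current customer attributes: hub for identity, open satellite row for detail
CREATE VIEW current_customer AS
SELECT
    h.customer_id,
    s.first_name,
    s.last_name,
    s.email,
    s.phone
FROM hub_customer h
JOIN sat_customer_details s
    ON s.hub_customer_key = h.hub_customer_key
WHERE s.load_end_date IS NULL;
```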

---

## dbt Best Practices

### Model Organization

```
models/
├── staging/           # 1:1 with source tables
│   ├── stg_orders.sql
│   ├── stg_customers.sql
│   └── _staging.yml
├── intermediate/      # Business logic transformations
│   ├── int_orders_enriched.sql
│   └── _intermediate.yml
└── marts/             # Business-facing models
    ├── core/
    │   ├── dim_customers.sql
    │   ├── fct_orders.sql
    │   └── _core.yml
    └── marketing/
        ├── mrt_customer_segments.sql
        └── _marketing.yml
```

### Staging Models

```sql
-- models/staging/stg_orders.sql
{{
    config(
        materialized='view'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('ecommerce', 'orders') }}
),

renamed AS (
    SELECT
        -- Primary key
        id as order_id,

        -- Foreign keys
        customer_id,
        product_id,

        -- Timestamps
        created_at as order_created_at,
        updated_at as order_updated_at,

        -- Measures
        quantity,
        CAST(unit_price AS DECIMAL(10,2)) as unit_price,
        CAST(discount AS DECIMAL(5,2)) as discount_percent,

        -- Status
        UPPER(status) as order_status

    FROM source
)

SELECT * FROM renamed
```

### Intermediate Models

```sql
-- models/intermediate/int_orders_enriched.sql
{{
    config(
        materialized='ephemeral'  -- Not persisted, just CTE
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

products AS (
    SELECT * FROM {{ ref('stg_products') }}
),

enriched AS (
    SELECT
        o.order_id,
        o.order_created_at,
        o.order_status,

        -- Customer info
        c.customer_id,
        c.customer_name,
        c.customer_segment,

        -- Product info
        p.product_id,
        p.product_name,
        p.category,

        -- Calculated fields
        o.quantity,
        o.unit_price,
        o.quantity * o.unit_price as gross_amount,
        o.quantity * o.unit_price * (1 - COALESCE(o.discount_percent, 0) / 100) as net_amount

    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN products p ON o.product_id = p.product_id
)

SELECT * FROM enriched
```

### Incremental Models

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        on_schema_change='sync_all_columns',
        cluster_by=['order_date']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}

    {% if is_incremental() %}
    -- Only process new/changed records
    WHERE order_updated_at > (
        SELECT COALESCE(MAX(order_updated_at), '1900-01-01')
        FROM {{ this }}
    )
    {% endif %}
),

final AS (
    SELECT
        order_id,
        customer_id,
        product_id,
        DATE(order_created_at) as order_date,
        order_created_at,
        order_updated_at,
        order_status,
        quantity,
        unit_price,
        gross_amount,
        net_amount,
        CURRENT_TIMESTAMP as _loaded_at
    FROM orders
)

SELECT * FROM final
```

### Testing

```yaml
# models/marts/_core.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: net_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              inclusive: true

      - name: order_date
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
```

### Macros

```sql
-- macros/generate_surrogate_key.sql
{% macro generate_surrogate_key(columns) %}
    {{ dbt_utils.generate_surrogate_key(columns) }}
{% endmacro %}

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
    ROUND({{ column_name }} / 100.0, 2)
{% endmacro %}

-- macros/safe_divide.sql
{% macro safe_divide(numerator, denominator, default=0) %}
    CASE
        WHEN {{ denominator }} = 0 OR {{ denominator }} IS NULL THEN {{ default }}
        ELSE {{ numerator }} / {{ denominator }}
    END
{% endmacro %}

-- Usage in models:
-- {{ safe_divide('revenue', 'orders') }} as avg_order_value
```

---

## Partitioning and Clustering

### Partitioning Strategies

**Time-based Partitioning (Most Common):**

```sql
-- BigQuery
CREATE TABLE fct_events
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_type
AS SELECT * FROM raw_events;

-- Snowflake (automatic micro-partitioning)
-- Explicit clustering for optimization
ALTER TABLE fct_events CLUSTER BY (event_date, user_id);

-- Spark/Delta Lake
df.write \
    .format("delta") \
    .partitionBy("event_date") \
    .save("/path/to/table")
```

**Partition Pruning:**

```sql
-- Query with partition filter (fast)
SELECT * FROM fct_events
WHERE event_date = '2024-01-15';  -- Scans only 1 partition

-- Query without partition filter (slow - full scan)
SELECT * FROM fct_events
WHERE user_id = '12345';  -- Scans all partitions
```

**Partition Size Guidelines:**

| Partition | Size Target | Notes |
|-----------|-------------|-------|
| Daily | 1-10 GB | Ideal for most cases |
| Hourly | 100 MB - 1 GB | High-volume streaming |
| Monthly | 10-100 GB | Infrequent access |

### Clustering

```sql
-- BigQuery clustering (up to 4 columns)
CREATE TABLE fct_sales
PARTITION BY DATE(sale_date)
CLUSTER BY customer_id, product_id
AS SELECT * FROM raw_sales;

-- Snowflake clustering
CREATE TABLE fct_sales (
    sale_id INT,
    customer_id VARCHAR(50),
    product_id VARCHAR(50),
    sale_date DATE,
    amount DECIMAL(10,2)
)
CLUSTER BY (customer_id, sale_date);

-- Delta Lake Z-ordering
OPTIMIZE events ZORDER BY (user_id, event_type);
```

**When to Cluster:**

| Column Type | Cluster? | Notes |
|-------------|----------|-------|
| High cardinality filter columns | Yes | customer_id, product_id |
| Join keys | Yes | Improves join performance |
| Low cardinality | Maybe | status, type (limited benefit) |
| Frequently updated | No | Clustering breaks on updates |

---

## Schema Evolution

### Adding Columns

```sql
-- Safe: Add nullable column
ALTER TABLE fct_orders ADD COLUMN discount_amount DECIMAL(10,2);

-- With default
ALTER TABLE fct_orders ADD COLUMN currency VARCHAR(3) DEFAULT 'USD';

-- dbt handling
{{
    config(
        materialized='incremental',
        on_schema_change='append_new_columns'
    )
}}
```

### Handling in Spark/Delta

```python
# Delta Lake schema evolution
df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/path/to/table")

# Explicit schema enforcement
spark.sql("""
    ALTER TABLE delta.`/path/to/table`
    ADD COLUMNS (new_column STRING)
""")

# Schema merge on read
df = spark.read \
    .option("mergeSchema", "true") \
    .format("delta") \
    .load("/path/to/table")
```

### Backward Compatibility

```sql
-- Create view for backward compatibility
CREATE VIEW orders_v1 AS
SELECT
    order_id,
    customer_id,
    amount,
    -- Map new columns to old schema
    COALESCE(discount_amount, 0) as discount,
    COALESCE(currency, 'USD') as currency
FROM orders_v2;

-- Deprecation pattern
CREATE VIEW orders_deprecated AS
SELECT * FROM orders_v1;
-- Add comment: "DEPRECATED: Use orders_v2. Will be removed 2024-06-01"
```

### Data Contracts for Schema Changes

```yaml
# contracts/orders_contract.yaml
name: orders
version: "2.0.0"
owner: data-team@example.com

schema:
  order_id:
    type: string
    required: true
    breaking_change: never

  customer_id:
    type: string
    required: true
    breaking_change: never

  amount:
    type: decimal
    precision: 10
    scale: 2
    required: true

  # New in v2.0.0
  discount_amount:
    type: decimal
    precision: 10
    scale: 2
    required: false
    added_in: "2.0.0"
    default: 0

  # Deprecated in v2.0.0
  legacy_status:
    type: string
    deprecated: true
    removed_in: "3.0.0"
    migration: "Use order_status instead"

compatibility:
  backward: true   # v2 readers can read v1 data
  forward: true    # v1 readers can read v2 data
```
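
A minimal sketch of enforcing that contract at ingestion time (the loader and its loose checks are illustrative, not a full contract engine):

```python
import yaml

def validate_against_contract(records: list[dict], contract_path: str) -> list[str]:
    """Return a list of violations; an empty list means the batch conforms."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    # Required, non-deprecated fields from the contract's schema section
    required = [
        name for name, spec in contract["schema"].items()
        if spec.get("required") and not spec.get("deprecated")
    ]

    violations = []
    for i, record in enumerate(records):
        for field in required:
            if record.get(field) is None:
                violations.append(f"record {i}: missing required field '{field}'")
    return violations

# Usage:
# errors = validate_against_contract(batch, "contracts/orders_contract.yaml")
# if errors:
#     raise ValueError(f"Contract violations: {errors}")
```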

```

### references/dataops_best_practices.md

```markdown
# DataOps Best Practices

Comprehensive guide to DataOps practices for production data systems.

## Table of Contents

1. [Data Testing Frameworks](#data-testing-frameworks)
2. [Data Contracts](#data-contracts)
3. [CI/CD for Data Pipelines](#cicd-for-data-pipelines)
4. [Observability and Lineage](#observability-and-lineage)
5. [Incident Response](#incident-response)
6. [Cost Optimization](#cost-optimization)

---

## Data Testing Frameworks

### Great Expectations

```python
# great_expectations_suite.py
# NOTE: the Great Expectations API has changed substantially across releases;
# the calls below follow the pre-1.0 context/validator style and may need
# adjusting for your installed version.
import great_expectations as gx
from great_expectations.core.batch import BatchRequest

# Initialize context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Get validator
validator = context.get_validator(
    batch_request=BatchRequest(
        datasource_name="warehouse",
        data_asset_name="orders",
    ),
    expectation_suite_name="orders_suite"
)

# Schema expectations
validator.expect_table_columns_to_match_set(
    column_set=["order_id", "customer_id", "amount", "created_at", "status"],
    exact_match=True
)

# Completeness expectations
validator.expect_column_values_to_not_be_null(
    column="order_id",
    mostly=1.0  # 100% must be non-null
)

validator.expect_column_values_to_not_be_null(
    column="customer_id",
    mostly=0.99  # 99% must be non-null
)

# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")

# Type expectations
validator.expect_column_values_to_be_of_type("amount", "FLOAT")
validator.expect_column_values_to_be_of_type("created_at", "TIMESTAMP")

# Range expectations
validator.expect_column_values_to_be_between(
    column="amount",
    min_value=0,
    max_value=1000000,
    mostly=0.999
)

# Categorical expectations
validator.expect_column_values_to_be_in_set(
    column="status",
    value_set=["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Distribution expectations
validator.expect_column_mean_to_be_between(
    column="amount",
    min_value=50,
    max_value=500
)

# Freshness expectations
validator.expect_column_max_to_be_between(
    column="created_at",
    min_value={"$PARAMETER": "now() - interval '24 hours'"},
    max_value={"$PARAMETER": "now()"}
)

# Cross-table expectations (referential integrity)
validator.expect_column_pair_values_to_be_in_set(
    column_A="customer_id",
    column_B="customer_status",
    value_pairs_set=[
        ("cust_001", "active"),
        ("cust_002", "active"),
        # ...
    ]
)

# Save suite
validator.save_expectation_suite(discard_failed_expectations=False)

# Run validation
checkpoint = context.add_or_update_checkpoint(
    name="orders_checkpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "warehouse",
                "data_asset_name": "orders",
            },
            "expectation_suite_name": "orders_suite",
        }
    ],
)

results = checkpoint.run()
print(f"Validation success: {results.success}")
```

### dbt Tests

```yaml
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "Order fact table with comprehensive testing"

    # Model-level tests
    tests:
      # Row count consistency
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')

      # Expression test
      - dbt_utils.expression_is_true:
          expression: "net_amount >= 0"

      # Recency test
      - dbt_utils.recency:
          datepart: hour
          field: _loaded_at
          interval: 24

    columns:
      - name: order_id
        description: "Primary key - unique order identifier"
        tests:
          - unique
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^ORD-[0-9]{10}$"

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn  # Don't fail, just warn

      - name: order_date
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: "'2020-01-01'"
              max_value: "current_date"

      - name: net_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true

      - name: quantity
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 1
              max_value: 1000
              row_condition: "status != 'cancelled'"

      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

      - name: email
        tests:
          - unique:
              where: "is_current = true"
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"

```

**Custom generic test** (`tests/generic/test_no_orphan_records.sql`):

```sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}
SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} NOT IN (
    SELECT {{ parent_column }}
    FROM {{ parent_model }}
)
{% endtest %}
```

### Custom Data Quality Checks

```python
# data_quality/quality_checks.py
from dataclasses import dataclass
from typing import List, Dict, Any, Callable
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

@dataclass
class QualityCheck:
    name: str
    description: str
    severity: str  # "critical", "warning", "info"
    check_func: Callable
    threshold: float = 1.0

@dataclass
class QualityResult:
    check_name: str
    passed: bool
    actual_value: float
    threshold: float
    message: str
    timestamp: datetime

class DataQualityValidator:
    """Comprehensive data quality validation framework."""

    def __init__(self, connection):
        self.conn = connection
        self.checks: List[QualityCheck] = []
        self.results: List[QualityResult] = []

    def add_check(self, check: QualityCheck):
        self.checks.append(check)

    # Built-in check generators
    def add_null_check(self, table: str, column: str, max_null_rate: float = 0.0):
        def check_nulls():
            query = f"""
                SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) as nulls
                FROM {table}
            """
            result = self.conn.execute(query).fetchone()
            null_rate = result[1] / result[0] if result[0] > 0 else 0
            return null_rate <= max_null_rate, null_rate

        self.add_check(QualityCheck(
            name=f"null_check_{table}_{column}",
            description=f"Check null rate for {table}.{column}",
            severity="critical" if max_null_rate == 0 else "warning",
            check_func=check_nulls,
            threshold=max_null_rate
        ))

    def add_uniqueness_check(self, table: str, column: str):
        def check_unique():
            query = f"""
                SELECT
                    COUNT(*) as total,
                    COUNT(DISTINCT {column}) as distinct_count
                FROM {table}
            """
            result = self.conn.execute(query).fetchone()
            is_unique = result[0] == result[1]
            duplicate_rate = 1 - (result[1] / result[0]) if result[0] > 0 else 0
            return is_unique, duplicate_rate

        self.add_check(QualityCheck(
            name=f"uniqueness_check_{table}_{column}",
            description=f"Check uniqueness for {table}.{column}",
            severity="critical",
            check_func=check_unique,
            threshold=0.0
        ))

    def add_freshness_check(self, table: str, timestamp_column: str, max_hours: int):
        def check_freshness():
            query = f"""
                SELECT MAX({timestamp_column}) as latest
                FROM {table}
            """
            result = self.conn.execute(query).fetchone()
            if result[0] is None:
                return False, float('inf')

            hours_old = (datetime.now() - result[0]).total_seconds() / 3600
            return hours_old <= max_hours, hours_old

        self.add_check(QualityCheck(
            name=f"freshness_check_{table}",
            description=f"Check data freshness for {table}",
            severity="critical",
            check_func=check_freshness,
            threshold=max_hours
        ))

    def add_range_check(self, table: str, column: str, min_val: float, max_val: float):
        def check_range():
            query = f"""
                SELECT
                    COUNT(*) as total,
                    SUM(CASE WHEN {column} < {min_val} OR {column} > {max_val} THEN 1 ELSE 0 END) as out_of_range
                FROM {table}
            """
            result = self.conn.execute(query).fetchone()
            violation_rate = result[1] / result[0] if result[0] > 0 else 0
            return violation_rate == 0, violation_rate

        self.add_check(QualityCheck(
            name=f"range_check_{table}_{column}",
            description=f"Check range [{min_val}, {max_val}] for {table}.{column}",
            severity="warning",
            check_func=check_range,
            threshold=0.0
        ))

    def add_referential_integrity_check(self, child_table: str, child_column: str,
                                        parent_table: str, parent_column: str):
        def check_referential():
            query = f"""
                SELECT COUNT(*)
                FROM {child_table} c
                LEFT JOIN {parent_table} p ON c.{child_column} = p.{parent_column}
                WHERE p.{parent_column} IS NULL AND c.{child_column} IS NOT NULL
            """
            result = self.conn.execute(query).fetchone()
            orphan_count = result[0]
            return orphan_count == 0, orphan_count

        self.add_check(QualityCheck(
            name=f"referential_integrity_{child_table}_{child_column}",
            description=f"Check FK {child_table}.{child_column} -> {parent_table}.{parent_column}",
            severity="warning",
            check_func=check_referential,
            threshold=0
        ))

    def run_all_checks(self) -> Dict[str, Any]:
        """Execute all quality checks and return results."""
        self.results = []

        for check in self.checks:
            try:
                passed, actual_value = check.check_func()
                result = QualityResult(
                    check_name=check.name,
                    passed=passed,
                    actual_value=actual_value,
                    threshold=check.threshold,
                    message=f"{'PASSED' if passed else 'FAILED'}: {check.description}",
                    timestamp=datetime.now()
                )
            except Exception as e:
                result = QualityResult(
                    check_name=check.name,
                    passed=False,
                    actual_value=-1,
                    threshold=check.threshold,
                    message=f"ERROR: {str(e)}",
                    timestamp=datetime.now()
                )

            self.results.append(result)
            logger.info(result.message)

        # Summary
        total = len(self.results)
        passed = sum(1 for r in self.results if r.passed)
        failed = total - passed

        critical_failures = [
            r for r, c in zip(self.results, self.checks)
            if not r.passed and c.severity == "critical"
        ]

        return {
            "total_checks": total,
            "passed": passed,
            "failed": failed,
            "success_rate": passed / total if total > 0 else 0,
            "critical_failures": len(critical_failures),
            "results": self.results,
            "overall_passed": len(critical_failures) == 0
        }
```
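
A minimal usage sketch for the validator above (the `connection` object, table, and thresholds are illustrative; any DB-API style connection with `execute().fetchone()` works):

```python
# Hypothetical wiring: connection, table, and column names are illustrative
from data_quality.quality_checks import DataQualityValidator

validator = DataQualityValidator(connection)
validator.add_null_check("analytics.fct_orders", "order_id", max_null_rate=0.0)
validator.add_uniqueness_check("analytics.fct_orders", "order_id")
validator.add_freshness_check("analytics.fct_orders", "created_at", max_hours=24)
validator.add_range_check("analytics.fct_orders", "net_amount", 0, 1_000_000)

summary = validator.run_all_checks()
if not summary["overall_passed"]:
    # Fail the pipeline run when any critical check fails
    raise RuntimeError(
        f"{summary['critical_failures']} critical data quality check(s) failed"
    )
```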

---

## Data Contracts

### Contract Definition

```yaml
# contracts/orders_v2.yaml
contract:
  name: orders
  version: "2.0.0"
  owner: [email protected]
  team: Data Engineering
  slack_channel: "#data-platform-alerts"

description: |
  Order events from the e-commerce platform.
  Contains all customer orders with line items.

schema:
  type: object
  required:
    - order_id
    - customer_id
    - created_at
    - total_amount
  properties:
    order_id:
      type: string
      format: uuid
      description: "Unique order identifier"
      pii: false
      breaking_change: never

    customer_id:
      type: string
      description: "Customer identifier (foreign key)"
      pii: true
      retention_days: 365

    created_at:
      type: timestamp
      format: "ISO8601"
      timezone: "UTC"
      description: "Order creation timestamp"

    total_amount:
      type: decimal
      precision: 10
      scale: 2
      minimum: 0
      description: "Total order amount in USD"

    status:
      type: string
      enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
      default: "pending"

    line_items:
      type: array
      items:
        type: object
        properties:
          product_id:
            type: string
          quantity:
            type: integer
            minimum: 1
          unit_price:
            type: decimal

# Quality SLAs
quality:
  freshness:
    max_delay_minutes: 60
    check_frequency: "*/15 * * * *"  # Every 15 minutes

  completeness:
    required_fields_null_rate: 0.0
    optional_fields_null_rate: 0.05

  uniqueness:
    order_id: true
    combination: ["order_id", "line_item_id"]

  validity:
    total_amount:
      min: 0
      max: 1000000
    status:
      allowed_values: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

  volume:
    min_daily_records: 1000
    max_daily_records: 1000000
    anomaly_threshold: 0.5  # 50% deviation from average

# Semantic versioning rules
versioning:
  breaking_changes:
    - removing_required_field
    - changing_field_type
    - renaming_field
  non_breaking_changes:
    - adding_optional_field
    - adding_enum_value
    - changing_description

# Consumers
consumers:
  - name: analytics-dashboard
    team: Analytics
    contact: [email protected]
    usage: "Daily KPI dashboards"
    required_fields: ["order_id", "customer_id", "total_amount", "created_at"]

  - name: ml-churn-prediction
    team: ML Platform
    contact: [email protected]
    usage: "Customer churn prediction model"
    required_fields: ["customer_id", "created_at", "total_amount"]

  - name: finance-reporting
    team: Finance
    contact: [email protected]
    usage: "Revenue reconciliation"
    required_fields: ["order_id", "total_amount", "status"]

# Change management
change_process:
  notification_lead_time_days: 14
  approval_required_from:
    - data-platform-lead
    - affected-consumer-teams
  rollback_plan_required: true
```
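
The versioning rules above can be enforced mechanically before a new contract version ships. A minimal sketch, assuming contracts are stored as YAML files with the layout shown above (the helper and its diff logic are illustrative, not part of the contract spec):

```python
# contracts/compat_check.py (illustrative helper)
import yaml

def find_breaking_changes(old_path: str, new_path: str) -> list:
    """Diff two contract files against the breaking-change rules above."""
    with open(old_path) as f:
        old = yaml.safe_load(f)['schema']
    with open(new_path) as f:
        new = yaml.safe_load(f)['schema']

    breaking = []

    # removing_required_field: previously required field gone or made optional
    for field in old.get('required', []):
        if field not in new.get('required', []):
            breaking.append(f"required field removed or made optional: {field}")

    # changing_field_type / renaming_field (a rename looks like a removal here)
    for name, spec in old.get('properties', {}).items():
        new_spec = new.get('properties', {}).get(name)
        if new_spec is None:
            breaking.append(f"field removed or renamed: {name}")
        elif new_spec.get('type') != spec.get('type'):
            breaking.append(
                f"type changed for {name}: {spec.get('type')} -> {new_spec.get('type')}"
            )

    return breaking
```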

### Contract Validation

```python
# contracts/validator.py
import yaml
import json
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from datetime import datetime
import jsonschema

@dataclass
class ContractValidationResult:
    contract_name: str
    version: str
    timestamp: datetime
    passed: bool
    schema_valid: bool
    quality_checks_passed: bool
    sla_checks_passed: bool
    violations: List[Dict[str, Any]]

class ContractValidator:
    """Validate data against contract definitions."""

    def __init__(self, contract_path: str):
        with open(contract_path) as f:
            self.contract = yaml.safe_load(f)

        self.contract_name = self.contract['contract']['name']
        self.version = self.contract['contract']['version']

    def validate_schema(self, data: List[Dict]) -> List[Dict]:
        """Validate data against JSON schema.

        Note: contract types such as ``timestamp`` and ``decimal`` are not
        standard JSON Schema types; map them to ``string``/``number`` (or use
        ``format`` annotations) before handing the schema to jsonschema.
        """
        violations = []
        schema = self.contract['schema']

        for i, record in enumerate(data):
            try:
                jsonschema.validate(record, schema)
            except jsonschema.ValidationError as e:
                violations.append({
                    "type": "schema_violation",
                    "record_index": i,
                    "field": e.path[0] if e.path else None,
                    "message": e.message
                })

        return violations

    def validate_quality_slas(self, connection, table_name: str) -> List[Dict]:
        """Validate quality SLAs."""
        violations = []
        quality = self.contract.get('quality', {})

        # Freshness check
        if 'freshness' in quality:
            max_delay = quality['freshness']['max_delay_minutes']
            query = f"SELECT MAX(created_at) FROM {table_name}"
            result = connection.execute(query).fetchone()
            if result[0]:
                age_minutes = (datetime.now() - result[0]).total_seconds() / 60
                if age_minutes > max_delay:
                    violations.append({
                        "type": "freshness_violation",
                        "sla": f"max_delay_minutes: {max_delay}",
                        "actual": f"{age_minutes:.0f} minutes old",
                        "severity": "critical"
                    })

        # Completeness check
        if 'completeness' in quality:
            for field in self.contract['schema'].get('required', []):
                query = f"""
                    SELECT
                        COUNT(*) as total,
                        SUM(CASE WHEN {field} IS NULL THEN 1 ELSE 0 END) as nulls
                    FROM {table_name}
                """
                result = connection.execute(query).fetchone()
                null_rate = result[1] / result[0] if result[0] > 0 else 0
                max_rate = quality['completeness']['required_fields_null_rate']
                if null_rate > max_rate:
                    violations.append({
                        "type": "completeness_violation",
                        "field": field,
                        "sla": f"null_rate <= {max_rate}",
                        "actual": f"null_rate = {null_rate:.4f}",
                        "severity": "critical"
                    })

        # Uniqueness check
        if 'uniqueness' in quality:
            for field, should_be_unique in quality['uniqueness'].items():
                if field == 'combination':
                    continue
                if should_be_unique:
                    query = f"""
                        SELECT COUNT(*) - COUNT(DISTINCT {field})
                        FROM {table_name}
                    """
                    result = connection.execute(query).fetchone()
                    if result[0] > 0:
                        violations.append({
                            "type": "uniqueness_violation",
                            "field": field,
                            "duplicates": result[0],
                            "severity": "critical"
                        })

        # Volume check
        if 'volume' in quality:
            query = f"SELECT COUNT(*) FROM {table_name} WHERE DATE(created_at) = CURRENT_DATE"
            result = connection.execute(query).fetchone()
            daily_count = result[0]

            if daily_count < quality['volume']['min_daily_records']:
                violations.append({
                    "type": "volume_violation",
                    "sla": f"min_daily_records: {quality['volume']['min_daily_records']}",
                    "actual": daily_count,
                    "severity": "warning"
                })

        return violations

    def validate(self, connection, table_name: str, sample_data: List[Dict] = None) -> ContractValidationResult:
        """Run full contract validation."""
        violations = []

        # Schema validation (on sample data)
        schema_violations = []
        if sample_data:
            schema_violations = self.validate_schema(sample_data)
            violations.extend(schema_violations)

        # Quality SLA validation
        quality_violations = self.validate_quality_slas(connection, table_name)
        violations.extend(quality_violations)

        return ContractValidationResult(
            contract_name=self.contract_name,
            version=self.version,
            timestamp=datetime.now(),
            passed=len([v for v in violations if v.get('severity') == 'critical']) == 0,
            schema_valid=len(schema_violations) == 0,
            quality_checks_passed=len([v for v in quality_violations if v.get('severity') == 'critical']) == 0,
            sla_checks_passed=True,  # Add SLA timing checks
            violations=violations
        )
```
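
A usage sketch for the validator (the connection, table name, and sample records are placeholders):

```python
# Hypothetical wiring: connection and table name are illustrative
validator = ContractValidator("contracts/orders_v2.yaml")

result = validator.validate(
    connection,
    table_name="analytics.orders",
    sample_data=[{
        "order_id": "7f9c0a4e-2f6b-4a1c-9a3e-1c2d3e4f5a6b",
        "customer_id": "cust_001",
        "created_at": "2024-01-15T10:00:00Z",
        "total_amount": 99.50,
    }],
)

if not result.passed:
    for v in result.violations:
        print(f"[{v.get('severity', 'info')}] {v['type']}: {v}")
```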

---

## CI/CD for Data Pipelines

### GitHub Actions Workflow

```yaml
# .github/workflows/data-pipeline-ci.yml
name: Data Pipeline CI/CD

on:
  push:
    branches: [main, develop]
    paths:
      - 'dbt/**'
      - 'airflow/**'
      - 'tests/**'
  pull_request:
    branches: [main]

env:
  DBT_PROFILES_DIR: ./dbt
  SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
  SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
  SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install sqlfluff dbt-core dbt-snowflake

      - name: Lint SQL
        run: |
          sqlfluff lint dbt/models --dialect snowflake

      - name: Lint dbt project
        run: |
          cd dbt && dbt deps && dbt compile

  test:
    runs-on: ubuntu-latest
    needs: lint
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install dbt-core dbt-snowflake pytest great-expectations

      - name: Run dbt tests on CI schema
        run: |
          cd dbt
          dbt deps
          dbt seed --target ci
          dbt run --target ci --select state:modified+
          dbt test --target ci --select state:modified+

      - name: Run data contract tests
        run: |
          pytest tests/contracts/ -v

      - name: Run Great Expectations validation
        run: |
          great_expectations checkpoint run ci_checkpoint

  deploy-staging:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/develop'
    environment: staging
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install dbt-core dbt-snowflake

      - name: Deploy to staging
        run: |
          cd dbt
          dbt deps
          dbt run --target staging
          dbt test --target staging

      - name: Run data quality checks
        run: |
          python scripts/run_quality_checks.py --env staging

  deploy-production:
    runs-on: ubuntu-latest
    needs: test
    if: github.ref == 'refs/heads/main'
    environment: production
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install dbt-core dbt-snowflake

      - name: Deploy to production
        run: |
          cd dbt
          dbt deps
          dbt run --target prod --full-refresh --select tag:full_refresh
          dbt run --target prod
          dbt test --target prod

      - name: Notify on success
        if: success()
        run: |
          curl -X POST ${{ secrets.SLACK_WEBHOOK }} \
            -H 'Content-type: application/json' \
            -d '{"text":"dbt production deployment successful!"}'

      - name: Notify on failure
        if: failure()
        run: |
          curl -X POST ${{ secrets.SLACK_WEBHOOK }} \
            -H 'Content-type: application/json' \
            -d '{"text":"dbt production deployment FAILED!"}'
```

### dbt CI Configuration

```yaml
# dbt_project.yml
name: 'analytics'
version: '1.0.0'

config-version: 2
profile: 'analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets: ["target", "dbt_packages"]

# Run hooks
on-run-start:
  - "{{ log('Starting dbt run', info=true) }}"

on-run-end:
  - "{{ log('dbt run complete', info=true) }}"

vars:
  # CI testing with limited data
  ci_limit: "{{ 1000 if target.name == 'ci' else none }}"

# Model configurations
models:
  analytics:
    staging:
      +materialized: view
      +schema: staging

    intermediate:
      +materialized: ephemeral

    marts:
      +materialized: table
      +schema: marts

      core:
        +tags: ['core', 'daily']

      marketing:
        +tags: ['marketing', 'daily']
```

### Slim CI with State Comparison

```bash
#!/bin/bash
# scripts/slim_ci.sh
set -e

# Download production manifest for state comparison
aws s3 cp s3://dbt-artifacts/prod/manifest.json ./target/prod_manifest.json

# Run only modified models and their downstream dependencies
dbt run \
  --target ci \
  --select state:modified+ \
  --state ./target/prod_manifest.json

# Test only affected models
dbt test \
  --target ci \
  --select state:modified+ \
  --state ./target/prod_manifest.json

# Upload CI artifacts
dbt docs generate
aws s3 sync ./target s3://dbt-artifacts/ci/${GITHUB_SHA}/
```

---

## Observability and Lineage

### Data Lineage with OpenLineage

```python
# lineage/openlineage_emitter.py
from openlineage.client import OpenLineageClient
from openlineage.client.run import Run, RunEvent, RunState, Job, Dataset
from openlineage.client.facet import (
    SchemaDatasetFacet,
    SchemaField,
    SqlJobFacet,
    DataQualityMetricsInputDatasetFacet,
    ErrorMessageRunFacet
)
from datetime import datetime
import uuid

class DataLineageEmitter:
    """Emit data lineage events to OpenLineage."""

    def __init__(self, api_url: str, namespace: str = "data-platform"):
        self.client = OpenLineageClient(url=api_url)
        self.namespace = namespace

    def emit_job_start(self, job_name: str, inputs: list, outputs: list,
                       sql: str = None) -> str:
        """Emit job start event."""
        run_id = str(uuid.uuid4())

        # Build input datasets
        input_datasets = [
            Dataset(
                namespace=self.namespace,
                name=inp['name'],
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name=f['name'], type=f['type'])
                            for f in inp.get('schema', [])
                        ]
                    )
                }
            )
            for inp in inputs
        ]

        # Build output datasets
        output_datasets = [
            Dataset(
                namespace=self.namespace,
                name=out['name'],
                facets={
                    "schema": SchemaDatasetFacet(
                        fields=[
                            SchemaField(name=f['name'], type=f['type'])
                            for f in out.get('schema', [])
                        ]
                    )
                }
            )
            for out in outputs
        ]

        # Build job facets
        job_facets = {}
        if sql:
            job_facets["sql"] = SqlJobFacet(query=sql)

        # Create and emit event
        event = RunEvent(
            eventType=RunState.START,
            eventTime=datetime.utcnow().isoformat() + "Z",
            run=Run(runId=run_id),
            job=Job(namespace=self.namespace, name=job_name, facets=job_facets),
            inputs=input_datasets,
            outputs=output_datasets
        )

        self.client.emit(event)
        return run_id

    def emit_job_complete(self, job_name: str, run_id: str,
                          output_name: str = None, output_metrics: dict = None):
        """Emit job completion event, optionally attaching output metrics."""
        outputs = []
        if output_name and output_metrics:
            outputs = [Dataset(
                namespace=self.namespace,
                name=output_name,
                facets={
                    "dataQuality": DataQualityMetricsInputDatasetFacet(
                        rowCount=output_metrics.get('row_count'),
                        bytes=output_metrics.get('bytes')
                    )
                }
            )]

        event = RunEvent(
            eventType=RunState.COMPLETE,
            eventTime=datetime.utcnow().isoformat() + "Z",
            run=Run(runId=run_id),
            job=Job(namespace=self.namespace, name=job_name),
            inputs=[],
            outputs=outputs
        )

        self.client.emit(event)

    def emit_job_fail(self, job_name: str, run_id: str, error_message: str):
        """Emit job failure event."""
        event = RunEvent(
            eventType=RunState.FAIL,
            eventTime=datetime.utcnow().isoformat() + "Z",
            run=Run(runId=run_id, facets={
                "errorMessage": ErrorMessageRunFacet(
                    message=error_message,
                    programmingLanguage="python"
                )
            }),
            job=Job(namespace=self.namespace, name=job_name),
            inputs=[],
            outputs=[]
        )

        self.client.emit(event)


# Usage example
emitter = DataLineageEmitter("http://marquez:5000/api/v1/lineage")

run_id = emitter.emit_job_start(
    job_name="transform_orders",
    inputs=[
        {"name": "raw.orders", "schema": [
            {"name": "id", "type": "string"},
            {"name": "amount", "type": "decimal"}
        ]}
    ],
    outputs=[
        {"name": "analytics.fct_orders", "schema": [
            {"name": "order_id", "type": "string"},
            {"name": "net_amount", "type": "decimal"}
        ]}
    ],
    sql="SELECT id as order_id, amount as net_amount FROM raw.orders"
)

# After job completes
emitter.emit_job_complete(
    job_name="transform_orders",
    run_id=run_id,
    output_name="analytics.fct_orders",
    output_metrics={"row_count": 1500000, "bytes": 125000000}
)
```

### Pipeline Monitoring with Prometheus

```python
# monitoring/metrics.py
from prometheus_client import Counter, Gauge, Histogram, start_http_server
from functools import wraps
import time

# Define metrics
PIPELINE_RUNS = Counter(
    'pipeline_runs_total',
    'Total number of pipeline runs',
    ['pipeline_name', 'status']
)

PIPELINE_DURATION = Histogram(
    'pipeline_duration_seconds',
    'Pipeline execution duration',
    ['pipeline_name'],
    buckets=[60, 300, 600, 1800, 3600, 7200]
)

ROWS_PROCESSED = Counter(
    'rows_processed_total',
    'Total rows processed by pipeline',
    ['pipeline_name', 'table_name']
)

DATA_FRESHNESS = Gauge(
    'data_freshness_hours',
    'Hours since last data update',
    ['table_name']
)

DATA_QUALITY_SCORE = Gauge(
    'data_quality_score',
    'Data quality score (0-1)',
    ['table_name', 'check_type']
)

def track_pipeline(pipeline_name: str):
    """Decorator to track pipeline execution."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                PIPELINE_RUNS.labels(pipeline_name=pipeline_name, status='success').inc()
                return result
            except Exception:
                PIPELINE_RUNS.labels(pipeline_name=pipeline_name, status='failure').inc()
                raise
            finally:
                duration = time.time() - start_time
                PIPELINE_DURATION.labels(pipeline_name=pipeline_name).observe(duration)
        return wrapper
    return decorator

def record_rows_processed(pipeline_name: str, table_name: str, row_count: int):
    """Record number of rows processed."""
    ROWS_PROCESSED.labels(pipeline_name=pipeline_name, table_name=table_name).inc(row_count)

def update_freshness(table_name: str, hours_since_update: float):
    """Update data freshness metric."""
    DATA_FRESHNESS.labels(table_name=table_name).set(hours_since_update)

def update_quality_score(table_name: str, check_type: str, score: float):
    """Update data quality score."""
    DATA_QUALITY_SCORE.labels(table_name=table_name, check_type=check_type).set(score)

# Start metrics server and keep the process alive for Prometheus to scrape
if __name__ == '__main__':
    start_http_server(8000)
    while True:
        time.sleep(60)
```
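
A short usage sketch for these helpers (the pipeline function and its numbers are stand-ins):

```python
# Hypothetical pipeline wired up with the metrics helpers above
@track_pipeline("daily_orders_etl")
def run_daily_orders_etl():
    # ... extract, transform, load ...
    rows_loaded = 150_000  # illustrative count
    record_rows_processed("daily_orders_etl", "fct_orders", rows_loaded)
    update_freshness("fct_orders", hours_since_update=0.5)
    update_quality_score("fct_orders", "completeness", 0.999)

run_daily_orders_etl()
```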

### Alerting Configuration

```yaml
# alerting/prometheus_rules.yml
groups:
  - name: data_quality_alerts
    rules:
      - alert: DataFreshnessAlert
        expr: data_freshness_hours > 24
        for: 15m
        labels:
          severity: critical
          team: data-platform
        annotations:
          summary: "Data freshness SLA violated"
          description: "Table {{ $labels.table_name }} has not been updated for {{ $value }} hours"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.95
        for: 10m
        labels:
          severity: warning
          team: data-platform
        annotations:
          summary: "Data quality below threshold"
          description: "Table {{ $labels.table_name }} quality score is {{ $value }}"

      - alert: PipelineFailure
        expr: increase(pipeline_runs_total{status="failure"}[1h]) > 0
        for: 5m
        labels:
          severity: critical
          team: data-platform
        annotations:
          summary: "Pipeline failure detected"
          description: "Pipeline {{ $labels.pipeline_name }} has failed"

      - alert: PipelineSlowdown
        expr: histogram_quantile(0.95, sum by (le, pipeline_name) (rate(pipeline_duration_seconds_bucket[1h]))) > 3600
        for: 30m
        labels:
          severity: warning
          team: data-platform
        annotations:
          summary: "Pipeline execution time degraded"
          description: "Pipeline {{ $labels.pipeline_name }} p95 duration is {{ $value }} seconds"

      - alert: LowRowCount
        expr: increase(rows_processed_total[24h]) < 1000
        for: 1h
        labels:
          severity: warning
          team: data-platform
        annotations:
          summary: "Unusually low row count"
          description: "Pipeline {{ $labels.pipeline_name }} processed only {{ $value }} rows in 24h"
```

---

## Incident Response

### Runbook Template

````markdown
# Incident Runbook: Data Pipeline Failure

## Overview
This runbook covers procedures for handling data pipeline failures.

## Severity Levels
- **P1 (Critical)**: Data older than 24 hours, revenue-impacting
- **P2 (High)**: Data older than 4 hours, customer-facing dashboards affected
- **P3 (Medium)**: Data older than 1 hour, internal reports delayed
- **P4 (Low)**: Non-critical pipeline, no business impact

## Initial Response (First 15 minutes)

### 1. Acknowledge the Alert
```bash
# Acknowledge in PagerDuty (REST API v2: update the incident status)
curl -X PUT "https://api.pagerduty.com/incidents/{incident_id}" \
  -H "Authorization: Token token=$PAGERDUTY_TOKEN" \
  -H "From: $PAGERDUTY_USER_EMAIL" \
  -H "Content-Type: application/json" \
  -d '{"incident": {"type": "incident_reference", "status": "acknowledged"}}'

# Post in #data-incidents Slack channel
```

### 2. Assess Impact
- Which tables are affected?
- Which downstream consumers are impacted?
- What is the data freshness currently?

```sql
-- Check data freshness (Snowflake: information_schema.tables exposes last_altered)
SELECT
    table_name,
    last_altered AS last_update,
    DATEDIFF(hour, last_altered, CURRENT_TIMESTAMP) AS hours_stale
FROM information_schema.tables
WHERE table_schema = 'ANALYTICS'
ORDER BY hours_stale DESC;
```

### 3. Identify Root Cause

#### Check Pipeline Status
```bash
# Airflow
airflow dags list-runs -d <dag_id> --state failed

# dbt
dbt debug
dbt retry  # or: dbt run --select result:error+ --state target/

# Spark
spark-submit --status <application_id>
```

#### Common Failure Modes

| Symptom | Likely Cause | Fix |
|---------|--------------|-----|
| OOM errors | Data volume spike | Increase memory, add partitioning |
| Timeout | Slow query | Optimize query, check locks |
| Connection refused | Network/auth | Check credentials, VPC rules |
| Schema mismatch | Source change | Update schema, add contract |
| Duplicate key | Upstream bug | Deduplicate, fix source |

## Resolution Procedures

### Restart Failed Pipeline
```bash
# Clear failed Airflow task
airflow tasks clear <dag_id> -t <task_id> -s <start_date> -e <end_date>

# Rerun dbt model
dbt run --select <model_name>+

# Resubmit Spark job
spark-submit --deploy-mode cluster <job.py>
```

### Backfill Missing Data
```bash
# Airflow backfill
airflow dags backfill -s 2024-01-01 -e 2024-01-02 <dag_id>

# dbt incremental refresh
dbt run --full-refresh --select <model_name>
```

### Rollback Procedure
```bash
# dbt rollback (use previous version)
git checkout <previous_sha> -- models/<model>.sql
dbt run --select <model_name>

# Delta Lake time travel
spark.sql("""
    RESTORE TABLE analytics.orders TO VERSION AS OF 10
""")
```

## Post-Incident

### 1. Write Incident Report
- Timeline of events
- Root cause analysis
- Impact assessment
- Remediation steps taken
- Follow-up action items

### 2. Update Monitoring
- Add missing alerts
- Adjust thresholds
- Improve documentation

### 3. Share Learnings
- Post in #data-engineering
- Update runbooks
- Schedule blameless postmortem if P1/P2
````

---

## Cost Optimization

### Query Cost Analysis

```sql
-- Snowflake query cost analysis
SELECT
    query_id,
    user_name,
    warehouse_name,
    execution_time / 1000 as execution_seconds,
    bytes_scanned / 1e9 as gb_scanned,
    credits_used_cloud_services,
    query_text
FROM snowflake.account_usage.query_history
WHERE start_time > DATEADD(day, -7, CURRENT_TIMESTAMP)
ORDER BY credits_used_cloud_services DESC
LIMIT 20;

-- BigQuery cost analysis
SELECT
    user_email,
    query,
    total_bytes_processed / 1e12 as tb_processed,
    total_bytes_processed / 1e12 * 5 as estimated_cost_usd,  -- $5/TB
    creation_time
FROM `project.region-us.INFORMATION_SCHEMA.JOBS_BY_USER`
WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY total_bytes_processed DESC
LIMIT 20;
```

### Cost Optimization Strategies

```python
# cost/optimizer.py
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class CostRecommendation:
    category: str
    current_cost: float
    potential_savings: float
    recommendation: str
    priority: str

class CostOptimizer:
    """Analyze and optimize data platform costs."""

    def __init__(self, connection):
        self.conn = connection

    def analyze_query_costs(self) -> List[CostRecommendation]:
        """Identify expensive queries and optimization opportunities."""
        recommendations = []

        # Find queries scanning full tables
        full_scans = self.conn.execute("""
            SELECT
                query_text,
                COUNT(*) as execution_count,
                AVG(bytes_scanned) as avg_bytes,
                SUM(credits_used) as total_credits
            FROM query_history
            WHERE bytes_scanned > 1e10  -- > 10GB
              AND start_time > DATEADD(day, -7, CURRENT_TIMESTAMP)
            GROUP BY query_text
            HAVING COUNT(*) > 10
            ORDER BY total_credits DESC
        """).fetchall()

        for query, count, avg_bytes, credits in full_scans:
            recommendations.append(CostRecommendation(
                category="Query Optimization",
                current_cost=credits,
                potential_savings=credits * 0.7,  # Estimate 70% savings
                recommendation=f"Add WHERE clause or partitioning to reduce scan. Query runs {count}x/week, scans {avg_bytes/1e9:.1f}GB each time.",
                priority="high"
            ))

        return recommendations

    def analyze_storage_costs(self) -> List[CostRecommendation]:
        """Identify storage optimization opportunities."""
        recommendations = []

        # Find large unused tables
        unused_tables = self.conn.execute("""
            SELECT
                table_name,
                bytes / 1e9 as size_gb,
                last_accessed
            FROM table_metadata
            WHERE last_accessed < DATEADD(day, -90, CURRENT_TIMESTAMP)
              AND bytes > 1e9  -- > 1GB
            ORDER BY bytes DESC
        """).fetchall()

        for table, size, last_accessed in unused_tables:
            monthly_cost = size * 0.023  # $0.023/GB/month for S3
            recommendations.append(CostRecommendation(
                category="Storage",
                current_cost=monthly_cost,
                potential_savings=monthly_cost,
                recommendation=f"Table {table} ({size:.1f}GB) not accessed since {last_accessed}. Consider archiving or deleting.",
                priority="medium"
            ))

        # Find tables without partitioning
        unpartitioned = self.conn.execute("""
            SELECT table_name, bytes / 1e9 as size_gb
            FROM table_metadata
            WHERE partition_column IS NULL
              AND bytes > 10e9  -- > 10GB
        """).fetchall()

        for table, size in unpartitioned:
            recommendations.append(CostRecommendation(
                category="Storage",
                current_cost=0,
                potential_savings=size * 0.1,  # Estimate 10% query cost savings
                recommendation=f"Table {table} ({size:.1f}GB) is not partitioned. Add partitioning to reduce query costs.",
                priority="high"
            ))

        return recommendations

    def analyze_compute_costs(self) -> List[CostRecommendation]:
        """Identify compute optimization opportunities."""
        recommendations = []

        # Find oversized warehouses
        warehouse_util = self.conn.execute("""
            SELECT
                warehouse_name,
                warehouse_size,
                AVG(avg_running_queries) as avg_queries,
                AVG(credits_used) as avg_credits
            FROM warehouse_metering_history
            WHERE start_time > DATEADD(day, -7, CURRENT_TIMESTAMP)
            GROUP BY warehouse_name, warehouse_size
        """).fetchall()

        for wh, size, avg_queries, avg_credits in warehouse_util:
            if avg_queries < 1 and size not in ['X-Small', 'Small']:
                recommendations.append(CostRecommendation(
                    category="Compute",
                    current_cost=avg_credits * 7,  # Weekly
                    potential_savings=avg_credits * 7 * 0.5,
                    recommendation=f"Warehouse {wh} ({size}) has low utilization ({avg_queries:.1f} avg queries). Consider downsizing.",
                    priority="high"
                ))

        return recommendations

    def generate_report(self) -> Dict:
        """Generate comprehensive cost optimization report."""
        all_recommendations = (
            self.analyze_query_costs() +
            self.analyze_storage_costs() +
            self.analyze_compute_costs()
        )

        total_current = sum(r.current_cost for r in all_recommendations)
        total_savings = sum(r.potential_savings for r in all_recommendations)

        return {
            "total_current_monthly_cost": total_current,
            "total_potential_savings": total_savings,
            "savings_percentage": total_savings / total_current * 100 if total_current > 0 else 0,
            "recommendations": [
                {
                    "category": r.category,
                    "current_cost": r.current_cost,
                    "potential_savings": r.potential_savings,
                    "recommendation": r.recommendation,
                    "priority": r.priority
                }
                for r in sorted(all_recommendations, key=lambda x: -x.potential_savings)
            ]
        }
```
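
A usage sketch (the connection is a placeholder, and the metadata tables queried above are warehouse-specific):

```python
# Hypothetical wiring: connection is illustrative
optimizer = CostOptimizer(connection)
report = optimizer.generate_report()

print(f"Potential monthly savings: ${report['total_potential_savings']:.2f} "
      f"({report['savings_percentage']:.0f}%)")
for rec in report["recommendations"][:5]:
    print(f"[{rec['priority']}] {rec['category']}: {rec['recommendation']}")
```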




---

## Skill Companion Files

> Additional files collected from the skill directory layout.

### _meta.json

```json
{
  "owner": "alirezarezvani",
  "slug": "senior-data-engineer",
  "displayName": "Senior Data Engineer",
  "latest": {
    "version": "2.1.1",
    "publishedAt": 1773070300242,
    "commit": "https://github.com/openclaw/skills/commit/417254e42bd1187be42bafe0a87b0df096a2aa80"
  },
  "history": [
    {
      "version": "1.0.0",
      "publishedAt": 1770402432915,
      "commit": "https://github.com/openclaw/skills/commit/9676cc343528818d1dd07b8398ddb16e8fc72b60"
    }
  ]
}

```

### references/troubleshooting.md

````markdown
# senior-data-engineer reference

## Troubleshooting

### Pipeline Failures

**Symptom:** Airflow DAG fails with timeout
```
Task exceeded max execution time
```

**Solution:**
1. Check resource allocation
2. Profile slow operations
3. Add incremental processing
```python
# Increase timeout on the task
from datetime import timedelta

default_args = {
    'execution_timeout': timedelta(hours=2),
}
```

Or switch the source query to incremental loads:

```sql
WHERE updated_at > '{{ prev_ds }}'
```

---

**Symptom:** Spark job OOM
```
java.lang.OutOfMemoryError: Java heap space
```

**Solution:**
1. Increase executor memory
2. Reduce partition size
3. Use disk spill
```python
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.memory.fraction", "0.8")
```

---

**Symptom:** Kafka consumer lag increasing
```
Consumer lag: 1000000 messages
```

**Solution:**
1. Increase consumer parallelism
2. Optimize processing logic
3. Scale consumer group
```bash
# Add more partitions
kafka-topics.sh --alter \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 24
```

---

### Data Quality Issues

**Symptom:** Duplicate records appearing
```
Expected unique, found 150 duplicates
```

**Solution:**
1. Add deduplication logic
2. Use merge/upsert operations
```sql
-- dbt incremental with dedup
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY updated_at DESC
        ) as rn
    FROM {{ source('raw', 'orders') }}
) WHERE rn = 1
```

---

**Symptom:** Stale data in tables
```
Last update: 3 days ago
```

**Solution:**
1. Check upstream pipeline status
2. Verify source availability
3. Add freshness monitoring
```yaml
# dbt freshness check
sources:
  - name: "raw"
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _loaded_at
```

---

**Symptom:** Schema drift detected
```
Column 'new_field' not in expected schema
```

**Solution:**
1. Update data contract
2. Modify transformations
3. Communicate with producers
```python
# Handle schema evolution
df = spark.read.format("delta") \
    .option("mergeSchema", "true") \
    .load("/data/orders")
```

---

### Performance Issues

**Symptom:** Query takes hours
```
Query runtime: 4 hours (expected: 30 minutes)
```

**Solution:**
1. Check query plan
2. Add proper partitioning
3. Optimize joins
```sql
-- Before: Full table scan
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- After: Partition pruning
-- Table partitioned by order_date
SELECT * FROM orders WHERE order_date = '2024-01-15';

-- Add clustering for frequent filters
ALTER TABLE orders CLUSTER BY (customer_id);
```

---

**Symptom:** dbt model takes too long
```
Model fct_orders completed in 45 minutes
```

**Solution:**
1. Use incremental materialization
2. Reduce upstream dependencies
3. Pre-aggregate where possible
```sql
-- Convert to incremental
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        on_schema_change='sync_all_columns'
    )
}}

SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
```

````

### references/workflows.md

````markdown
# senior-data-engineer reference

## Workflows

### Workflow 1: Building a Batch ETL Pipeline

**Scenario:** Extract data from PostgreSQL, transform with dbt, load to Snowflake.

#### Step 1: Define Source Schema

```sql
-- Document source tables
SELECT
    table_name,
    column_name,
    data_type,
    is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
```

#### Step 2: Generate Extraction Config

```bash
python scripts/pipeline_orchestrator.py generate \
  --type airflow \
  --source postgres \
  --tables orders,customers,products \
  --mode incremental \
  --watermark updated_at \
  --output dags/extract_source.py
```

#### Step 3: Create dbt Models

```sql
-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),

renamed AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM source
    WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)
)

SELECT * FROM renamed
```

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        cluster_by=['order_date']
    )
}}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id

{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
```

#### Step 4: Configure Data Quality Tests

```yaml
# models/marts/schema.yml
version: 2

models:
  - name: "fct-orders"
    description: "Order fact table"
    columns:
      - name: "order-id"
        tests:
          - unique
          - not_null
      - name: "total-amount"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: "order-date"
        tests:
          - not_null
          - dbt_utils.recency:
              datepart: day
              field: order_date
              interval: 1
```

#### Step 5: Create Airflow DAG

```python
# dags/daily_etl.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['[email protected]'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from PostgreSQL to Snowflake',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:

    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )

    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
    )

    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )

    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',
    )

    extract >> transform >> test >> notify
```

#### Step 6: Validate Pipeline

```bash
# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders

# Validate data quality
python scripts/data_quality_validator.py validate \
  --table fct_orders \
  --checks all \
  --output reports/quality_report.json
```

---

### Workflow 2: Implementing Real-Time Streaming

**Scenario:** Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.

#### Step 1: Define Event Schema

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "UserEvent",
  "type": "object",
  "required": ["event_id", "user_id", "event_type", "timestamp"],
  "properties": {
    "event_id": {"type": "string", "format": "uuid"},
    "user_id": {"type": "string"},
    "event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
    "timestamp": {"type": "string", "format": "date-time"},
    "properties": {"type": "object"}
  }
}
```
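
Producers can validate events against this schema before publishing. A minimal sketch using the `jsonschema` package (the file path and event payload are illustrative):

```python
import json
import jsonschema

# Load the schema shown above (path is illustrative)
with open("schemas/user_event.json") as f:
    event_schema = json.load(f)

event = {
    "event_id": "7f9c0a4e-2f6b-4a1c-9a3e-1c2d3e4f5a6b",
    "user_id": "user_123",
    "event_type": "page_view",
    "timestamp": "2024-01-15T10:00:00Z",
    "properties": {"page": "/home"},
}

# Raises jsonschema.ValidationError on a bad event
# (note: "uuid"/"date-time" formats are only enforced with a FormatChecker)
jsonschema.validate(event, event_schema)
```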

#### Step 2: Create Kafka Topic

```bash
# Create topic with appropriate partitions
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic user-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

# Verify topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic user-events
```

#### Step 3: Implement Spark Streaming Job

```python
# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count,
    approx_count_distinct, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, MapType
)

# Initialize Spark
spark = SparkSession.builder \
    .appName("UserEventsProcessor") \
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
    .config("spark.sql.shuffle.partitions", "12") \
    .getOrCreate()

# Define schema
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", StringType(), False),
    StructField("properties", MapType(StringType(), StringType()), True)
])

# Read from Kafka
events_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON
parsed_df = events_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))

# Windowed aggregation
aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("event_type")
    ) \
    .agg(
        count("*").alias("event_count"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Write to Delta Lake
query = aggregated_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events-aggregated") \
    .option("path", "/data/lake/user_events_aggregated") \
    .trigger(processingTime="1 minute") \
    .start()

query.awaitTermination()
```

#### Step 4: Handle Late Data and Errors

```python
# Dead letter queue for failed records
import logging

from pyspark.sql.functions import current_timestamp, lit

logger = logging.getLogger(__name__)

def process_with_error_handling(batch_df, batch_id):
    try:
        # Attempt processing
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())

        # Write valid records
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")

        # Write invalid to DLQ
        if invalid_df.count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")

    except Exception as e:
        # Log error, alert, continue
        logger.error(f"Batch {batch_id} failed: {e}")
        raise

# Use foreachBatch for custom processing
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()
```

#### Step 5: Monitor Stream Health

```python
# monitoring/stream_metrics.py
from prometheus_client import Gauge, Counter, start_http_server

# Define metrics
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    'Total records processed',
    ['stream_name', 'status']
)

PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    'Current processing lag',
    ['stream_name']
)

BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    'Last batch processing duration',
    ['stream_name']
)

def emit_metrics(query):
    """Emit Prometheus metrics from a Structured Streaming query."""
    progress = query.lastProgress
    if progress:
        RECORDS_PROCESSED.labels(
            stream_name='user-events',
            status='success'
        ).inc(progress['numInputRows'])

        BATCH_DURATION.labels(stream_name='user-events').set(
            progress['durationMs']['triggerExecution'] / 1000  # ms -> seconds
        )

        for source in progress.get('sources', []):
            # Compare the committed endOffset with the broker's latest offset
            # to compute lag, then PROCESSING_LAG.labels(...).set(lag_seconds)
            end_offset = source.get('endOffset', {})
```
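
One way to wire this up, assuming `query` is the StreamingQuery started in Step 3 (the polling interval and port are arbitrary), is to poll progress from the driver:

```python
import time

# Expose metrics and poll the streaming query's progress periodically
start_http_server(8000)
while query.isActive:
    emit_metrics(query)
    time.sleep(30)
```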

---

### Workflow 3: Data Quality Framework Setup

**Scenario:** Implement comprehensive data quality monitoring with Great Expectations.

#### Step 1: Initialize Great Expectations

```bash
# Install and initialize
pip install great_expectations

great_expectations init

# Connect to data source
great_expectations datasource new
```

#### Step 2: Create Expectation Suite

```python
# expectations/orders_suite.py
import great_expectations as gx

context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_quality_suite")

# Add expectations
validator = context.get_validator(
    batch_request={
        "datasource_name": "warehouse",
        "data_asset_name": "orders",
    },
    expectation_suite_name="orders_quality_suite"
)

# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
    column_list=[
        "order_id", "customer_id", "order_date",
        "total_amount", "status", "created_at"
    ]
)

# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")

# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")

# Range expectations
validator.expect_column_values_to_be_between(
    "total_amount",
    min_value=0,
    max_value=1000000
)

# Categorical expectations
validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "confirmed", "shipped", "delivered", "cancelled"]
)

# Freshness expectation
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now - timedelta(days=1)"},
    max_value={"$PARAMETER": "now"}
)

# Referential integrity
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"}
)

validator.save_expectation_suite(discard_failed_expectations=False)
```

#### Step 3: Create Data Quality Checks with dbt

```yaml
# models/marts/schema.yml
version: 2

models:
  - name: "fct-orders"
    description: "Order fact table with data quality checks"

    tests:
      # Row count check
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')

      # Freshness check
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24

    columns:
      - name: "order-id"
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id

      - name: "total-amount"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"

      - name: "customer-id"
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              severity: warn
```

#### Step 4: Implement Data Contracts

```yaml
# contracts/orders_contract.yaml
contract:
  name: "orders-data-contract"
  version: "1.0.0"
  owner: [email protected]

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: "Unique order identifier"
    customer_id:
      type: string
      not_null: true
    order_date:
      type: date
      not_null: true
    total_amount:
      type: decimal
      precision: 10
      scale: 2
      minimum: 0
    status:
      type: string
      enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]

sla:
  freshness:
    max_delay_hours: 1
  completeness:
    min_percentage: 99.9
  accuracy:
    duplicate_tolerance: 0.01

consumers:
  - name: "analytics-team"
    usage: "Daily reporting dashboards"
  - name: "ml-team"
    usage: "Churn prediction model"
```

#### Step 5: Set Up Quality Monitoring Dashboard

```python
# monitoring/quality_dashboard.py
from datetime import datetime

def generate_quality_report(connection, table_name: str) -> dict:
    """Generate a comprehensive data quality report."""

    report = {
        "table": table_name,
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }

    # Row count check
    row_count = connection.execute(
        f"SELECT COUNT(*) FROM {table_name}"
    ).fetchone()[0]
    report["checks"]["row_count"] = {
        "value": row_count,
        "status": "pass" if row_count > 0 else "fail"
    }

    # Freshness check
    max_date = connection.execute(
        f"SELECT MAX(created_at) FROM {table_name}"
    ).fetchone()[0]
    hours_old = (datetime.now() - max_date).total_seconds() / 3600
    report["checks"]["freshness"] = {
        "max_timestamp": max_date.isoformat(),
        "hours_old": round(hours_old, 2),
        "status": "pass" if hours_old < 24 else "fail"
    }

    # Null rate check
    null_query = f"""
    SELECT
        SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) as null_order_id,
        SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customer_id,
        COUNT(*) as total
    FROM {table_name}
    """
    null_result = connection.execute(null_query).fetchone()
    report["checks"]["null_rates"] = {
        "order_id": null_result[0] / null_result[2] if null_result[2] > 0 else 0,
        "customer_id": null_result[1] / null_result[2] if null_result[2] > 0 else 0,
        "status": "pass" if null_result[0] == 0 and null_result[1] == 0 else "fail"
    }

    # Duplicate check
    dup_query = f"""
    SELECT COUNT(*) - COUNT(DISTINCT order_id) as duplicates
    FROM {table_name}
    """
    duplicates = connection.execute(dup_query).fetchone()[0]
    report["checks"]["duplicates"] = {
        "count": duplicates,
        "status": "pass" if duplicates == 0 else "fail"
    }

    # Overall status
    all_passed = all(
        check["status"] == "pass"
        for check in report["checks"].values()
    )
    report["overall_status"] = "pass" if all_passed else "fail"

    return report
```
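
A usage sketch (the connection and output path are placeholders):

```python
import json

# Hypothetical wiring: connection is illustrative
report = generate_quality_report(connection, "analytics.fct_orders")

# Persist the report for the dashboard; default=str handles datetime values
with open("reports/fct_orders_quality.json", "w") as f:
    json.dump(report, f, indent=2, default=str)

if report["overall_status"] != "pass":
    raise RuntimeError(f"Data quality checks failed for {report['table']}")
```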

---

````