
kafka

Apache Kafka on Kubernetes with Strimzi (KRaft mode, no ZooKeeper). This skill should be used when users ask to deploy Kafka clusters, build producers/consumers, implement event-driven patterns, or debug Kafka issues. Includes tested manifests and Makefile for one-command deployment.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 22.

Hot score: 88.

Updated: March 20, 2026.

Overall rating: C (2.9).

Composite score: 2.9.

Best-practice grade: C (64.8).

Install command

npx @skill-hub/cli install mjunaidca-mjs-agent-skills-kafka

Repository

mjunaidca/mjs-agent-skills

Skill path: .claude/skills/kafka


Best for

Primary workflow: Run DevOps.

Technical facets: Full Stack, DevOps, Testing.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: mjunaidca.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install kafka into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/mjunaidca/mjs-agent-skills before adding kafka to shared team environments
  • Use kafka for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: kafka
description: |
  Apache Kafka on Kubernetes with Strimzi (KRaft mode, no ZooKeeper).
  This skill should be used when users ask to deploy Kafka clusters, build
  producers/consumers, implement event-driven patterns, or debug Kafka issues.
  Includes tested manifests and Makefile for one-command deployment.
hooks:
  PreToolUse:
    - matcher: "Bash"
      hooks:
        - type: command
          command: "bash \"$CLAUDE_PROJECT_DIR\"/.claude/hooks/verify-kubectl-context.sh"
---

# Apache Kafka

Event streaming for Kubernetes. Strimzi operator, KRaft mode, no ZooKeeper.

## Quick Start (Tested)

```bash
make install    # Deploy Strimzi + Kafka
make test       # Verify everything works
make status     # Show resources
make uninstall  # Clean up
```

**Requirements:** Kubernetes cluster, Helm 3+

**Versions:** Strimzi 0.49+, Kafka 4.1.1

---

## Resource Detection & Adaptation

**Before generating manifests, detect the target environment:**

```bash
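# Detect machine memory (macOS via sysctl, Linux via /proc/meminfo).
# The fallback is grouped before the pipe so it actually runs when sysctl fails.
{ sysctl -n hw.memsize 2>/dev/null || awk '/MemTotal/ {print $2 * 1024}' /proc/meminfo; } | \
  awk '{print $0/1024/1024/1024 " GB"}'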

# Detect Docker Desktop allocation
docker info --format '{{.MemTotal}}' 2>/dev/null | awk '{print $0/1024/1024/1024 " GB"}'

# Detect Kubernetes node capacity
kubectl get nodes -o jsonpath='{.items[0].status.capacity.memory}' 2>/dev/null
```

**Adapt resource configuration based on detection:**

| Detected RAM | Profile | Kafka Memory | Action |
|--------------|---------|--------------|--------|
| < 12GB | Minimal | 512Mi-1Gi | Warn user about constraints |
| 12-24GB | Standard | 1Gi-2Gi | Default configuration |
| > 24GB | Production | 4Gi-8Gi | Enable full features |

### Adaptive Resource Templates

**Minimal (detected < 12GB):**
```yaml
resources:
  requests:
    memory: 512Mi
    cpu: 200m
  limits:
    memory: 1Gi
    cpu: 500m
```
⚠️ Agent should warn: "Limited resources detected. Kafka may be unstable under load."

**Standard (detected 12-24GB):**
```yaml
resources:
  requests:
    memory: 1Gi
    cpu: 250m
  limits:
    memory: 2Gi
    cpu: 1000m
```

**Production (detected > 24GB or real cluster):**
```yaml
resources:
  requests:
    memory: 4Gi
    cpu: 1000m
  limits:
    memory: 8Gi
    cpu: 4000m
```

### Agent Behavior

1. **Always detect** before generating manifests
2. **Adapt** resource configs to detected environment
3. **Warn** if resources are insufficient for requested workload
4. **Suggest** Docker Desktop settings if running locally
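
The detection-to-profile mapping can be sketched as a small lookup. This is a minimal illustration, not part of the skill: `select_profile` and `PROFILES` are hypothetical names, and the thresholds and values are copied from the tables in this section.

```python
from typing import Optional

# Hypothetical lookup table; values mirror the Adaptive Resource Templates.
PROFILES = {
    "minimal": {"requests": {"memory": "512Mi", "cpu": "200m"},
                "limits": {"memory": "1Gi", "cpu": "500m"}},
    "standard": {"requests": {"memory": "1Gi", "cpu": "250m"},
                 "limits": {"memory": "2Gi", "cpu": "1000m"}},
    "production": {"requests": {"memory": "4Gi", "cpu": "1000m"},
                   "limits": {"memory": "8Gi", "cpu": "4000m"}},
}


def select_profile(detected_ram_gb: float) -> tuple[str, dict, Optional[str]]:
    """Return (profile name, resources block, optional warning for the user)."""
    if detected_ram_gb < 12:
        warning = "Limited resources detected. Kafka may be unstable under load."
        return "minimal", PROFILES["minimal"], warning
    if detected_ram_gb <= 24:
        return "standard", PROFILES["standard"], None
    return "production", PROFILES["production"], None


name, resources, warning = select_profile(8)
print(name, resources["limits"]["memory"], warning)
```

The returned `resources` dict has the same shape as the `resources:` block in the node pool manifest, so it could be serialized directly into it.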

---

## What This Skill Does

| Task | How |
|------|-----|
| **Analyze coupling** | Identify temporal, availability, behavioral issues |
| **Explain eventual consistency** | Consistency windows, read-your-writes patterns |
| **Design events** | Domain events, CloudEvents, Avro schemas |
| Deploy Kafka | Helm (Strimzi) + kubectl (manifests) |
| Create topics | KafkaTopic CRD |
| Build producers | confluent-kafka-python templates |
| Build consumers | AIOConsumer for FastAPI |
| Debug issues | Runbooks in references/ |

## What This Skill Does NOT Do

- Deploy ZooKeeper (KRaft only)
- Manage Kafka Streams applications
- Configure multi-datacenter replication

---

## Deployment

### Install Strimzi Operator

```bash
helm repo add strimzi https://strimzi.io/charts
helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka --create-namespace --wait
```

### Deploy Kafka Cluster

```bash
kubectl apply -f manifests/kafka-cluster.yaml -n kafka
kubectl wait kafka/dev-cluster --for=condition=Ready --timeout=300s -n kafka
```

### Create Topic

```bash
kubectl apply -f manifests/kafka-topic.yaml -n kafka
```

### Verify

```bash
kubectl get kafka,kafkatopic,pods -n kafka
```

---

## Core Concepts

```
Topic      = Named stream (like a database table)
Partition  = Ordered log within topic (parallelism unit)
Consumer Group = Consumers sharing work (partition → one consumer)
Offset     = Consumer position (commit to track progress)
Broker     = Kafka server
Controller = Metadata manager (KRaft replaces ZooKeeper)
```
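
To make the "partition → one consumer" rule concrete, here is a simplified round-robin sketch. It is not one of Kafka's real assignors (such as range or cooperative-sticky); `assign_partitions` is a hypothetical helper that only illustrates the invariant that each partition has exactly one owner within a group.

```python
def assign_partitions(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin sketch: every partition goes to exactly one consumer in the group."""
    assignment: dict[str, list[int]] = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


print(assign_partitions([0, 1, 2], ["c1", "c2"]))
# {'c1': [0, 2], 'c2': [1]}
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))
# {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

The second call shows why the partition count caps parallelism: with three partitions, a fourth consumer in the same group stays idle.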

---

## Local Development

Connect from your host machine (no port-forward needed):

```python
# From your local machine (outside Kubernetes)
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:30092'})
```

Connect from inside Kubernetes (pod-to-pod):

```python
# From another pod in the cluster
producer = Producer({'bootstrap.servers': 'dev-cluster-kafka-bootstrap.kafka:9092'})
```

| Location | Bootstrap Server |
|----------|------------------|
| Local machine | `localhost:30092` |
| Same namespace | `dev-cluster-kafka-bootstrap:9092` |
| Different namespace | `dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092` |
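
As a sketch, the table can be folded into one helper so application code does not hard-code addresses. `bootstrap_server` and its `location` values are hypothetical; the cluster name, namespace, and ports are the ones used in this skill's manifests.

```python
def bootstrap_server(location: str, cluster: str = "dev-cluster",
                     namespace: str = "kafka") -> str:
    """Return the bootstrap address for a client running at the given location."""
    service = f"{cluster}-kafka-bootstrap"
    if location == "local":
        # NodePort listener exposed on the host machine
        return "localhost:30092"
    if location == "same-namespace":
        return f"{service}:9092"
    if location == "other-namespace":
        # Fully qualified service DNS name
        return f"{service}.{namespace}.svc.cluster.local:9092"
    raise ValueError(f"unknown location: {location!r}")


print(bootstrap_server("other-namespace"))
# dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
```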

---

## Producer/Consumer (Python)

```python
from confluent_kafka import Producer, Consumer

# Producer (production config)
producer = Producer({
    'bootstrap.servers': 'localhost:30092',  # Or K8s service for pods
    'acks': 'all',
    'enable.idempotence': True,
})
producer.produce('my-topic', key='key', value='message')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe(['my-topic'])
msg = consumer.poll(1.0)
```

See `assets/templates/producer-consumer.py` for async FastAPI integration.
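
The snippet above polls once. With `enable.auto.commit` disabled, a typical loop commits each offset only after the message has been handled. The sketch below is an assumption about how that loop might look: `drain` is a hypothetical helper, and `FakeMessage` / `FakeConsumer` are in-memory stand-ins so it runs without a broker. It relies only on `poll()`, `error()`, `value()`, and `commit(message=..., asynchronous=False)`, which a `confluent_kafka.Consumer` also provides.

```python
from typing import Any, Callable, Optional


def drain(consumer: Any, handle: Callable[[bytes], None],
          max_polls: int = 10, poll_timeout: float = 1.0) -> int:
    """Poll up to max_polls times; commit each offset only after handle() succeeds."""
    processed = 0
    for _ in range(max_polls):
        msg = consumer.poll(poll_timeout)
        if msg is None:        # nothing arrived within the timeout
            continue
        if msg.error():        # client/broker event rather than a payload
            continue
        handle(msg.value())    # if this raises, the offset stays uncommitted
        consumer.commit(message=msg, asynchronous=False)
        processed += 1
    return processed


# In-memory stand-ins (hypothetical) so the sketch runs without a broker.
class FakeMessage:
    def __init__(self, value: bytes):
        self._value = value

    def error(self) -> None:
        return None

    def value(self) -> bytes:
        return self._value


class FakeConsumer:
    def __init__(self, messages: list):
        self._messages = list(messages)
        self.committed: list = []

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        return self._messages.pop(0) if self._messages else None

    def commit(self, message: FakeMessage, asynchronous: bool = True) -> None:
        self.committed.append(message.value())


seen: list = []
fake = FakeConsumer([FakeMessage(b"a"), None, FakeMessage(b"b")])
print(drain(fake, seen.append, max_polls=5, poll_timeout=0.0), seen)
```

Committing after the handler returns is what gives at-least-once behavior: a crash between handling and committing causes a redelivery rather than a loss.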

---

## Debugging

```bash
# Check consumer lag
kubectl exec -n kafka dev-cluster-dual-role-0 -- \
  bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group <group-name>

# List topics
kubectl exec -n kafka dev-cluster-dual-role-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Describe topic
kubectl exec -n kafka dev-cluster-dual-role-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic <topic-name>
```

See `references/debugging-runbooks.md` for detailed troubleshooting.
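
When lag needs to be checked from a script, the `--describe` output can be summed per topic. This is a rough sketch: `total_lag` is a hypothetical helper that assumes whitespace-separated columns with a header row containing `TOPIC` and `LAG`, and the sample text follows the format shown in `references/debugging-runbooks.md`.

```python
def total_lag(describe_output: str) -> dict[str, int]:
    """Sum the LAG column per topic from `kafka-consumer-groups.sh --describe` text."""
    rows = [line.split() for line in describe_output.strip().splitlines() if line.strip()]
    header = rows[0]
    topic_col, lag_col = header.index("TOPIC"), header.index("LAG")
    lag_by_topic: dict[str, int] = {}
    for row in rows[1:]:
        if len(row) <= max(topic_col, lag_col) or not row[lag_col].isdigit():
            continue  # skip rows without a numeric lag (for example "-")
        topic = row[topic_col]
        lag_by_topic[topic] = lag_by_topic.get(topic, 0) + int(row[lag_col])
    return lag_by_topic


SAMPLE = """
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor orders          0          1000            5000            4000
order-processor orders          1          2000            3000            1000
"""

print(total_lag(SAMPLE))
# {'orders': 5000}
```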

---

## Delivery Semantics

| Guarantee | Config | Use When |
|-----------|--------|----------|
| At-most-once | `acks=0` | Metrics, logs (may lose) |
| At-least-once | `acks=all` + manual commit | Most cases (may duplicate) |
| Exactly-once | Transactions | Financial (higher latency) |

**Default:** At-least-once with idempotent consumers.
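
At-least-once delivery means a handler can see the same event twice, so the consumer side has to tolerate duplicates. One common approach, sketched here under the assumption that every event carries a unique id (for example the CloudEvents `id`), is to remember processed ids. `IdempotentHandler` is a hypothetical name, and the in-memory set would need to be a durable store in a real service.

```python
from typing import Callable


class IdempotentHandler:
    """Wraps a handler so a redelivered event id is applied only once."""

    def __init__(self, apply: Callable[[dict], None]):
        self._apply = apply
        self._seen: set[str] = set()  # assumption: in-memory only, lost on restart

    def handle(self, event_id: str, payload: dict) -> bool:
        if event_id in self._seen:
            return False              # duplicate delivery: skip the side effect
        self._apply(payload)
        self._seen.add(event_id)      # record only after the side effect succeeded
        return True


balance = {"total": 0}


def credit(payload: dict) -> None:
    balance["total"] += payload["amount"]


handler = IdempotentHandler(credit)
handler.handle("evt-1", {"amount": 10})
handler.handle("evt-1", {"amount": 10})  # redelivery of the same event
print(balance["total"])
# 10
```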

---

## File Structure

```
kafka/
├── Makefile                 # Tested deployment commands
├── manifests/
│   ├── kafka-cluster.yaml   # KRaft cluster (tested)
│   └── kafka-topic.yaml     # Topic CRD
├── assets/templates/
│   └── producer-consumer.py # Python async templates
└── references/              # Deep knowledge
    ├── core-concepts.md
    ├── producers.md
    ├── consumers.md
    ├── debugging-runbooks.md
    ├── gotchas.md
    └── ... (15 files)
```

---

## Architecture Analysis

When analyzing synchronous architectures for coupling:

```
Scenario: Service A calls B, C, D directly (500ms each)

Temporal Coupling?
└── Does caller wait for all responses? → YES = coupled

Availability Coupling?
└── If B is down, does A fail? → YES = coupled

Behavioral Coupling?
└── Does A import B, C, D clients? → YES = coupled
```

**Solution:** Publish domain event, services consume independently.

See `references/architecture-patterns.md` for detailed analysis templates.
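
The cost of the direct-call scenario can be estimated with simple arithmetic. The sketch below uses the 500ms figure from the scenario above; the 99.9% per-service availability is an illustrative assumption, not a measurement, and the function names are hypothetical.

```python
import math


def sequential_latency_ms(call_latencies_ms: list[float]) -> float:
    """Caller waits for each downstream call in turn."""
    return sum(call_latencies_ms)


def parallel_latency_ms(call_latencies_ms: list[float]) -> float:
    """Caller fans out and waits for the slowest call."""
    return max(call_latencies_ms)


def combined_availability(availabilities: list[float]) -> float:
    """Caller succeeds only if every downstream service is up."""
    return math.prod(availabilities)


calls = [500, 500, 500]                                # B, C, D at 500ms each
print(sequential_latency_ms(calls))                    # 1500
print(parallel_latency_ms(calls))                      # 500
print(round(combined_availability([0.999] * 3), 4))    # 0.997
```

Parallel calls reduce latency but not availability coupling: the caller still fails whenever any one of the three services is down.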

---

## References

| File | When to Read |
|------|--------------|
| `references/architecture-patterns.md` | **Coupling analysis, eventual consistency, when to use Kafka** |
| `references/agent-event-patterns.md` | **AI agent coordination, correlation IDs, fanout** |
| `references/strimzi-deployment.md` | KRaft mode, CRDs, storage sizing |
| `references/producers.md` | Producer configuration, batching, tuning |
| `references/consumers.md` | Consumer groups, commits |
| `references/delivery-semantics.md` | At-most/least/exactly-once decision tree |
| `references/outbox-pattern.md` | Transactional outbox with Debezium CDC |
| `references/debugging-runbooks.md` | Lag, rebalancing issues |
| `references/monitoring.md` | Prometheus, alerts, Grafana |
| `references/gotchas.md` | Common mistakes |
| `references/security-patterns.md` | SCRAM, mTLS |

---

## Related Skills

| Skill | Use For |
|-------|---------|
| `/kubernetes` | Cluster operations |
| `/helm` | Chart customization |
| `/docker` | Local development |


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### manifests/kafka-cluster.yaml

```yaml
# Kafka Cluster - KRaft Mode (No ZooKeeper)
# Strimzi 0.49+ with Kafka 4.x
#
# Resource Profile: MINIMAL (8GB Mac/PC)
# - Docker Desktop: 5GB RAM, 4 CPUs, 1GB swap
# - For 16GB+ machines, see SKILL.md for Standard profile
#
# Usage:
#   kubectl apply -f kafka-cluster.yaml -n kafka
#
# Verify:
#   kubectl get kafka dev-cluster -n kafka
#   kubectl wait kafka/dev-cluster --for=condition=Ready --timeout=300s -n kafka

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: dev-cluster
spec:
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: ephemeral
  resources:
    requests:
      memory: 512Mi    # Minimal profile for 8GB machines
      cpu: 200m
    limits:
      memory: 1Gi      # Increase to 2Gi for 16GB+ machines
      cpu: 500m

---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: dev-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 4.1.1
    metadataVersion: 4.1-IV0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      # Local development: connect from host machine without port-forward
      - name: external
        port: 9094
        type: nodeport
        tls: false
        configuration:
          bootstrap:
            nodePort: 30092
          brokers:
            - broker: 0
              nodePort: 30093
              advertisedHost: localhost
              advertisedPort: 30093
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      auto.create.topics.enable: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

```

### manifests/kafka-topic.yaml

```yaml
# Kafka Topic - Test
#
# Usage:
#   kubectl apply -f kafka-topic.yaml -n kafka

apiVersion: kafka.strimzi.io/v1
kind: KafkaTopic
metadata:
  name: test-topic
  labels:
    strimzi.io/cluster: dev-cluster
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: "3600000"

```
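
If topics need to be generated rather than hand-written, the same manifest can be built programmatically. This is a sketch only: `kafka_topic_manifest` is a hypothetical helper whose defaults mirror the file above, and `retention_hours` is converted to the millisecond string that `retention.ms` expects (1 hour = 3,600,000 ms).

```python
import json


def kafka_topic_manifest(name: str, cluster: str = "dev-cluster",
                         partitions: int = 3, replicas: int = 1,
                         retention_hours: float = 1.0) -> dict:
    """Build a KafkaTopic resource equivalent to manifests/kafka-topic.yaml."""
    return {
        "apiVersion": "kafka.strimzi.io/v1",
        "kind": "KafkaTopic",
        "metadata": {"name": name, "labels": {"strimzi.io/cluster": cluster}},
        "spec": {
            "partitions": partitions,
            "replicas": replicas,
            "config": {"retention.ms": str(int(retention_hours * 3_600_000))},
        },
    }


print(json.dumps(kafka_topic_manifest("test-topic"), indent=2))
```

Because `kubectl apply` accepts JSON as well as YAML, the printed document can be piped to `kubectl apply -n kafka -f -`.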

### assets/templates/producer-consumer.py

```python
"""
Kafka Producer/Consumer Template for FastAPI

Usage:
    1. Copy relevant classes to your project
    2. Configure with your settings
    3. Use with FastAPI lifespan
"""

import asyncio
import json
import logging
from typing import Any, Callable, Awaitable, Optional
from dataclasses import dataclass, field
from datetime import datetime
from uuid import uuid4

from confluent_kafka import Producer, Consumer, KafkaError

logger = logging.getLogger(__name__)


# =============================================================================
# Configuration
# =============================================================================

@dataclass
class KafkaConfig:
    """Kafka connection configuration."""
    bootstrap_servers: str = "kafka:9092"
    security_protocol: str = "PLAINTEXT"  # PLAINTEXT, SASL_SSL, SSL
    sasl_mechanism: Optional[str] = None  # SCRAM-SHA-512
    sasl_username: Optional[str] = None
    sasl_password: Optional[str] = None
    ssl_ca_location: Optional[str] = None
    ssl_certificate_location: Optional[str] = None
    ssl_key_location: Optional[str] = None

    def to_dict(self) -> dict:
        config = {"bootstrap.servers": self.bootstrap_servers}

        if self.security_protocol != "PLAINTEXT":
            config["security.protocol"] = self.security_protocol

        if self.sasl_mechanism:
            config["sasl.mechanism"] = self.sasl_mechanism
            config["sasl.username"] = self.sasl_username
            config["sasl.password"] = self.sasl_password

        if self.ssl_ca_location:
            config["ssl.ca.location"] = self.ssl_ca_location
        if self.ssl_certificate_location:
            config["ssl.certificate.location"] = self.ssl_certificate_location
        if self.ssl_key_location:
            config["ssl.key.location"] = self.ssl_key_location

        return config


# =============================================================================
# CloudEvents Envelope
# =============================================================================

@dataclass
class CloudEvent:
    """CloudEvents 1.0 envelope."""
    type: str
    source: str
    data: Any

    id: str = field(default_factory=lambda: str(uuid4()))
    specversion: str = "1.0"
    time: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    subject: Optional[str] = None
    datacontenttype: str = "application/json"

    def to_headers(self) -> list[tuple[str, bytes]]:
        headers = [
            ("ce_specversion", self.specversion.encode()),
            ("ce_id", self.id.encode()),
            ("ce_source", self.source.encode()),
            ("ce_type", self.type.encode()),
            ("ce_time", self.time.encode()),
            ("ce_datacontenttype", self.datacontenttype.encode()),
        ]
        if self.subject:
            headers.append(("ce_subject", self.subject.encode()))
        return headers

    def to_value(self) -> bytes:
        return json.dumps(self.data).encode()


# =============================================================================
# Async Producer
# =============================================================================

class AIOProducer:
    """Async Kafka producer for FastAPI."""

    def __init__(self, config: KafkaConfig):
        producer_config = config.to_dict()
        producer_config.update({
            "acks": "all",
            "enable.idempotence": True,
            "retries": 2147483647,
            "delivery.timeout.ms": 120000,
        })
        self._producer = Producer(producer_config)
        self._loop = asyncio.get_event_loop()

    async def produce(
        self,
        topic: str,
        value: bytes,
        key: Optional[str] = None,
        headers: Optional[list] = None,
    ) -> dict:
        """Produce message with async delivery confirmation."""
        future = self._loop.create_future()

        def callback(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    future.set_exception,
                    Exception(f"Delivery failed: {err}")
                )
            else:
                self._loop.call_soon_threadsafe(
                    future.set_result,
                    {
                        "topic": msg.topic(),
                        "partition": msg.partition(),
                        "offset": msg.offset(),
                    }
                )

        self._producer.produce(
            topic=topic,
            value=value,
            key=key.encode() if key else None,
            headers=headers,
            callback=callback,
        )
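        # Delivery callbacks only run inside poll()/flush(), so keep polling
        # (without blocking the event loop) until this message is confirmed.
        while not future.done():
            self._producer.poll(0)
            await asyncio.sleep(0.01)

        return await future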

    async def produce_event(self, topic: str, event: CloudEvent) -> dict:
        """Produce CloudEvents-formatted message."""
        return await self.produce(
            topic=topic,
            value=event.to_value(),
            key=event.subject,
            headers=event.to_headers(),
        )

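    async def flush(self, timeout: float = 10.0) -> int:
        """Async-friendly flush; returns the number of messages still queued."""
        deadline = self._loop.time() + timeout
        while True:
            remaining = self._producer.flush(timeout=0.1)
            if remaining == 0 or self._loop.time() >= deadline:
                return remaining
            await asyncio.sleep(0.01)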

    def close(self):
        self._producer.flush()


# =============================================================================
# Async Consumer
# =============================================================================

class AIOConsumer:
    """Async Kafka consumer for FastAPI."""

    def __init__(
        self,
        config: KafkaConfig,
        group_id: str,
        topics: list[str],
    ):
        consumer_config = config.to_dict()
        consumer_config.update({
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            "session.timeout.ms": 45000,
            "max.poll.interval.ms": 300000,
            "partition.assignment.strategy": "cooperative-sticky",
        })
        self._consumer = Consumer(consumer_config)
        self._consumer.subscribe(topics)
        self._running = False

    async def consume(
        self,
        handler: Callable[[dict, dict], Awaitable[None]],
        poll_timeout: float = 1.0,
    ):
        """Consume messages with async handler."""
        self._running = True
        loop = asyncio.get_running_loop()

        while self._running:
            msg = await loop.run_in_executor(
                None,
                self._consumer.poll,
                poll_timeout,
            )

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                # Parse headers
                headers = {
                    k: v.decode() if v else None
                    for k, v in (msg.headers() or [])
                }

                # Parse value
                value = json.loads(msg.value().decode())

                # Call handler
                await handler(value, headers)

                # Commit after success
                self._consumer.commit(asynchronous=False)

            except Exception as e:
                logger.exception(f"Handler error: {e}")
                # TODO: Send to DLQ. Until then, a failed message is skipped
                # as soon as a later message commits.

    def stop(self):
        self._running = False

    def close(self):
        self._consumer.close()


# =============================================================================
# FastAPI Integration Example
# =============================================================================

"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio

producer: AIOProducer | None = None
consumer_task: asyncio.Task | None = None

async def handle_event(value: dict, headers: dict):
    print(f"Received: {value}")

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer, consumer_task

    config = KafkaConfig(bootstrap_servers="kafka:9092")

    producer = AIOProducer(config)

    consumer = AIOConsumer(
        config=config,
        group_id="my-service",
        topics=["events"],
    )
    consumer_task = asyncio.create_task(consumer.consume(handle_event))

    yield

    consumer.stop()
    if consumer_task:
        consumer_task.cancel()
        try:
            await consumer_task
        except asyncio.CancelledError:
            pass
    await producer.flush()
    consumer.close()

app = FastAPI(lifespan=lifespan)

@app.post("/events")
async def create_event(data: dict):
    event = CloudEvent(
        type="my.event.created",
        source="my-service",
        subject=data.get("id"),
        data=data,
    )
    result = await producer.produce_event("events", event)
    return {"status": "accepted", "kafka": result}
"""

```
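
On the consuming side, the `ce_*` headers written by `CloudEvent.to_headers()` can be mapped back to attribute names. A minimal sketch, assuming headers arrive as a list of `(key, bytes)` tuples as in the template's consumer; `parse_cloudevent_headers` is a hypothetical helper, not part of the template.

```python
from typing import Optional


def parse_cloudevent_headers(
    headers: Optional[list[tuple[str, Optional[bytes]]]],
) -> dict[str, Optional[str]]:
    """Collect `ce_*` Kafka headers into CloudEvents attribute names (prefix stripped)."""
    attributes: dict[str, Optional[str]] = {}
    for key, raw in headers or []:
        if key.startswith("ce_"):
            attributes[key[len("ce_"):]] = raw.decode() if raw is not None else None
    return attributes


sample_headers = [
    ("ce_specversion", b"1.0"),
    ("ce_type", b"my.event.created"),
    ("ce_source", b"my-service"),
    ("traceparent", b"00-abc-def-01"),  # non-CloudEvents header, ignored
]
print(parse_cloudevent_headers(sample_headers))
# {'specversion': '1.0', 'type': 'my.event.created', 'source': 'my-service'}
```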

### references/debugging-runbooks.md

```markdown
# Debugging Runbooks

Step-by-step guides for common Kafka issues.

---

## Consumer Lag Diagnosis

### Symptoms
- Messages piling up in topic
- Processing delays increasing
- Lag metrics climbing

### Runbook

```bash
# 1. Check consumer group status
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Output shows:
# GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor orders          0          1000            5000            4000
# order-processor orders          1          2000            3000            1000
```

```bash
# 2. Check consumer members
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group order-processor --members

# Look for: active consumers, assigned partitions
```

```bash
# 3. Check consumer app logs
kubectl logs -n app -l app=order-processor --tail=100 | grep -i "error\|exception\|slow"
```

```bash
# 4. Check processing time
# If using Prometheus:
# avg(rate(kafka_message_processing_seconds_sum[5m]) / rate(kafka_message_processing_seconds_count[5m]))
```

### Resolution Decision Tree

```
Is consumer connected?
├── No → Check network, auth, bootstrap servers
└── Yes → Check partition assignment
    ├── No partitions → Consumer not in group (check group.id)
    └── Partitions assigned → Check processing
        ├── Processing slow → Optimize code, add consumers
        ├── Processing errors → Check DLQ, fix bugs
        └── Poll interval exceeded → Increase max.poll.interval.ms
```

### Fixes

| Cause | Fix |
|-------|-----|
| Too few consumers | Scale up consumer pods |
| Slow processing | Optimize code, batch processing |
| Errors causing retries | Fix bugs, use DLQ |
| Poll timeout | Increase `max.poll.interval.ms` |
| Network issues | Check connectivity, DNS |

---

## Rebalancing Storm

### Symptoms
- Frequent "Rebalance triggered" logs
- Consumers joining/leaving constantly
- Processing throughput drops

### Runbook

```bash
# 1. Check rebalance frequency
kubectl logs -n app -l app=order-processor | grep -i "rebalance" | tail -20

# 2. Check consumer session timeouts
kubectl logs -n app -l app=order-processor | grep -i "heartbeat\|session"

# 3. Check consumer poll intervals
# If poll() not called within max.poll.interval.ms, consumer kicked out
```

### Common Causes

| Cause | Evidence | Fix |
|-------|----------|-----|
| Long processing | "member timed out" | Increase `max.poll.interval.ms` |
| Frequent restarts | Pod restart count high | Fix crash bugs |
| Network issues | Connection timeouts | Check network policies |
| GC pauses | Long GC logs | Tune JVM / Python GC |
| Too many partitions | Many consumers | Reduce partitions or use `cooperative-sticky` assignment |

### Prevention

```python
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',

    # Prevent rebalancing storms
    'session.timeout.ms': 45000,           # 45s (older client versions defaulted to 10s, which is too short)
    'heartbeat.interval.ms': 15000,        # 15s (1/3 of session timeout)
    'max.poll.interval.ms': 300000,        # 5min for long processing

    # Use cooperative rebalancing
    'partition.assignment.strategy': 'cooperative-sticky',
})
```

---

## Under-Replicated Partitions

### Symptoms
- `kafka_server_ReplicaManager_UnderReplicatedPartitions > 0`
- Alerts firing
- Potential data loss risk

### Runbook

```bash
# 1. Identify under-replicated partitions
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions

# 2. Check broker status
kubectl get pods -n kafka -l strimzi.io/kind=Kafka
kubectl describe pod kafka-cluster-kafka-1 -n kafka

# 3. Check broker logs
kubectl logs -n kafka kafka-cluster-kafka-1 -c kafka --tail=100 | grep -i "error\|exception"

# 4. Check disk usage
kubectl exec -n kafka kafka-cluster-kafka-1 -c kafka -- df -h /var/lib/kafka

# 5. Check network between brokers
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  nc -zv kafka-cluster-kafka-1.kafka-cluster-kafka-brokers 9091
```

### Resolution

| Cause | Fix |
|-------|-----|
| Broker down | Restart pod, check PVC |
| Disk full | Increase PVC, reduce retention |
| Network partition | Check NetworkPolicies |
| Slow followers | Check network, disk I/O |
| Leader election | Wait, check controller logs |

### Recovery

```bash
# Force leader election (if stuck)
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-leader-election.sh \
  --bootstrap-server localhost:9092 \
  --election-type PREFERRED \
  --all-topic-partitions

# Reassign partitions (if broker permanently lost)
# Create reassignment.json, then:
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/reassignment.json \
  --execute
```

---

## Connection/Auth Issues

### Symptoms
- "Connection refused"
- "SASL authentication failed"
- "SSL handshake failed"

### Runbook

```bash
# 1. Test basic connectivity
kubectl run test-conn --rm -it --image=busybox -n app -- \
  nc -zv kafka-cluster-kafka-bootstrap.kafka 9092

# 2. Check DNS resolution
kubectl run test-dns --rm -it --image=busybox -n app -- \
  nslookup kafka-cluster-kafka-bootstrap.kafka

# 3. Check listener configuration
kubectl get kafka kafka-cluster -n kafka -o jsonpath='{.spec.kafka.listeners}'

# 4. Verify credentials
kubectl get secret order-service -n kafka -o yaml
kubectl get secret kafka-cluster-cluster-ca-cert -n kafka -o yaml

# 5. Test with kafkacat (the pod runs in namespace "app", so qualify the service with ".kafka")
kubectl run kafkacat --rm -it --image=confluentinc/cp-kafkacat -n app -- \
  kafkacat -b kafka-cluster-kafka-bootstrap.kafka:9092 -L
```

### Common Errors

| Error | Cause | Fix |
|-------|-------|-----|
| `Connection refused` | Wrong port, service down | Check listener, port |
| `SASL authentication failed` | Wrong credentials | Verify secret |
| `SSL handshake failed` | Wrong CA, expired cert | Check certificates |
| `Unknown topic` | Topic doesn't exist | Create topic |
| `Not authorized` | Missing ACL | Add ACL permissions |

### Debug SSL

```bash
# Check certificate
openssl s_client -connect kafka-cluster-kafka-bootstrap:9093 \
  -CAfile /certs/ca.crt

# Verify client cert
openssl x509 -in /certs/user.crt -text -noout
```

---

## Message Loss Investigation

### Symptoms
- Messages sent but not received
- Gaps in sequence numbers
- "Fire and forget" without confirmation

### Runbook

```bash
# 1. Check producer acks configuration
# acks=0 or acks=1 can lose messages

# 2. Check topic replication
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Look for: ReplicationFactor, Isr

# 3. Check min.insync.replicas
# If the ISR count drops below min.insync.replicas, writes with acks=all fail

# 4. Check producer errors
kubectl logs -n app -l app=order-producer | grep -i "error\|failed\|timeout"

# 5. Verify messages in topic
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --max-messages 10
```

### Prevention Checklist

```python
import logging

from confluent_kafka import Producer

logger = logging.getLogger(__name__)

# Safe producer config
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',                    # Wait for all in-sync replicas
    'enable.idempotence': True,       # Prevent duplicates caused by retries
    'retries': 2147483647,            # Effectively unlimited; bounded by delivery.timeout.ms
    'delivery.timeout.ms': 120000,    # 2 min timeout
})

# Always use callback
def delivery_callback(err, msg):
    if err:
        logger.error(f"DELIVERY FAILED: {err}")
        # Alert, retry, or dead letter

producer.produce('orders', value=b'payload', callback=delivery_callback)
producer.flush()  # Never skip: serves callbacks and waits for outstanding deliveries
```

---

## Performance Degradation

### Symptoms
- High latency
- Throughput drop
- Timeouts

### Runbook

```bash
# 1. Check broker CPU/memory
kubectl top pods -n kafka -l strimzi.io/kind=Kafka

# 2. Check disk I/O
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- iostat -x 1 3

# 3. Check network
kubectl exec -n kafka kafka-cluster-kafka-0 -c kafka -- \
  netstat -an | grep -c ESTABLISHED

# 4. Check request queue
# Metric: kafka.network:type=RequestChannel,name=RequestQueueSize

# 5. Check log flush latency
# Metric: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
```

### Tuning

| Issue | Tune |
|-------|------|
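| High produce latency | Lower `linger.ms`; if brokers are overloaded by many small requests, batch more and use compression |
| High fetch latency | Lower `fetch.min.bytes` so brokers respond without waiting for data to accumulate |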
| Disk bottleneck | Use faster storage, spread partitions |
| Network bottleneck | Compress, use rack awareness |
| Memory pressure | Increase heap, tune buffer sizes |

---

## Quick Reference

```bash
# All topics
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Topic details
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic TOPIC

# Consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Group lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group GROUP

# Read from beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC --from-beginning

# Read latest
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC

# Produce test message
echo "test" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC
```

```

### references/architecture-patterns.md

```markdown
# Architecture Patterns: When and Why Kafka

## The Coupling Problem

### Synchronous (Direct Call) Architecture

```
┌─────────┐     ┌──────────────┐
│ Task    │────▶│ Notification │  500ms
│ API     │     └──────────────┘
│         │     ┌──────────────┐
│         │────▶│ Audit        │  500ms
│         │     └──────────────┘
│         │     ┌──────────────┐
│         │────▶│ Reminder     │  500ms
└─────────┘     └──────────────┘

Total latency: 1500ms (sequential)
Or: 500ms (parallel) but still coupled
```

### Three Types of Coupling

| Coupling | Problem | Example |
|----------|---------|---------|
| **Temporal** | Caller waits for response | Task API blocks 1500ms total |
| **Availability** | If callee down, caller fails | Notification down = Task fails |
| **Behavioral** | Caller knows callee details | Task API imports all 3 clients |

### Analysis Questions

When analyzing a synchronous architecture, ask:

1. **Temporal**: "What happens if Service B takes 30 seconds?"
   - Caller times out
   - User sees error
   - Retry storm begins

2. **Availability**: "What happens if Service B is down?"
   - Caller fails entirely
   - Or: Caller must implement circuit breaker
   - Cascading failures possible

3. **Behavioral**: "What happens if we add Service D?"
   - Caller code must change
   - New dependency added
   - Deployment coupling

---

## Event-Driven Architecture (Kafka Solution)

```
┌─────────┐     ┌───────┐     ┌──────────────┐
│ Task    │────▶│ Kafka │────▶│ Notification │
│ API     │     │       │────▶│ Audit        │
│         │     │       │────▶│ Reminder     │
└─────────┘     └───────┘     └──────────────┘

Task API latency: ~10ms (just publish)
Services process asynchronously
```

### Coupling Comparison

| Coupling | Synchronous | Event-Driven |
|----------|-------------|--------------|
| **Temporal** | Waits 1500ms | Returns in 10ms |
| **Availability** | Fails if any down | Succeeds, events queued |
| **Behavioral** | Knows all services | Knows only event schema |

---

## Pattern: Publish Domain Events

### Before (Synchronous)

```python
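# task_api.py - COUPLED
from notification_client import NotificationClient
from audit_client import AuditClient
from reminder_client import ReminderClient

# Client instances used by the handler below (constructor arguments omitted)
notification_client = NotificationClient()
audit_client = AuditClient()
reminder_client = ReminderClient()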

async def create_task(task: Task):
    # Save task
    saved = await db.save(task)

    # Direct calls - TEMPORAL COUPLING
    await notification_client.notify(task.assignee, "New task")  # 500ms
    await audit_client.log("task_created", task.id)              # 500ms
    await reminder_client.schedule(task.due_date, task.id)       # 500ms

    return saved  # Total: 1500ms+
```

### After (Event-Driven)

```python
# task_api.py - DECOUPLED
from kafka_producer import producer

async def create_task(task: Task):
    # Save task
    saved = await db.save(task)

    # Publish event - NO COUPLING
    await producer.send("task.events", {
        "type": "task.created",
        "data": {"task_id": task.id, "assignee": task.assignee, "due": task.due_date}
    })

    return saved  # Total: ~10ms

# notification_service.py - INDEPENDENT
async def handle_event(event):
    if event["type"] == "task.created":
        await notify(event["data"]["assignee"], "New task")

# audit_service.py - INDEPENDENT
async def handle_event(event):
    if event["type"] == "task.created":
        await log("task_created", event["data"]["task_id"])

# reminder_service.py - INDEPENDENT
async def handle_event(event):
    if event["type"] == "task.created":
        await schedule(event["data"]["due"], event["data"]["task_id"])
```

---

## When to Use Each Pattern

### Use Synchronous When:

- Response needed immediately (user waiting)
- Transaction must be atomic
- Simple, few dependencies
- Low latency requirement (<100ms)

### Use Event-Driven When:

- Response can be eventual
- Multiple consumers need same data
- Services should be independent
- Failure of one shouldn't fail all
- Adding new consumers shouldn't change producer

---

## Eventual Consistency

**The tradeoff:** Event-driven systems are eventually consistent, not immediately consistent.

```
Strong Consistency          Eventual Consistency
─────────────────          ────────────────────
Write → Read = latest      Write → Read = maybe stale
Slower (waits for acks)    Faster (async processing)
Simpler mental model       Requires handling lag
```
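
The "maybe stale" read can be seen in a toy model where the read side lags the write side until a consumer runs. Every name here (`write_db`, `read_view`, `pending`) is hypothetical, and the deque only stands in for a Kafka topic.

```python
from collections import deque

write_db: dict[str, str] = {}               # source of truth, updated synchronously
read_view: dict[str, str] = {}              # projection, updated by a consumer
pending: deque[tuple[str, str]] = deque()   # stands in for the Kafka topic

def place_order(order_id: str) -> None:
    """Write to the source of truth, then publish an event."""
    write_db[order_id] = "placed"
    pending.append((order_id, "placed"))

def run_consumer() -> None:
    """Drain pending events into the read view."""
    while pending:
        order_id, status = pending.popleft()
        read_view[order_id] = status

place_order("o-1")
stale = read_view.get("o-1")   # consumer has not run yet, so the view is behind
run_consumer()
fresh = read_view.get("o-1")   # view has caught up
print(stale, fresh)
```

The gap between the two reads is the consistency window that the next table sizes per domain.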

### Consistency Windows by Domain

| Domain | Acceptable Window | Example |
|--------|------------------|---------|
| E-commerce checkout | 200-500ms | Order placed → confirmation email |
| Inventory display | 1-5 seconds | Stock reserved → UI updated |
| Search indexing | 5-30 seconds | Product created → searchable |
| Analytics dashboard | 1-5 minutes | Event → metric updated |
| Reporting | Hours | Daily aggregation jobs |

### Read-Your-Writes Pattern

When users must see their own changes immediately:

```python
# Pattern 1: Return entity directly (optimistic UI)
@app.post("/orders")
async def create_order(order: Order):
    saved = await db.save(order)
    await producer.send("order.created", saved)
    return saved  # User sees order immediately, processing async

# Pattern 2: Local cache for consistency gap
@app.get("/orders/{id}")
async def get_order(id: str, user_id: str):
    order = await db.get(id)
    if not order:
        # Check user's pending orders (covers consistency gap)
        order = await pending_cache.get(f"pending:{user_id}:{id}")
    return order

# Pattern 3: Include status indicating eventual state
@app.post("/orders")
async def create_order(order: Order):
    saved = await db.save(order)
    await producer.send("order.created", saved)
    return {"order": saved, "status": "processing"}  # Explicit async status
```

### Handling "Where's My Order?" Scenarios

| User Action | System State | Solution |
|-------------|--------------|----------|
| Just placed order, refreshes page | Event not yet processed | Return from write path, show "processing" |
| Checks order after 1 min | Should be consistent | Normal read path |
| Order confirmation email delayed | Email consumer lagging | Show status in UI, email is secondary |

---

## Anti-Patterns to Identify

### 1. Fan-Out Synchronous Calls

```python
# BAD: N services = N * latency
for service in [svc1, svc2, svc3, svc4]:
    await service.call(data)  # 500ms each = 2000ms
```

**Fix:** Publish one event, N consumers process independently.

### 2. Request-Reply Over Events

```python
# BAD: Using Kafka like HTTP
await producer.send("request-topic", request)
response = await consumer.wait_for_response()  # Defeats the purpose
```

**Fix:** If you need a synchronous response, use HTTP. Events are for asynchronous work.

### 3. Distributed Monolith

```python
# BAD: Events but still coupled
await producer.send("notification-service.notify", {...})
await producer.send("audit-service.log", {...})
```

**Fix:** Publish domain events, not commands to specific services.
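
A minimal in-memory sketch of that fix, assuming a hypothetical `EventBus` class that stands in for a Kafka topic: the producer publishes one `task.created` fact, and each service registers its own handler without the producer naming it.

```python
from collections import defaultdict
from typing import Callable

class EventBus:
    """Toy stand-in for a topic: routes events to subscribed handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: dict) -> None:
        # The publisher only knows the event type, never the consumers.
        for handler in self._handlers[event["type"]]:
            handler(event)

bus = EventBus()
log: list[str] = []

# Each service subscribes on its own; the producer code never changes.
bus.subscribe("task.created", lambda e: log.append(f"notify:{e['data']['task_id']}"))
bus.subscribe("task.created", lambda e: log.append(f"audit:{e['data']['task_id']}"))

bus.publish({"type": "task.created", "data": {"task_id": "task-1"}})
print(log)
```

Adding a third consumer is one more `subscribe` call on the consumer side, with no change to the publishing code.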

---

## Scenario Analysis Template

When analyzing architecture for coupling:

```markdown
## Current State
- Service A calls: [list services]
- Call pattern: [sync/async]
- Latency per call: [Xms]
- Total latency: [Xms]

## Coupling Analysis
- Temporal: [Yes/No] - [explanation]
- Availability: [Yes/No] - [explanation]
- Behavioral: [Yes/No] - [explanation]

## Event-Driven Solution
- Event type: [domain.event.name]
- Producer: [which service]
- Consumers: [list services]
- Expected latency: [Xms]

## Trade-offs
- Gains: [decoupling benefits]
- Costs: [complexity, eventual consistency]
```

---

## Real Scenario: Task API → 3 Services

### Input
> Task API calls Notification, Audit, Reminder directly. Each takes 500ms.

### Analysis

**Temporal Coupling: YES**
- Task API waits 1500ms (sequential) or 500ms (parallel)
- User experiences slow response
- If any call slow, entire request slow

**Availability Coupling: YES**
- If Notification service down → Task creation fails
- If Audit service down → Task creation fails
- Single point of failure × 3

**Behavioral Coupling: YES**
- Task API imports NotificationClient, AuditClient, ReminderClient
- Adding LoggingService requires Task API code change
- Removing a service requires Task API code change

### Recommendation

Publish `task.created` event to Kafka:

```python
# Task API (producer)
await producer.send("task.events", {
    "type": "task.created",
    "task_id": task.id,
    "assignee": task.assignee,
    "due_date": task.due_date
})
# Returns in ~10ms

# Each service subscribes independently
# consumer_group: notification-service → topic: task.events
# consumer_group: audit-service → topic: task.events
# consumer_group: reminder-service → topic: task.events
```

**Result:**
- Temporal: Task API returns in ~10ms
- Availability: Task succeeds even if services down (events queued)
- Behavioral: Adding new consumer = zero changes to Task API

```

### references/agent-event-patterns.md

```markdown
# Agent Event Patterns

Event-driven patterns for AI agent coordination and task dispatch.

---

## Event Naming Convention

Use `domain.action` pattern in **past tense** (immutable facts):

```
task.created      ✅ (fact: task was created)
task.create       ❌ (command: create a task)

agent.assigned    ✅
agent.completed   ✅
notification.sent ✅
```

**Past tense indicates immutable history, not imperative commands.**

---

## Standard Event Schema

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional
import uuid

@dataclass
class EventMetadata:
    """Metadata for distributed tracing and compliance."""
    correlation_id: str    # Traces request across services
    causation_id: str      # What event caused this event
    source: str            # Service that produced the event

@dataclass
class DomainEvent:
    """Base structure for all domain events."""
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    data: dict = field(default_factory=dict)
    metadata: Optional[EventMetadata] = None

    def to_dict(self) -> dict:
        return {
            "event_id": self.event_id,
            "event_type": self.event_type,
            "occurred_at": self.occurred_at,
            "data": self.data,
            "metadata": {
                "correlation_id": self.metadata.correlation_id,
                "causation_id": self.metadata.causation_id,
                "source": self.metadata.source
            } if self.metadata else {}
        }
```

**Example output:**
```json
{
  "event_id": "e7c5a8f2-3b4d-4e6a-9f1c-2d8e7a6b5c4d",
  "event_type": "task.created",
  "occurred_at": "2025-01-15T14:30:22.456000+00:00",
  "data": {
    "task_id": "task-123",
    "title": "Review quarterly report",
    "owner_id": "user-456",
    "priority": 2
  },
  "metadata": {
    "correlation_id": "req-abc-123",
    "causation_id": "api-call-789",
    "source": "task-api"
  }
}
```

---

## Correlation ID vs Causation ID

| Field | Purpose | Scope |
|-------|---------|-------|
| **correlation_id** | Traces single user request across ALL services | Entire workflow |
| **causation_id** | Links event to what directly caused it | Parent-child |

**Example chain:**
```
User Request (correlation_id: req-123)
  └── API creates task (causation_id: api-call-1)
        └── task.created event
              └── Notification service (causation_id: evt-task-created-1)
                    └── notification.sent event
```

All events share `correlation_id: req-123` for end-to-end tracing.
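
One way to keep the two IDs straight is to derive a follow-up event's metadata from the event that triggered it. The sketch below assumes the event shape from the Standard Event Schema section. `follow_up_event` is a hypothetical helper, and using the parent's `event_id` as the child's `causation_id` is one common convention, not the only option.

```python
import uuid
from datetime import datetime, timezone

def follow_up_event(parent: dict, event_type: str, data: dict, source: str) -> dict:
    """Build an event caused by `parent`, preserving the trace."""
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "metadata": {
            # Same correlation_id for the whole workflow
            "correlation_id": parent["metadata"]["correlation_id"],
            # Direct parent: the event that triggered this one
            "causation_id": parent["event_id"],
            "source": source,
        },
    }

task_created = {
    "event_id": "evt-task-created-1",
    "event_type": "task.created",
    "data": {"task_id": "task-123"},
    "metadata": {
        "correlation_id": "req-123",
        "causation_id": "api-call-1",
        "source": "task-api",
    },
}

sent = follow_up_event(
    task_created, "notification.sent", {"task_id": "task-123"}, "notification-service"
)
print(sent["metadata"])
```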

---

## Agent Fanout Architecture

Multiple specialized agents consume the same events independently:

```
                    ┌──────────────────────┐
                    │   task-events        │
                    │   (single topic)     │
                    └──────────┬───────────┘
                               │
        ┌──────────────────────┼──────────────────────┐
        │                      │                      │
        ▼                      ▼                      ▼
┌───────────────┐    ┌───────────────┐    ┌───────────────┐
│ Email Agent   │    │ Slack Agent   │    │ Audit Agent   │
│ group.id:     │    │ group.id:     │    │ group.id:     │
│ notif-email   │    │ notif-slack   │    │ audit-log     │
└───────────────┘    └───────────────┘    └───────────────┘
        │                      │                      │
        ▼                      ▼                      ▼
   Send email            Post message           Write to log
```

**Key:** Different `group.id` = each agent gets ALL messages independently.
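
The effect of `group.id` can be illustrated with a toy model that tracks one read offset per group over a shared log. `ToyTopic` is a hypothetical class for illustration; it does not model partitions, commits, or rebalancing.

```python
class ToyTopic:
    """Append-only log with an independent read offset per consumer group."""

    def __init__(self) -> None:
        self._log: list[str] = []
        self._offsets: dict[str, int] = {}

    def produce(self, message: str) -> None:
        self._log.append(message)

    def poll(self, group_id: str) -> list[str]:
        """Return messages this group has not yet read and advance its offset."""
        start = self._offsets.get(group_id, 0)
        messages = self._log[start:]
        self._offsets[group_id] = len(self._log)
        return messages

topic = ToyTopic()
topic.produce("task.created:1")
topic.produce("task.created:2")

email = topic.poll("notif-email")   # first group reads everything
slack = topic.poll("notif-slack")   # second group also reads everything
again = topic.poll("notif-email")   # same group again: nothing new

print(email, slack, again)
```

Consumers that share a `group.id` would instead split the messages between them, which is how a single agent type scales out.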

---

## Producer: FastAPI with Correlation ID

```python
import json
import uuid
import os
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from confluent_kafka import Producer
from fastapi import FastAPI, Request
from pydantic import BaseModel

producer: Producer = None

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}]")

@asynccontextmanager
async def lifespan(app: FastAPI):
    global producer
    producer = Producer({
        'bootstrap.servers': os.environ.get(
            'KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092'
        ),
        'client.id': 'task-api',
        'acks': 'all',
        'enable.idempotence': True
    })
    yield
    producer.flush()

app = FastAPI(lifespan=lifespan)

class TaskCreate(BaseModel):
    title: str
    owner_id: str
    priority: int = 1

def publish_event(
    event_type: str,
    data: dict,
    correlation_id: str,
    causation_id: str
):
    """Publish domain event with tracing metadata."""
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "occurred_at": datetime.now(timezone.utc).isoformat(),
        "data": data,
        "metadata": {
            "correlation_id": correlation_id,
            "causation_id": causation_id,
            "source": "task-api"
        }
    }
    producer.produce(
        topic='task-events',
        key=data.get('task_id', str(uuid.uuid4())),
        value=json.dumps(event),
        callback=delivery_callback
    )
    producer.poll(0)

@app.post("/tasks")
async def create_task(task: TaskCreate, request: Request):
    # Extract or generate correlation ID
    correlation_id = request.headers.get(
        'X-Correlation-ID', str(uuid.uuid4())
    )
    task_id = str(uuid.uuid4())

    task_data = {
        "task_id": task_id,
        "title": task.title,
        "owner_id": task.owner_id,
        "priority": task.priority
    }

    publish_event(
        event_type="task.created",
        data=task_data,
        correlation_id=correlation_id,
        causation_id=f"api-create-{task_id}"
    )

    return {"id": task_id, "status": "created"}
```

---

## Consumer: Notification Agent

```python
import json
import os
from confluent_kafka import Consumer, KafkaError

def create_notification_consumer(notification_type: str):
    """Create consumer for a specific notification channel."""
    return Consumer({
        'bootstrap.servers': os.environ.get(
            'KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092'
        ),
        'group.id': f'notification-{notification_type}',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False
    })

def send_email(to: str, subject: str, body: str):
    print(f"EMAIL to {to}: {subject}")

def process_email_notifications():
    consumer = create_notification_consumer('email')
    consumer.subscribe(['task-events'])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(f"Error: {msg.error()}")
                continue

            event = json.loads(msg.value().decode())

            if event.get('event_type') == 'task.created':
                data = event.get('data', {})
                metadata = event.get('metadata', {})

                send_email(
                    to=f"{data.get('owner_id')}@company.com",
                    subject=f"New task: {data.get('title')}",
                    body=f"Task ID: {data.get('task_id')}\n"
                         f"Correlation: {metadata.get('correlation_id')}"
                )

            consumer.commit(message=msg)
    finally:
        consumer.close()
```

---

## Consumer: Audit Agent

```python
import json
import os
from datetime import datetime, timezone
from confluent_kafka import Consumer, KafkaError

class AuditLogger:
    """Append-only audit log for compliance."""

    def __init__(self, log_dir: str = "/var/log/audit"):
        self.log_dir = log_dir
        os.makedirs(log_dir, exist_ok=True)

    def append(self, event: dict):
        """Append event to immutable log."""
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        now = datetime.now(timezone.utc)
        date_str = now.strftime("%Y-%m-%d")
        log_file = os.path.join(self.log_dir, f"audit-{date_str}.jsonl")

        log_entry = {
            "logged_at": now.isoformat().replace("+00:00", "Z"),
            "event": event
        }

        with open(log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

def run_audit_agent():
    consumer = Consumer({
        'bootstrap.servers': os.environ.get(
            'KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092'
        ),
        'group.id': 'audit-log',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False
    })
    consumer.subscribe(['task-events'])
    audit = AuditLogger()

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                # End of partition is informational; report anything else
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    print(f"Error: {msg.error()}")
                continue

            event = json.loads(msg.value().decode())
            audit.append(event)
            consumer.commit(message=msg)
    finally:
        consumer.close()
```

---

## Agent Coordination Topics

| Topic | Purpose | Consumers |
|-------|---------|-----------|
| `task-events` | Task lifecycle events | All task-aware agents |
| `agent-tasks` | Work dispatch to agents | Worker agents |
| `agent-results` | Agent completion reports | Orchestrator |
| `audit-events` | All events for compliance | Audit service |

---

## Event Types for Agent Workflows

```
# Task lifecycle
task.created
task.assigned
task.started
task.completed
task.failed

# Agent lifecycle
agent.registered
agent.heartbeat
agent.task.accepted
agent.task.completed
agent.task.failed

# Orchestration
workflow.started
workflow.step.completed
workflow.completed
workflow.failed
```

---

## Best Practices

| Practice | Why |
|----------|-----|
| Always include `correlation_id` | End-to-end request tracing |
| Use past tense event names | Events are facts, not commands |
| Separate consumer groups per agent | Independent scaling and replay |
| Include `source` in metadata | Know which service produced event |
| Make consumers idempotent | Handle redelivery safely |
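
For the last row, a minimal sketch of an idempotent handler that deduplicates on `event_id`. The in-memory set is an assumption to keep the example self-contained; a real consumer would need a durable store (for example a database table with a unique key) so the check survives restarts.

```python
class IdempotentHandler:
    """Skips events whose event_id has already been processed."""

    def __init__(self) -> None:
        self._seen: set[str] = set()   # assumption: swap for durable storage
        self.processed: list[str] = []

    def handle(self, event: dict) -> bool:
        """Process the event once; return False for a redelivery."""
        event_id = event["event_id"]
        if event_id in self._seen:
            return False
        self.processed.append(event["event_type"])  # the side effect goes here
        self._seen.add(event_id)
        return True

handler = IdempotentHandler()
event = {"event_id": "e-1", "event_type": "task.created"}

first = handler.handle(event)
second = handler.handle(event)   # simulated redelivery after a failed commit
print(first, second, handler.processed)
```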

```

### references/strimzi-deployment.md

```markdown
# Strimzi Deployment

Deploying Apache Kafka on Kubernetes with Strimzi in KRaft mode.

---

## Prerequisites

- Kubernetes 1.27+
- Helm 3.x
- `kubectl` configured

---

## Quick Start

```bash
# 1. Install Strimzi operator
helm repo add strimzi https://strimzi.io/charts
helm install strimzi-operator strimzi/strimzi-kafka-operator \
  -n kafka --create-namespace

# 2. Wait for operator
kubectl wait --for=condition=Ready pod -l name=strimzi-cluster-operator -n kafka --timeout=300s

# 3. Deploy Kafka cluster
kubectl apply -f kafka-cluster.yaml -n kafka

# 4. Wait for cluster
kubectl wait kafka/my-cluster --for=condition=Ready -n kafka --timeout=600s
```

---

## Kafka CRD (KRaft Mode)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 4.1.1
    # KRaft mode - no ZooKeeper
    metadataVersion: 4.1-IV0

    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls

    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2

  # Node pools for role separation
  # See KafkaNodePool below

  entityOperator:
    topicOperator: {}
    userOperator: {}
```

---

## KafkaNodePool (Role Separation)

### Controllers (Metadata)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controllers
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        class: standard
  resources:
    requests:
      memory: 2Gi
      cpu: 500m
    limits:
      memory: 4Gi
      cpu: 1
```

### Brokers (Data)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: brokers
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        class: fast-ssd
  resources:
    requests:
      memory: 4Gi
      cpu: 1
    limits:
      memory: 8Gi
      cpu: 2
```

### Combined (Dev/Small)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 50Gi
```

---

## KafkaTopic CRD

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: order-events
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000        # 7 days
    cleanup.policy: delete
    min.insync.replicas: 2
    segment.bytes: 1073741824      # 1GB segments
```

### Topic Naming

```yaml
# Pattern: <domain>.<entity>.<event>
# One KafkaTopic resource per name, for example:
#   orders.order.created
#   payments.payment.completed
#   agents.task.assigned
metadata:
  name: orders.order.created
```
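
A naming convention is easier to keep when it is checked automatically. The sketch below assumes the three-part pattern above with lowercase segments. `is_valid_topic_name` is a hypothetical helper, and the 249-character cap reflects Kafka's limit on topic name length.

```python
import re

# Three lowercase segments separated by dots; hyphens allowed inside a segment.
TOPIC_PATTERN = re.compile(r"[a-z][a-z0-9-]*\.[a-z][a-z0-9-]*\.[a-z][a-z0-9-]*")
MAX_TOPIC_LENGTH = 249  # Kafka's limit on topic name length

def is_valid_topic_name(name: str) -> bool:
    """Check a topic name against the <domain>.<entity>.<event> convention."""
    return len(name) <= MAX_TOPIC_LENGTH and TOPIC_PATTERN.fullmatch(name) is not None

for name in ["orders.order.created", "agents.task.assigned", "order-events"]:
    print(name, is_valid_topic_name(name))
```

A check like this could run against the `metadata.name` of each KafkaTopic manifest before it is applied.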

---

## KafkaUser CRD

### SCRAM Authentication

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: order-service
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Produce to order topics
      - resource:
          type: topic
          name: orders.
          patternType: prefix
        operations:
          - Write
          - Describe
      # Consume from order topics
      - resource:
          type: topic
          name: orders.
          patternType: prefix
        operations:
          - Read
          - Describe
      # Consumer group
      - resource:
          type: group
          name: order-service
          patternType: literal
        operations:
          - Read
```

### mTLS Authentication

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: payment-service
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: payments.
          patternType: prefix
        operations:
          - All
```

---

## Environment Configurations

### Development

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: dev-cluster
spec:
  kafka:
    version: 4.1.1
    metadataVersion: 4.1-IV0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false  # No TLS for dev
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1  # Default is 2; must be 1 on a single node
      default.replication.factor: 1
      min.insync.replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dev-pool
  labels:
    strimzi.io/cluster: dev-cluster
spec:
  replicas: 1  # Single node
  roles:
    - controller
    - broker
  storage:
    type: ephemeral  # No persistence
  resources:
    requests:
      memory: 1Gi
      cpu: 250m
```

### Production

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: prod-cluster
spec:
  kafka:
    version: 4.1.1
    metadataVersion: 4.1-IV0
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: loadbalancer
        tls: true
        authentication:
          type: scram-sha-512
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      default.replication.factor: 3
      min.insync.replicas: 2
      log.retention.hours: 168  # 7 days
      auto.create.topics.enable: false
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controllers
  labels:
    strimzi.io/cluster: prod-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        class: fast-ssd
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: brokers
  labels:
    strimzi.io/cluster: prod-cluster
spec:
  replicas: 5
  roles:
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 500Gi
        class: fast-ssd
  resources:
    requests:
      memory: 8Gi
      cpu: 2
    limits:
      memory: 16Gi
      cpu: 4
```

---

## Storage Sizing

### Formula

```
Storage per broker = (daily_bytes × retention_days × replication_factor) / num_brokers
```

### Example Calculation

| Parameter | Value |
|-----------|-------|
| Daily message volume | 100 GB/day |
| Retention period | 7 days |
| Replication factor | 3 |
| Number of brokers | 5 |

```
Storage = (100 GB × 7 days × 3) / 5 brokers = 420 GB per broker
```

**Add 20% buffer:**
```
Recommended: 420 GB × 1.2 = 504 GB → Round to 500 GB
```
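
The same arithmetic as a small function, useful for trying other volumes and retention periods. `storage_per_broker_gb` is a hypothetical helper. Like the formula, it assumes partitions are spread evenly across brokers and ignores compression.

```python
def storage_per_broker_gb(
    daily_gb: float,
    retention_days: int,
    replication_factor: int,
    num_brokers: int,
    buffer: float = 0.2,
) -> tuple[float, float]:
    """Return (raw, with_buffer) storage per broker in GB."""
    if num_brokers <= 0:
        raise ValueError("num_brokers must be positive")
    raw = daily_gb * retention_days * replication_factor / num_brokers
    return raw, raw * (1 + buffer)

raw, recommended = storage_per_broker_gb(100, 7, 3, 5)
print(f"raw: {raw:.0f} GB, with buffer: {recommended:.0f} GB")
```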

### Storage Class Selection

| Workload | Storage Class | Why |
|----------|---------------|-----|
| Dev/Test | standard | Cost-effective, ephemeral OK |
| Production (low) | gp3 (AWS) / pd-balanced (GCP) | Balanced price/performance |
| Production (high) | io2 (AWS) / pd-ssd (GCP) | High IOPS for throughput |
| Edge/local | local-path | Direct disk, lowest latency |

### JBOD Configuration

For high-throughput clusters, use multiple disks:

```yaml
storage:
  type: jbod
  volumes:
    - id: 0
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
    - id: 1
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
```

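**Benefit:** Kafka places each partition on one of the volumes (one log directory per volume), so different partitions are served from different disks in parallel. A single partition is not striped across volumes.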

---

## Schema Registry

Strimzi doesn't include Schema Registry. Deploy separately:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  namespace: kafka
spec:
  replicas: 2
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: schema-registry
          image: confluentinc/cp-schema-registry:7.5.0
          ports:
            - containerPort: 8081
          env:
            - name: SCHEMA_REGISTRY_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP  # Deployment pod names are not DNS-resolvable
            - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              value: my-cluster-kafka-bootstrap:9092
            - name: SCHEMA_REGISTRY_LISTENERS
              value: http://0.0.0.0:8081
          resources:
            requests:
              memory: 512Mi
              cpu: 250m
            limits:
              memory: 1Gi
              cpu: 500m
---
apiVersion: v1
kind: Service
metadata:
  name: schema-registry
  namespace: kafka
spec:
  ports:
    - port: 8081
  selector:
    app: schema-registry
```

---

## Useful Commands

```bash
# Cluster status
kubectl get kafka -n kafka
kubectl get kafkanodepools -n kafka
kubectl describe kafka my-cluster -n kafka

# Topics
kubectl get kafkatopics -n kafka
kubectl describe kafkatopic order-events -n kafka

# Users
kubectl get kafkausers -n kafka
kubectl get secret order-service -n kafka -o jsonpath='{.data.password}' | base64 -d

# Logs
kubectl logs -n kafka -l strimzi.io/name=my-cluster-kafka -c kafka --tail=100

# Exec into a broker (with node pools, pods are named <cluster>-<pool>-<id>)
kubectl exec -n kafka my-cluster-dual-role-0 -c kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```

---

## Troubleshooting

| Issue | Check | Fix |
|-------|-------|-----|
| Pods not starting | `kubectl describe pod` | Check resources, PVC |
| Cluster not ready | `kubectl get kafka -o yaml` | Check conditions |
| Topic creation fails | Topic operator logs | Check ACLs, config |
| Connection refused | Listener config | Verify listener type |

```

### references/producers.md

```markdown
# Producers

Building reliable Kafka producers with confluent-kafka-python.

---

## Basic Producer

```python
from confluent_kafka import Producer
import json

def delivery_callback(err, msg):
    if err:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}]@{msg.offset()}")

producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'my-producer'
})

# Send message
producer.produce(
    topic='events',
    key='user-123',
    value=json.dumps({'action': 'login'}),
    callback=delivery_callback
)

# CRITICAL: flush to ensure delivery
producer.flush()
```

---

## Async Producer (AIOProducer)

For FastAPI and asyncio applications:

```python
from confluent_kafka import Producer, KafkaException
from threading import Thread
import asyncio

class AIOProducer:
    """Async wrapper for confluent-kafka Producer.

    Create it inside a running event loop (e.g. in a FastAPI lifespan handler).
    """

    def __init__(self, config: dict):
        self._producer = Producer(config)
        self._loop = asyncio.get_running_loop()
        self._closed = False
        # Delivery callbacks only run inside poll()/flush(), so poll
        # continuously on a background thread.
        self._poll_thread = Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._closed:
            self._producer.poll(0.1)

    async def produce(self, topic: str, key: str, value: bytes) -> dict:
        """Async produce with delivery confirmation."""
        future = self._loop.create_future()

        def callback(err, msg):
            # Runs on the poll thread; hand the result to the event loop
            if err:
                self._loop.call_soon_threadsafe(
                    future.set_exception,
                    KafkaException(err)
                )
            else:
                self._loop.call_soon_threadsafe(
                    future.set_result,
                    {'partition': msg.partition(), 'offset': msg.offset()}
                )

        self._producer.produce(
            topic=topic,
            key=key.encode() if isinstance(key, str) else key,
            value=value,
            callback=callback
        )

        return await future

    async def flush(self, timeout: float = 10.0):
        """Flush in a worker thread so the event loop is not blocked."""
        await self._loop.run_in_executor(None, self._producer.flush, timeout)

    def close(self):
        self._closed = True
        self._poll_thread.join()
        self._producer.flush()

---

## Production Configuration

```python
producer = Producer({
    # Connection
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'order-service-producer',

    # Durability (CRITICAL)
    'acks': 'all',                    # Wait for all replicas
    'enable.idempotence': True,       # Prevent duplicates

    # Retries
    'retries': 2147483647,            # Infinite retries
    'retry.backoff.ms': 100,          # Backoff between retries
    'delivery.timeout.ms': 120000,    # 2 min total timeout

    # Batching (performance)
    'batch.size': 16384,              # 16KB batch
    'linger.ms': 5,                   # Wait 5ms for batch
    'compression.type': 'lz4',        # Compress batches

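    # Memory (librdkafka names; the Java client's buffer.memory and
    # max.block.ms are not valid for confluent-kafka-python)
    'queue.buffering.max.kbytes': 32768,     # 32MB local queue
    'queue.buffering.max.messages': 100000,  # produce() raises BufferError when full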
})
```

### Configuration Explained

| Setting | Value | Why |
|---------|-------|-----|
| `acks=all` | Wait for ISR | Durability over speed |
| `enable.idempotence` | Exactly-once per partition | Prevents duplicates |
| `retries` | Very high | Network issues are transient |
| `linger.ms` | 5-100ms | Balance latency vs throughput |
| `compression.type` | lz4 | Fast, good ratio |

---

## Delivery Guarantees

### At-Most-Once (Fire and Forget)

```python
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': '0',  # Don't wait for ack
})

producer.produce(topic='metrics', value=data)
# No flush, no callback - may lose messages
```

### At-Least-Once (Default Recommended)

```python
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 2147483647,
})

producer.produce(topic='orders', value=data, callback=delivery_callback)
producer.flush()  # Wait for confirmation
```

### Exactly-Once (Transactions)

```python
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'transactional.id': 'order-processor-1',  # Unique per instance
    'acks': 'all',
    'enable.idempotence': True,
})

producer.init_transactions()

try:
    producer.begin_transaction()

    producer.produce(topic='orders', value=order_data)
    producer.produce(topic='inventory', value=inventory_update)

    producer.commit_transaction()
except Exception as e:
    producer.abort_transaction()
    raise
```

---

## Error Handling

### Simple Pattern (Using retriable())

```python
from confluent_kafka import Producer

def delivery_callback(err, msg):
    if err is None:
        print(f"Delivered: {msg.topic()}[{msg.partition()}]@{msg.offset()}")
        return

    # A delivery report is final: the client has already used up its own retries
    if err.retriable():
        # Transient error (network, broker restart) - the application may produce again
        print(f"Retriable error for {msg.key()}: {err}")
    else:
        # Fatal error (authorization, invalid topic) - needs intervention
        print(f"Fatal error for {msg.key()}: {err}")
        send_to_dlq(msg, err)  # Application-defined helper
```

### Production Pattern (ReliableProducer with DLQ)

```python
from confluent_kafka import Producer
import json
import logging

logger = logging.getLogger(__name__)

class ReliableProducer:
    """Production producer with automatic DLQ handling."""

    def __init__(self, bootstrap_servers: str, client_id: str):
        config = {
            'bootstrap.servers': bootstrap_servers,
            'client.id': client_id,
            'acks': 'all',
            'enable.idempotence': True,
            'retries': 2147483647,
            'delivery.timeout.ms': 120000,
        }
        self.producer = Producer(config)
        self.dlq_producer = Producer(config)

    def _delivery_callback(self, err, msg):
        if err is None:
            logger.info(f"Delivered: {msg.topic()}[{msg.partition()}]@{msg.offset()}")
            return

        if err.retriable():
            logger.warning(f"Retriable error (delivery abandoned, re-send if needed): {err}")
        else:
            logger.error(f"Fatal error, sending to DLQ: {err}")
            self._send_to_dlq(msg, err)

    def _send_to_dlq(self, msg, err):
        dlq_message = {
            'original_topic': msg.topic(),
            'original_key': msg.key().decode() if msg.key() else None,
            'original_value': msg.value().decode() if msg.value() else None,
            'error': str(err),
            'error_code': err.code(),
        }
        self.dlq_producer.produce(
            topic=f"{msg.topic()}.dlq",
            key=msg.key(),
            value=json.dumps(dlq_message),
        )

    def send(self, topic: str, key: str, value: dict):
        self.producer.produce(
            topic=topic,
            key=key.encode() if key else None,
            value=json.dumps(value).encode(),
            callback=self._delivery_callback,
        )
        self.producer.poll(0)

    def flush(self):
        self.producer.flush()
        self.dlq_producer.flush()

    def close(self):
        self.flush()

# Usage
producer = ReliableProducer(
    bootstrap_servers='localhost:30092',
    client_id='task-service'
)
producer.send('task-events', 'task-123', {'title': 'Buy groceries'})
producer.flush()
```

### Error Categories

| Error Type | `err.retriable()` | Action | Examples |
|------------|-------------------|--------|----------|
| Retriable | `True` | Re-produce or alert (client retries already exhausted) | Network timeout, broker restart |
| Fatal | `False` | Send to DLQ | Auth failure, topic doesn't exist |

### Buffer Full Handling

```python
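import logging
import time

logger = logging.getLogger(__name__)

def safe_produce(producer, topic, key, value, max_retries=3):
    """Produce with exponential backoff while the local queue is full."""
    for attempt in range(max_retries):
        try:
            producer.produce(topic=topic, key=key, value=value, callback=delivery_callback)
            producer.poll(0)
            return True
        except BufferError:
            logger.warning("Buffer full, backing off")
            producer.poll(1)  # Serve delivery callbacks to free queue space
            time.sleep(0.5 * (2 ** attempt))
    return False  # Caller decides what to do with the unsent message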
```

---

## Message Key Design

Choose the right key based on ordering requirements and scalability:

| Use Case | Recommended Key | Ordering | Scalability |
|----------|-----------------|----------|-------------|
| Entity lifecycle | `entity_id` | ✅ Per entity | ✅ Scales with entities |
| User activity | `user_id` | ✅ Per user | ⚠️ Hot users = hot partitions |
| Multi-tenant | `tenant_id:entity_id` | ✅ Per entity | ✅ Good isolation |
| Max parallelism | `None` (spread across partitions) | ❌ None | ✅ Maximum |

### Example: Task Lifecycle Events

```python
# Task events: created → updated → completed
# Key by task_id ensures ordering within task

producer.produce(
    topic='task-events',
    key=f'task-{task_id}'.encode(),  # All events for this task → same partition
    value=json.dumps({
        'type': 'task.created',
        'task_id': task_id,
        'user_id': user_id,
    })
)

# Later...
producer.produce(
    topic='task-events',
    key=f'task-{task_id}'.encode(),  # Same key → same partition → ordered
    value=json.dumps({
        'type': 'task.completed',
        'task_id': task_id,
    })
)
```

### Trade-offs

| Key Choice | Pros | Cons |
|------------|------|------|
| `task_id` | Lifecycle ordering guaranteed | One busy task can't parallelize |
| `user_id` | User activity ordered | Power users create hot partitions |
| `project_id` | Project-level aggregation | Large projects = hot partitions |
| Compound `project:task` | Balanced | More complex key management |

---

## Partitioning Strategies

### Key-Based (Default)

```python
# Same key always goes to same partition
producer.produce(
    topic='user-events',
    key=f'user-{user_id}',  # Consistent hashing
    value=event_data
)
```
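
Why the same key always reaches the same partition can be shown with a stand-in hash partitioner. This is a sketch only: `partition_for` is a hypothetical function, and CRC32 modulo the partition count is an illustrative assumption, so the partition numbers it yields need not match what the client library picks.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Illustrative hash partitioner: CRC32(key) modulo partition count."""
    return zlib.crc32(key) % num_partitions

events = [b'user-1', b'user-2', b'user-1', b'user-3', b'user-1']
assignments = [partition_for(key, 6) for key in events]

# Every b'user-1' event maps to one partition, so their relative order holds
for key, partition in zip(events, assignments):
    print(key.decode(), '->', partition)
```

The mapping depends on the partition count, so adding partitions to a topic can move existing keys to a different partition.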

### Custom Partitioner

```python
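# confluent-kafka-python does not accept a Python partitioner callback.
# Two supported options: pass an explicit partition to produce(), or encode
# the routing dimension in the key and let the default partitioner hash it.

REGION_PARTITIONS = {'us-east': 0, 'us-west': 1, 'eu': 2}

def region_partition(key: bytes) -> int:
    """Map a key such as b"us-east-user-123" to a fixed partition."""
    text = key.decode()
    for region, partition in REGION_PARTITIONS.items():
        if text.startswith(f"{region}-"):
            return partition
    return 0  # Fallback partition for unknown regions

key = f"{region}-{entity_id}".encode()

# Option 1: explicit partition (the topic needs at least 3 partitions)
producer.produce(topic='user-events', key=key, value=event_data,
                 partition=region_partition(key))

# Option 2: key design only - partition by hash of the full key
producer.produce(topic='user-events', key=key, value=event_data)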
```

### Round-Robin

```python
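# No key = spread across partitions. librdkafka's default partitioner assigns
# keyless messages randomly (sticky per batch), not in strict round-robin order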
producer.produce(
    topic='logs',
    key=None,  # Distribute evenly
    value=log_entry
)
```

---

## Headers and Metadata

```python
from uuid import uuid4
import time

producer.produce(
    topic='events',
    key='order-123',
    value=order_data,
    headers=[
        ('correlation_id', str(uuid4()).encode()),
        ('trace_id', trace_id.encode()),
        ('source', 'order-service'.encode()),
        ('event_type', 'order.created'.encode()),
    ],
    timestamp=int(time.time() * 1000)  # Event time
)
```

---

## Batching for Throughput

```python
# High-throughput producer
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',

    # Aggressive batching
    'batch.size': 65536,        # 64KB batches
    'linger.ms': 50,            # Wait 50ms for batch
    'compression.type': 'lz4',

    # More in-flight
    'max.in.flight.requests.per.connection': 5,
})

# Produce many messages
for event in events:
    producer.produce(topic='events', value=event)
    producer.poll(0)  # Non-blocking poll

producer.flush()  # Final flush
```

---

## Latency vs Throughput Tuning Matrix

Use this table to configure producers based on use case:

| Scenario | linger.ms | batch.size | Compression | acks |
|----------|-----------|------------|-------------|------|
| **Real-time actions** (API responses) | 0-5 | 16KB | none/snappy | all |
| **Analytics events** (dashboards) | 10-50 | 64KB-256KB | lz4 | all |
| **Batch pipelines** (ETL) | 100-500 | 512KB-1MB | lz4/zstd | all |
| **Log aggregation** (high volume) | 500-1000 | 1MB | zstd | 1 |
| **Non-critical metrics** | 100 | 256KB | lz4 | 0 |

**Batching trigger:** A batch is sent when either `linger.ms` expires or `batch.size` is reached, whichever comes first.
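
The trigger rule can be illustrated with a small simulation for a single partition. This is a simplified model under stated assumptions (one open batch at a time, the linger timer starts when the first message enters the batch); `simulate_batches` is a hypothetical helper and does not reproduce the client's internal scheduling.

```python
def simulate_batches(messages, linger_ms, batch_size):
    """messages: list of (arrival_ms, size_bytes), sorted by arrival time.

    Returns a list of (send_time_ms, message_count, reason).
    """
    batches = []
    opened_at = None
    count = size = 0
    for arrival, msg_size in messages:
        # Linger timer expired before this message arrived: send what we have
        if count and arrival >= opened_at + linger_ms:
            batches.append((opened_at + linger_ms, count, 'linger'))
            count = size = 0
        if count == 0:
            opened_at = arrival  # First message opens a new batch
        count += 1
        size += msg_size
        # Size threshold reached: send immediately
        if size >= batch_size:
            batches.append((arrival, count, 'size'))
            count = size = 0
    if count:
        batches.append((opened_at + linger_ms, count, 'linger'))
    return batches


burst = [(0, 4000), (1, 4000), (2, 4000), (3, 4000), (4, 4000), (100, 1000)]
print(simulate_batches(burst, linger_ms=5, batch_size=16384))
# [(4, 5, 'size'), (105, 1, 'linger')]
```

The burst fills a batch by size before the 5 ms timer expires, while the straggler at 100 ms waits out the full linger period alone.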

---

## Stats Callback for Debugging

Monitor batching efficiency and diagnose throughput issues:

```python
from confluent_kafka import Producer
import json

def stats_callback(stats_json):
    """Called periodically with producer statistics."""
    stats = json.loads(stats_json)

    # Overall producer stats
    print(f"Messages in queue: {stats.get('msg_cnt', 0)}")
    print(f"Bytes in queue: {stats.get('msg_size', 0)}")

    # Per-topic stats (batchsize is a rolling window reported per topic)
    for topic_name, topic_stats in stats.get('topics', {}).items():
        batch_size = topic_stats.get('batchsize', {})
        print(f"{topic_name}:")
        print(f"  Avg batch size: {batch_size.get('avg', 0)} bytes")
        print(f"  Batches sampled: {batch_size.get('cnt', 0)}")

producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'linger.ms': 50,
    'batch.size': 65536,
    'stats_cb': stats_callback,
    'statistics.interval.ms': 5000,  # Stats every 5 seconds
})
```

**Use this to diagnose:**
- Batches too small → increase `linger.ms`
- Queue backing up → increase `batch.size` or producer count
- Messages not batching → check key distribution (same key = same partition)
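
To turn the "batches too small" check into a number, the average batch size can be compared with the configured `batch.size`. A minimal sketch, assuming the statistics JSON exposes a per-topic `batchsize` window with `avg` and `cnt` fields; `batch_fill_ratios` is a hypothetical helper and the sample payload is made up for illustration.

```python
def batch_fill_ratios(stats: dict, batch_size: int) -> dict[str, float]:
    """Average batch size per topic as a fraction of the configured batch.size."""
    ratios = {}
    for name, topic in stats.get('topics', {}).items():
        window = topic.get('batchsize', {})
        if window.get('cnt', 0) > 0:  # Skip topics with no batches sampled
            ratios[name] = window['avg'] / batch_size
    return ratios


# Hypothetical, trimmed-down stats payload
sample = {
    'topics': {
        'events': {'batchsize': {'avg': 8192, 'cnt': 40}},
        'idle': {'batchsize': {'avg': 0, 'cnt': 0}},
    }
}
print(batch_fill_ratios(sample, 65536))  # {'events': 0.125}
```

A ratio far below 1.0 suggests that batches are leaving on the linger timer rather than on size.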

---

## Metrics to Monitor

| Metric | Healthy | Action if Unhealthy |
|--------|---------|---------------------|
| `record-send-rate` | Stable | Check batching config |
| `record-error-rate` | ~0 | Check broker health |
| `request-latency-avg` | <100ms | Check `acks`, network |
| `buffer-available-bytes` | >50% | Increase buffer or slow down |
| `batch-size-avg` | Near `batch.size` | Increase `linger.ms` |

```

### references/consumers.md

```markdown
# Consumers

Building reliable Kafka consumers with confluent-kafka-python.

---

## Bootstrap Server Selection (CRITICAL)

**Where is your code running?**

| Location | Bootstrap Server | When |
|----------|------------------|------|
| Local Mac/Windows | `localhost:30092` | NodePort for development |
| Same K8s namespace | `dev-cluster-kafka-bootstrap:9092` | Short name |
| Different K8s namespace | `dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092` | FQDN |

### Environment-Aware Pattern (Recommended)

```python
import os

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS',
        'localhost:30092'  # Default for local development
    ),
    'group.id': 'my-service',
    # ... other config
})
```

**Usage:**

```bash
# Local development (uses default localhost:30092)
python consumer.py

# Kubernetes deployment (set in Pod spec)
KAFKA_BOOTSTRAP_SERVERS=dev-cluster-kafka-bootstrap:9092 python consumer.py
```

### Kubernetes ConfigMap Pattern

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
data:
  KAFKA_BOOTSTRAP_SERVERS: "dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
---
apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
      - name: consumer
        envFrom:
        - configMapRef:
            name: kafka-config
```

### Common Mistakes

| Mistake | Symptom | Fix |
|---------|---------|-----|
| K8s DNS from local Mac | `Failed to resolve` | Use `localhost:30092` |
| `localhost:9092` in K8s | Connection refused | Use K8s service name |
| Port 9092 from local | Connection timeout | Use NodePort (30092) |
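
The mistakes in the table can be avoided by choosing the default from the runtime environment. A sketch under two assumptions: the process runs either on a developer machine or inside a pod, and the `KUBERNETES_SERVICE_HOST` variable (which Kubernetes injects into pods) is a good enough signal for "in cluster". `resolve_bootstrap` is a hypothetical helper; the addresses are the ones from the table at the top of this section.

```python
import os
from typing import Mapping

IN_CLUSTER = 'dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092'
LOCAL_NODEPORT = 'localhost:30092'

def resolve_bootstrap(env: Mapping[str, str] = os.environ) -> str:
    """Pick a bootstrap address: explicit setting, then in-cluster, then local."""
    explicit = env.get('KAFKA_BOOTSTRAP_SERVERS')
    if explicit:
        return explicit  # An explicit setting always wins
    if 'KUBERNETES_SERVICE_HOST' in env:
        return IN_CLUSTER  # Running inside a pod: use the service FQDN
    return LOCAL_NODEPORT  # Developer machine: use the NodePort


print(resolve_bootstrap({}))  # localhost:30092
```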

---

## Basic Consumer

```python
from confluent_kafka import Consumer, KafkaError, KafkaException

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Manual commit
})

consumer.subscribe(['order-events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue  # End of partition
            raise KafkaException(msg.error())

        # Process message
        process(msg.value())

        # Commit after successful processing
        consumer.commit(asynchronous=False)

finally:
    consumer.close()  # Leave group cleanly
```

---

## Async Consumer (AIOConsumer)

For FastAPI and asyncio applications:

```python
from confluent_kafka import Consumer, KafkaError
import asyncio
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)

class AIOConsumer:
    """Async wrapper for confluent-kafka Consumer."""

    def __init__(self, config: dict, topics: list[str]):
        self._consumer = Consumer(config)
        self._consumer.subscribe(topics)
        self._running = False

    async def consume(
        self,
        handler: Callable[[bytes], Awaitable[None]],
        poll_timeout: float = 1.0
    ):
        """Consume messages with async handler."""
        self._running = True
        loop = asyncio.get_running_loop()

        while self._running:
            # Poll in thread pool (blocking call)
            msg = await loop.run_in_executor(
                None,
                self._consumer.poll,
                poll_timeout
            )

            if msg is None:
                continue
            if msg.error():
                await self._handle_error(msg.error())
                continue

            try:
                await handler(msg.value())
                self._consumer.commit(asynchronous=False)
            except Exception as e:
                await self._handle_processing_error(msg, e)

    async def _handle_error(self, error):
        if error.code() != KafkaError._PARTITION_EOF:
            logger.error(f"Consumer error: {error}")

    async def _handle_processing_error(self, msg, error):
        logger.error(f"Processing failed: {error}")
        # Send to dead letter queue (send_to_dlq is not defined in this class;
        # provide it as an async method in a subclass)
        await self.send_to_dlq(msg)
        self._consumer.commit()  # Move past failed message

    def stop(self):
        self._running = False

    def close(self):
        self._consumer.close()
```

---

## Production Configuration

```python
consumer = Consumer({
    # Connection
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',
    'client.id': 'order-processor-1',

    # Offset management
    'auto.offset.reset': 'earliest',     # Start from beginning if no committed offset
    'enable.auto.commit': False,         # Manual commit

    # Session management
    'session.timeout.ms': 45000,         # 45s heartbeat timeout
    'heartbeat.interval.ms': 15000,      # Heartbeat every 15s
    'max.poll.interval.ms': 300000,      # 5min max processing time

    # Fetching
    'fetch.min.bytes': 1,                # Don't wait for data
    'fetch.wait.max.ms': 500,            # Max wait time (librdkafka property name)
    'max.partition.fetch.bytes': 1048576, # 1MB per partition

    # Partition assignment
    'partition.assignment.strategy': 'cooperative-sticky',
})
```

### Configuration Explained

| Setting | Value | Why |
|---------|-------|-----|
| `enable.auto.commit=False` | Manual | Control when message is "done" |
| `max.poll.interval.ms` | 5 min | Time for processing before rebalance |
| `session.timeout.ms` | 45s | Detect dead consumers |
| `cooperative-sticky` | Assignment | Incremental rebalance, minimal partition movement |

---

## Consumer Groups

```
Topic: orders (4 partitions)

Group: order-processor
┌──────────────┬──────────────┬──────────────┐
│ Consumer 1   │ Consumer 2   │ Consumer 3   │
│ Partition 0  │ Partition 2  │ Partition 3  │
│ Partition 1  │              │              │
└──────────────┴──────────────┴──────────────┘

Add Consumer 4:
┌──────────────┬──────────────┬──────────────┬──────────────┐
│ Consumer 1   │ Consumer 2   │ Consumer 3   │ Consumer 4   │
│ Partition 0  │ Partition 1  │ Partition 2  │ Partition 3  │
└──────────────┴──────────────┴──────────────┴──────────────┘

Add Consumer 5 and 6 (MORE CONSUMERS THAN PARTITIONS):
┌──────────────┬──────────────┬──────────────┬──────────────┬──────────────┬──────────────┐
│ Consumer 1   │ Consumer 2   │ Consumer 3   │ Consumer 4   │ Consumer 5   │ Consumer 6   │
│ Partition 0  │ Partition 1  │ Partition 2  │ Partition 3  │ IDLE         │ IDLE         │
└──────────────┴──────────────┴──────────────┴──────────────┴──────────────┴──────────────┘
```

### Scaling Rule

**`consumers <= partitions` for efficiency**

| Partitions | Consumers | Result |
|------------|-----------|--------|
| 4 | 2 | Each consumer handles 2 partitions |
| 4 | 4 | Each consumer handles 1 partition (optimal) |
| 4 | 6 | 4 consumers active, **2 IDLE** (wasted resources) |
| 4 | 10 | 4 consumers active, **6 IDLE** (very wasteful) |

**When to increase partitions vs consumers:**
- High lag + consumers at partition limit → increase partitions
- Low CPU per consumer → add more consumers (up to partition count)
- Need more parallelism → increase partitions first
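
The scaling rule reduces to simple arithmetic. A minimal sketch, assuming an assignor that spreads partitions as evenly as possible across one topic; `group_capacity` is a hypothetical helper.

```python
def group_capacity(partitions: int, consumers: int) -> dict:
    """Active/idle consumer counts for one topic in one consumer group."""
    active = min(partitions, consumers)
    idle = max(consumers - partitions, 0)
    # Ceiling division: the busiest consumer's share under an even spread
    busiest = -(-partitions // active) if active else 0
    return {'active': active, 'idle': idle, 'max_partitions_per_consumer': busiest}


print(group_capacity(4, 2))  # {'active': 2, 'idle': 0, 'max_partitions_per_consumer': 2}
print(group_capacity(4, 6))  # {'active': 4, 'idle': 2, 'max_partitions_per_consumer': 1}
```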

### Group ID Best Practices

```
<service-name>-<function>

Examples:
order-service-processor     # Main processing
order-service-analytics     # Different group, same topic
notification-sender
agent-task-worker
```

---

## Offset Commit Patterns

### After Each Message (Safest)

```python
while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process(msg.value())
        consumer.commit(asynchronous=False)  # Sync commit
```

### Batch Commit (Balanced)

```python
batch_size = 100
processed = 0

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process(msg.value())
        processed += 1

        if processed >= batch_size:
            consumer.commit(asynchronous=False)
            processed = 0
```

### Async Commit (Fast, Less Safe)

```python
while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process(msg.value())
        consumer.commit(asynchronous=True)  # Fire and forget
```

### Manual Offset Tracking

```python
from confluent_kafka import TopicPartition

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        process(msg.value())

        # Commit specific offset
        tp = TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)
        consumer.commit(offsets=[tp], asynchronous=False)
```

---

## Rebalancing

When consumers join/leave, partitions are reassigned.

### Rebalance Callback

```python
def on_assign(consumer, partitions):
    """Called when partitions assigned."""
    logger.info(f"Assigned: {partitions}")
    # Optional: seek to specific offsets

def on_revoke(consumer, partitions):
    """Called before partitions revoked."""
    logger.info(f"Revoking: {partitions}")
    # CRITICAL: commit offsets before losing partitions
    consumer.commit(asynchronous=False)

def on_lost(consumer, partitions):
    """Called when partitions lost (crash)."""
    logger.warning(f"Lost: {partitions}")
    # Cannot commit - partitions already reassigned

consumer.subscribe(
    ['orders'],
    on_assign=on_assign,
    on_revoke=on_revoke,
    on_lost=on_lost
)
```

### Cooperative Sticky Assignment

Minimizes partition movement during rebalance:

```python
consumer = Consumer({
    'partition.assignment.strategy': 'cooperative-sticky',
    # Other config...
})
```

---

## Error Handling

```python
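import logging
import time

from confluent_kafka import KafkaError, KafkaException

logger = logging.getLogger(__name__)

# TransientError, process() and send_to_dlq() are application-defined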

def consume_with_retry():
    while True:
        try:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                    logger.error("Topic doesn't exist")
                    time.sleep(5)  # Wait for topic creation
                    continue
                else:
                    raise KafkaException(msg.error())

            # Process with retry
            last_error = None
            for attempt in range(3):
                try:
                    process(msg.value())
                    break
                except TransientError as e:
                    last_error = e
                    time.sleep(0.5 * (2 ** attempt))
            else:
                # All retries failed - send to DLQ with the last error
                send_to_dlq(msg, last_error)

            consumer.commit()

        except KafkaException as e:
            logger.error(f"Kafka error: {e}")
            time.sleep(1)
```
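
The retry loop inside `consume_with_retry` can be factored into a reusable helper so it can be tested without a broker. A minimal sketch: `run_with_retry` and `TransientError` are hypothetical names, and the `sleep` parameter exists only so that tests can inject a fake clock.

```python
import time

class TransientError(Exception):
    """Placeholder for errors worth retrying (define per application)."""

def run_with_retry(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on TransientError back off exponentially and retry.

    Returns (True, result) on success, or (False, last_error) once all
    attempts are used up, so the caller can route the message to a DLQ.
    """
    last_error = None
    for attempt in range(attempts):
        try:
            return True, fn()
        except TransientError as exc:
            last_error = exc
            if attempt < attempts - 1:  # No point sleeping after the last try
                sleep(base_delay * (2 ** attempt))
    return False, last_error


# Usage: fails twice, then succeeds; delays are recorded instead of slept
calls = {'n': 0}
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise TransientError('temporary failure')
    return 'ok'

delays = []
print(run_with_retry(flaky, sleep=delays.append), delays)
# (True, 'ok') [0.5, 1.0]
```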

---

## Dead Letter Queue (DLQ)

```python
from datetime import datetime, timezone

from confluent_kafka import Producer

dlq_producer = Producer({'bootstrap.servers': 'kafka:9092'})

def send_to_dlq(original_msg, error: Exception):
    """Send failed message to dead letter queue."""
    dlq_producer.produce(
        topic=f"{original_msg.topic()}.dlq",
        key=original_msg.key(),
        value=original_msg.value(),
        headers=[
            ('original_topic', original_msg.topic().encode()),
            ('original_partition', str(original_msg.partition()).encode()),
            ('original_offset', str(original_msg.offset()).encode()),
            ('error_type', type(error).__name__.encode()),
            ('error_message', str(error).encode()),
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            ('failed_at', datetime.now(timezone.utc).isoformat().encode()),
        ]
    )
    dlq_producer.flush()
```

---

## Consumer Lag

Consumer lag is the difference between a partition's latest offset (high watermark) and the consumer group's committed offset.

### Check Lag Programmatically

```python
def get_consumer_lag(consumer, topic):
    """Get lag for this consumer's assigned partitions of `topic`."""
    # Get assigned partitions for the topic
    assignment = [tp for tp in consumer.assignment() if tp.topic == topic]

    # Get committed offsets, keyed by partition
    committed = {c.partition: c.offset for c in consumer.committed(assignment)}

    lag_info = []
    for tp in assignment:
        # Low/high watermarks (earliest and latest offsets)
        low, high = consumer.get_watermark_offsets(tp)
        committed_offset = committed.get(tp.partition, -1)
        if committed_offset < 0:
            # Nothing committed yet (negative sentinel): count from the low watermark
            committed_offset = low
        lag = high - committed_offset
        lag_info.append({
            'partition': tp.partition,
            'committed': committed_offset,
            'latest': high,
            'lag': lag
        })

    return lag_info
```

### CLI Lag Check

```bash
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --group order-processor
```

---

## Parallel Processing

### Multi-Threaded

```python
from concurrent.futures import ThreadPoolExecutor
import threading

from confluent_kafka import Consumer

class ParallelConsumer:
    def __init__(self, config, topics, workers=4):
        self._consumer = Consumer(config)
        self._consumer.subscribe(topics)
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._pending = {}
        self._lock = threading.Lock()

    def consume(self):
        while True:
            msg = self._consumer.poll(0.1)

            if msg and not msg.error():
                # Submit to thread pool
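                future = self._executor.submit(self._process, msg)
                with self._lock:
                    # Offsets repeat across partitions, so key by partition too
                    key = (msg.topic(), msg.partition(), msg.offset())
                    self._pending[key] = future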

            # Commit completed
            self._commit_completed()

    def _process(self, msg):
        process(msg.value())
        return msg.offset()

    def _commit_completed(self):
        with self._lock:
            completed = [
                offset for offset, future in self._pending.items()
                if future.done()
            ]
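            if completed:
                # WARNING (simplified): commit() with no arguments commits the
                # consumer's current position, which can include messages still
                # running in the pool. A real implementation must track, per
                # partition, the highest contiguous completed offset.
                self._consumer.commit()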
                for offset in completed:
                    del self._pending[offset]
```
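
The partition tracking that the simplified `_commit_completed` leaves out comes down to one rule: commit only up to the end of the contiguous run of completed offsets. A minimal sketch of that bookkeeping, independent of any Kafka client; `OffsetTracker` is a hypothetical class, and one instance per (topic, partition) is assumed.

```python
class OffsetTracker:
    """Tracks completed offsets for one partition and what is safe to commit."""

    def __init__(self, start_offset: int):
        self._next = start_offset  # Lowest offset not yet known to be complete
        self._done = set()         # Completed offsets at or above self._next

    def mark_done(self, offset: int) -> None:
        self._done.add(offset)
        # Advance over any contiguous run of completed offsets
        while self._next in self._done:
            self._done.remove(self._next)
            self._next += 1

    def committable(self) -> int:
        """Offset to commit: one past the last contiguously completed message."""
        return self._next


tracker = OffsetTracker(start_offset=10)
tracker.mark_done(12)          # Finished out of order
print(tracker.committable())   # 10 (offsets 10 and 11 still in flight)
tracker.mark_done(10)
tracker.mark_done(11)
print(tracker.committable())   # 13
```

The value returned by `committable()` follows the Kafka convention that a committed offset names the next message to read, so it can be passed as the offset of a `TopicPartition` in an explicit commit.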

---

## Metrics to Monitor

| Metric | Healthy | Action if Unhealthy |
|--------|---------|---------------------|
| Consumer lag | Low/stable | Add consumers, check processing time |
| Rebalance rate | ~0 | Check `max.poll.interval.ms` |
| Commit rate | Matches consume rate | Check commit logic |
| Poll latency | <100ms | Check broker connectivity |
| Processing time | < `max.poll.interval.ms` | Optimize or increase timeout |

```

### references/delivery-semantics.md

```markdown
# Delivery Semantics

Understanding and implementing message delivery guarantees.

---

## The Three Guarantees

| Guarantee | Definition | Messages Lost? | Messages Duplicated? |
|-----------|------------|----------------|---------------------|
| **At-most-once** | Fire and forget | Possible | No |
| **At-least-once** | Retry until ack | No | Possible |
| **Exactly-once** | Transactional | No | No |

---

## Decision Tree (Agent Guidance)

Use this flow to select the appropriate guarantee:

```
Q1: Can you lose messages?
├── YES → At-most-once
│         (metrics, logs, click tracking)
└── NO  → Continue to Q2

Q2: Can your consumer handle duplicates (is it idempotent)?
├── YES → At-least-once (RECOMMENDED)
│         (most applications - dedupe by ID)
└── NO  → Continue to Q3

Q3: Can you MAKE your consumer idempotent?
├── YES → At-least-once + idempotent consumer (BEST)
│         (add deduplication layer)
└── NO  → Exactly-once (LAST RESORT)
          (Kafka→Kafka only, higher latency)
```

**Critical:** Exactly-once only works when reading from Kafka AND writing to Kafka (or transactional external stores). Most consumers write to databases, call APIs, or send emails—operations outside Kafka transactions. **Idempotent consumers are almost always simpler.**
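
The decision tree above maps directly onto a small function. A sketch for illustration; `choose_guarantee` and its parameter names are hypothetical.

```python
def choose_guarantee(
    can_lose_messages: bool,
    consumer_is_idempotent: bool,
    can_make_idempotent: bool,
) -> str:
    """Walk Q1 -> Q2 -> Q3 from the decision tree and return the guarantee."""
    if can_lose_messages:          # Q1
        return 'at-most-once'
    if consumer_is_idempotent:     # Q2
        return 'at-least-once'
    if can_make_idempotent:        # Q3
        return 'at-least-once + idempotent consumer'
    return 'exactly-once'          # Last resort


print(choose_guarantee(False, False, True))
# at-least-once + idempotent consumer
```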

---

## At-Most-Once

**Use when:** Loss acceptable, duplicates not (rare in practice)

```python
# Producer: don't wait for acks
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': '0',  # No acknowledgment
})

producer.produce(topic='metrics', value=data)
# No flush - may lose messages

# Consumer: auto-commit, so offsets may be committed before processing finishes
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'metrics-reader',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000,
})

while True:
    msg = consumer.poll(1.0)
    if msg is not None and not msg.error():
        # The background auto-commit can commit this offset while process()
        # is still running; if processing then fails, the message is lost
        process(msg.value())
```

**Use cases:**
- Metrics collection (occasional loss OK)
- Logging (gaps acceptable)
- Real-time analytics (latest matters, not all)

---

## At-Least-Once

**Use when:** No loss acceptable, duplicates can be handled (most common)

```python
# Producer: wait for all replicas
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'retries': 2147483647,
    'delivery.timeout.ms': 120000,
})

def delivery_callback(err, msg):
    if err:
        # Handle failure - retry or alert
        logger.error(f"Delivery failed: {err}")
        retry_or_alert(msg)

producer.produce(
    topic='orders',
    value=order_data,
    callback=delivery_callback
)
producer.flush()  # Wait for confirmation

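# Consumer: commit after processing
from confluent_kafka import TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processor',
    'enable.auto.commit': False,
})

while True:
    msg = consumer.poll(1.0)
    if msg is not None and not msg.error():
        try:
            process(msg.value())
            consumer.commit(message=msg, asynchronous=False)  # Only after success
        except Exception as e:
            # Don't commit; rewind so the next poll() returns this message again.
            # Without the seek, a later commit would skip past the failed message.
            # (Add a retry limit or DLQ for messages that never succeed.)
            logger.error(f"Processing failed: {e}")
            consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))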
```

### Handling Duplicates (Idempotency)

```python
# Option 1: Database unique constraint
def process_order(order):
    try:
        db.execute(
            "INSERT INTO orders (order_id, ...) VALUES (%s, ...)",
            (order['id'], ...)
        )
    except UniqueViolation:
        logger.info(f"Duplicate order {order['id']}, skipping")

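# Option 2: Idempotency key tracking
# (exists() followed by setex() is not atomic; two concurrent consumers
# can both pass the check, so pair this with a single consumer per key)
import redis

processed_ids = redis.Redis()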

def process_order(order):
    key = f"processed:{order['id']}"
    if processed_ids.exists(key):
        return  # Already processed

    do_processing(order)
    processed_ids.setex(key, 86400, "1")  # 24h TTL

# Option 3: Version/timestamp check
def process_order(order):
    current = db.get_order(order['id'])
    if current and current.version >= order['version']:
        return  # Stale or duplicate

    db.upsert_order(order)
```

---

## Exactly-Once (Transactions)

**Use when:** Financial, critical data, no duplicates allowed

### Producer Transactions

```python
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'transactional.id': 'order-processor-1',  # Must be unique per instance
    'acks': 'all',
    'enable.idempotence': True,
})

# Initialize once at startup
producer.init_transactions()

def process_and_produce(input_msg):
    try:
        producer.begin_transaction()

        # Process input
        result = transform(input_msg.value())

        # Produce outputs (all or nothing)
        producer.produce(topic='processed-orders', value=result)
        producer.produce(topic='audit-log', value=audit_entry)

        # Commit consumer offset as part of transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata()
        )

        producer.commit_transaction()

    except Exception as e:
        producer.abort_transaction()
        raise
```

### Consumer with Transactions

```python
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'exactly-once-processor',
    'isolation.level': 'read_committed',  # CRITICAL
    'enable.auto.commit': False,
})
```

### Transactional ID Strategy

```
<service>-<instance>

Examples:
order-processor-0
order-processor-1
payment-handler-pod-abc123
```

**Important:** Same transactional.id across restarts = zombie fencing works.
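
One way to obtain an ID that survives restarts is to derive it from a stable instance identity. A minimal sketch, assuming the service runs as a Kubernetes StatefulSet so that pod names end in a fixed ordinal (`-0`, `-1`, ...); `transactional_id` is a hypothetical helper, and pods with random name suffixes are rejected because their names change on every restart.

```python
import re

def transactional_id(service: str, pod_name: str) -> str:
    """Derive '<service>-<instance>' from a StatefulSet-style pod name."""
    match = re.search(r'-(\d+)$', pod_name)
    if match is None:
        raise ValueError(f"pod name {pod_name!r} has no stable ordinal suffix")
    return f"{service}-{match.group(1)}"


print(transactional_id('order-processor', 'order-processor-1'))
# order-processor-1
```

In a pod, the name is typically available through the `HOSTNAME` environment variable.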

---

## Consume-Process-Produce Pattern

The most common exactly-once pattern:

```python
import logging

from confluent_kafka import Consumer, Producer, TopicPartition

logger = logging.getLogger(__name__)

class ExactlyOnceProcessor:
    def __init__(self, consumer_config, producer_config, input_topic, output_topic, instance_id):
        self.consumer = Consumer({
            **consumer_config,
            'isolation.level': 'read_committed',
            'enable.auto.commit': False,
        })
        self.producer = Producer({
            **producer_config,
            # Stable per instance (e.g. a StatefulSet ordinal) so that a restarted
            # instance fences its predecessor; a random ID would defeat fencing
            'transactional.id': f'{input_topic}-processor-{instance_id}',
            'acks': 'all',
            'enable.idempotence': True,
        })
        self.input_topic = input_topic
        self.output_topic = output_topic

    def start(self):
        self.consumer.subscribe([self.input_topic])
        self.producer.init_transactions()

    def run(self):
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None or msg.error():
                continue

            try:
                self.producer.begin_transaction()

                # Process
                result = self.process(msg.value())

                # Produce result
                self.producer.produce(self.output_topic, value=result)

                # Commit consumer offset in transaction
                self.producer.send_offsets_to_transaction(
                    self.consumer.position(self.consumer.assignment()),
                    self.consumer.consumer_group_metadata()
                )

                self.producer.commit_transaction()

            except Exception as e:
                self.producer.abort_transaction()
                logger.error(f"Transaction failed: {e}")
                # Rewind so the failed message is polled again instead of skipped
                self.consumer.seek(
                    TopicPartition(msg.topic(), msg.partition(), msg.offset())
                )

    def process(self, value):
        # Your processing logic (transform is application-defined)
        return transform(value)

---

## Decision Matrix

| Scenario | Guarantee | Why |
|----------|-----------|-----|
| Metrics/logs | At-most-once | Loss acceptable |
| Event notifications | At-least-once | Idempotent handlers |
| Order processing | At-least-once | Dedupe by order ID |
| Financial transactions | Exactly-once | No tolerance for errors |
| Agent task dispatch | At-least-once | Idempotent task execution |
| Saga coordination | Exactly-once | Compensation complexity |

---

## Performance Comparison

| Guarantee | Latency | Throughput | Complexity |
|-----------|---------|------------|------------|
| At-most-once | Lowest | Highest | Lowest |
| At-least-once | Medium | Medium | Low |
| Exactly-once | Highest | Lowest | High |

### Exactly-Once Overhead

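- ~2x latency (transaction commit)
- ~20-30% throughput reduction
- Additional broker coordination (transaction coordinator and commit markers)

Treat these as rough figures; the actual overhead depends on transaction size and commit frequency.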

---

## Gotchas

### Producer

| Gotcha | Symptom | Fix |
|--------|---------|-----|
| `acks=1` | Data loss on leader failure | Use `acks=all` |
| No idempotence | Duplicates on retry | `enable.idempotence=true` |
| Low `delivery.timeout.ms` | Premature failure | Increase to 2+ minutes |

### Consumer

| Gotcha | Symptom | Fix |
|--------|---------|-----|
| Auto-commit | Message loss on crash | Manual commit after processing |
| Commit before process | Message loss | Commit after success |
| No `read_committed` | See uncommitted (aborted) messages | Set isolation level |

### Transactions

| Gotcha | Symptom | Fix |
|--------|---------|-----|
| Same `transactional.id` on two live instances | Older producer fenced | Unique, stable ID per instance |
| Long transactions | Timeout, abort | Keep transactions short |
| Missing `send_offsets_to_transaction` | Consumer reprocesses | Include offset commit |
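
One way to get a `transactional.id` that is unique per instance, yet unchanged when that instance restarts, is to derive it from a stable instance name. A minimal sketch, assuming each instance sees a stable `HOSTNAME` (for example a StatefulSet pod name); `stable_transactional_id` is a hypothetical helper, not part of any Kafka client.

```python
import os


def stable_transactional_id(app: str, env=None) -> str:
    """Build a transactional.id that is unique per instance but survives restarts."""
    env = os.environ if env is None else env
    # Assumption: HOSTNAME is stable across restarts (e.g. a StatefulSet pod name)
    instance = env.get("HOSTNAME", "local-0")
    return f"{app}-{instance}"


print(stable_transactional_id("orders-processor", {"HOSTNAME": "processor-2"}))
# orders-processor-processor-2
```

With a Deployment, pod names change on every restart, so this assumption does not hold there.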

```

### references/outbox-pattern.md

```markdown
# Transactional Outbox Pattern

Guaranteed event publishing without dual-write inconsistency.

---

## The Problem: Dual-Write Inconsistency

```python
# DANGEROUS: Two separate operations
def create_task(task_data):
    db.insert(task_data)           # 1. Database write
    kafka.produce(task_event)      # 2. Kafka write
    # What if app crashes between these two?
    # Database has task, but no event published
```

**Failure scenarios:**
- Crash after DB write, before Kafka → data exists, no event
- Kafka fails, DB succeeds → data exists, no event
- Both succeed but in wrong order → consumers see stale state

---

## The Solution: Transactional Outbox

Write events to an outbox table in the SAME database transaction as business data. A separate process (Debezium CDC) reads the outbox and publishes to Kafka.

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────┐
│   App       │────▶│  Database   │────▶│  Debezium   │────▶│  Kafka  │
│             │     │ (atomic)    │     │  (CDC)      │     │         │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────┘
                    tasks + outbox       reads WAL          publishes
                    in one transaction   publishes events   to topic
```

**Guarantee:** If the business data is committed, the event WILL be published (at least once, so consumers should still deduplicate).
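
The atomicity this relies on can be tried locally. The sketch below uses the standard-library `sqlite3` module as a stand-in for the real database, with a simplified two-table schema; `create_task` and `counts` are hypothetical helpers written only for this illustration.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, title TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE outbox (id TEXT PRIMARY KEY, aggregate_id TEXT NOT NULL, "
    "event_type TEXT NOT NULL, payload TEXT NOT NULL)"
)
conn.commit()


def create_task(conn, title, fail_before_commit=False):
    """Insert the task and its outbox event in one transaction."""
    task_id = str(uuid.uuid4())
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute("INSERT INTO tasks (id, title) VALUES (?, ?)", (task_id, title))
            conn.execute(
                "INSERT INTO outbox (id, aggregate_id, event_type, payload) "
                "VALUES (?, ?, ?, ?)",
                (str(uuid.uuid4()), task_id, "TaskCreated", json.dumps({"title": title})),
            )
            if fail_before_commit:
                raise RuntimeError("simulated crash before commit")
    except RuntimeError:
        return None
    return task_id


def counts(conn):
    """Return (number of tasks, number of outbox events)."""
    tasks = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
    events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
    return tasks, events


create_task(conn, "write docs")
create_task(conn, "lost task", fail_before_commit=True)
print(counts(conn))  # (1, 1)
```

The failed call leaves neither a task nor an event behind, which is the property the dual-write version cannot offer.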

---

## Outbox Table Schema

```sql
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,  -- e.g., "Task", "Order"
    aggregate_id VARCHAR(255) NOT NULL,    -- e.g., task_id, order_id
    event_type VARCHAR(255) NOT NULL,      -- e.g., "TaskCreated"
    payload JSONB NOT NULL,                -- Event data
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_outbox_created_at ON outbox(created_at);
```

---

## Python: Atomic Write Pattern

```python
from sqlalchemy import text
from sqlalchemy.orm import Session
import json
import uuid
from datetime import datetime, timezone

def create_task_with_event(session: Session, title: str, owner_id: str) -> dict:
    """Create task and event atomically - both succeed or both fail."""
    task_id = str(uuid.uuid4())

    event_payload = {
        "task_id": task_id,
        "title": title,
        "owner_id": owner_id,
        "created_at": datetime.now(timezone.utc).isoformat()
    }

    # Single transaction - atomic
    session.execute(
        text("""
            INSERT INTO tasks (id, title, owner_id, status)
            VALUES (:id, :title, :owner_id, 'pending')
        """),
        {"id": task_id, "title": title, "owner_id": owner_id}
    )

    session.execute(
        text("""
            INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
            VALUES (:agg_type, :agg_id, :event_type, CAST(:payload AS JSONB))
        """),
        {
            "agg_type": "Task",
            "agg_id": task_id,
            "event_type": "TaskCreated",
            "payload": json.dumps(event_payload)
        }
    )

    session.commit()  # Both writes atomic
    return {"id": task_id, "title": title}
```

---

## PostgreSQL Setup for Debezium

```sql
-- Enable logical replication (postgresql.conf)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Create replication user
CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'dbz-secret';
GRANT CONNECT ON DATABASE taskdb TO debezium;
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
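ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Create the publication up front (as the table owner); otherwise Debezium
-- tries to create one for all tables, which requires superuser rights
CREATE PUBLICATION debezium_outbox_pub FOR TABLE public.outbox;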
```

---

## Kubernetes: KafkaConnect with Debezium

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: task-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 4.1.1
  replicas: 1
  bootstrapServers: task-events-kafka-bootstrap:9092
  config:
    group.id: task-connect-cluster
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  build:
    output:
      type: docker
      image: my-registry/kafka-connect-debezium:latest
    plugins:
      - name: debezium-postgres
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz
```

---

## Outbox Connector with Event Router

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: task-outbox-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: task-connect
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    # Database connection
    database.hostname: postgres-service
    database.port: "5432"
    database.user: debezium
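    # Assumes the KafkaConnect cluster enables a config provider named "directory"
    # (DirectoryConfigProvider) and mounts the postgres-creds Secret at this path
    database.password: "${directory:/opt/kafka/external-configuration/postgres-creds:password}"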
    database.dbname: taskdb

    # Replication settings
    topic.prefix: taskdb
    plugin.name: pgoutput
    slot.name: debezium_outbox_slot
    publication.name: debezium_outbox_pub

    # Only capture outbox table
    table.include.list: public.outbox

    # Event Router transformation
    transforms: outbox
    transforms.outbox.type: io.debezium.transforms.outbox.EventRouter
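    # The router defaults to a column named "aggregatetype"; point it at our column
    transforms.outbox.route.by.field: aggregate_type
    transforms.outbox.route.topic.replacement: ${routedByValue}.events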
    transforms.outbox.table.field.event.type: event_type
    transforms.outbox.table.field.event.key: aggregate_id
    transforms.outbox.table.field.event.payload: payload
    transforms.outbox.table.expand.json.payload: true

    # Schema history
    schema.history.internal.kafka.bootstrap.servers: task-events-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.outbox
```

**Event Router behavior:**
- Routes to topic based on `aggregate_type` → `Task.events`
- Uses `aggregate_id` as message key
- Expands JSON payload into event body
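
The mapping in the list above can be pictured as a small function from one outbox row to a Kafka record. This is only an illustration of the described behavior, not Debezium code; `route_outbox_row` and the sample row are hypothetical.

```python
import json


def route_outbox_row(row: dict) -> tuple:
    """Return (topic, key, value) for one outbox row, as described above."""
    topic = f"{row['aggregate_type']}.events"  # routed by aggregate_type
    key = row["aggregate_id"]                  # aggregate_id becomes the message key
    value = json.loads(row["payload"])         # JSON payload expanded into the body
    return topic, key, value


row = {
    "aggregate_type": "Task",
    "aggregate_id": "42",
    "event_type": "TaskCreated",
    "payload": '{"task_id": "42", "title": "write docs"}',
}
topic, key, value = route_outbox_row(row)
print(topic, key, value["title"])  # Task.events 42 write docs
```

Keying by `aggregate_id` keeps all events for one aggregate on the same partition, so they stay ordered relative to each other.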

---

## Outbox Cleanup

Outbox grows indefinitely without cleanup:

```sql
-- Delete entries older than 7 days (assumes Debezium has already captured them)
DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days';
```

**Kubernetes CronJob:**
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: outbox-cleanup
spec:
  schedule: "0 3 * * *"  # Daily at 3 AM
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: cleanup
            image: postgres:15
            command:
            - psql
            - -c
            - "DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '7 days'"
            env:
            - name: PGHOST
              value: postgres-service
            - name: PGDATABASE
              value: taskdb
            # PGUSER and PGPASSWORD must also be set, ideally from a Secret
          restartPolicy: OnFailure
```

---

## When to Use Outbox Pattern

| Scenario | Use Outbox? | Why |
|----------|-------------|-----|
| DB + Kafka writes must be consistent | ✅ Yes | Eliminates dual-write |
| Event sourcing | ❌ No | Events ARE the source |
| Read-only consumers | ❌ No | No write consistency needed |
| Fire-and-forget events | ❌ No | Loss acceptable |
| Multi-database transactions | ✅ Yes | Saga coordination |

---

## Trade-offs

| Aspect | Benefit | Cost |
|--------|---------|------|
| Consistency | Guaranteed delivery | Added complexity |
| Latency | No Kafka call in the request path | CDC lag before events appear (usually < 1 second) |
| Operations | No publishing code in the app | Debezium and Kafka Connect to run and monitor |
| Debugging | Clear audit trail | More moving parts |

**Alternative:** If you only write to Kafka (event sourcing), you don't need an outbox: Kafka IS your source of truth.

```

### references/monitoring.md

```markdown
# Monitoring

Prometheus metrics, Grafana dashboards, and alerting for Kafka.

---

## Strimzi Metrics Configuration

### Enable Prometheus Metrics

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  kafkaExporter:
    topicRegex: ".*"
    groupRegex: ".*"
```

### Metrics ConfigMap

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
      # Broker metrics
      - pattern: kafka.server<type=(.+), name=(.+)><>Value
        name: kafka_server_$1_$2
        type: GAUGE
      - pattern: kafka.server<type=(.+), name=(.+)><>Count
        name: kafka_server_$1_$2_total
        type: COUNTER

      # Topic metrics
      - pattern: kafka.log<type=(.+), name=(.+), topic=(.+), partition=(.+)><>Value
        name: kafka_log_$1_$2
        labels:
          topic: $3
          partition: $4
        type: GAUGE

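      # Consumer group metrics
      - pattern: kafka.coordinator.group<type=(.+), name=(.+)><>Value
        name: kafka_coordinator_$1_$2
        type: GAUGE

      # Controller metrics (offline partitions, active controller)
      - pattern: kafka.controller<type=(.+), name=(.+)><>Value
        name: kafka_controller_$1_$2
        type: GAUGE

      # Request latency percentiles, exposed with a quantile label (e.g. "0.99")
      - pattern: kafka.network<type=RequestMetrics, name=(.+), request=(.+)><>(\d+)thPercentile
        name: kafka_network_requestmetrics_$1
        type: GAUGE
        labels:
          request: "$2"
          quantile: "0.$3"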
```

---

## Key Metrics

### Broker Health

| Metric | Query | Alert Threshold |
|--------|-------|-----------------|
| Under-replicated partitions | `kafka_server_replicamanager_underreplicatedpartitions` | > 0 |
| Offline partitions | `kafka_controller_kafkacontroller_offlinepartitionscount` | > 0 |
| Active controller | `sum(kafka_controller_kafkacontroller_activecontrollercount)` | != 1 |
| ISR shrinks/expands | `rate(kafka_server_replicamanager_isrshrinkspersec_total[5m])` | > 0 sustained |

### Throughput

| Metric | Query | Description |
|--------|-------|-------------|
| Messages in/sec | `sum(rate(kafka_server_brokertopicmetrics_messagesinpersec_total[5m]))` | Cluster throughput |
| Bytes in/sec | `sum(rate(kafka_server_brokertopicmetrics_bytesinpersec_total[5m]))` | Network in |
| Bytes out/sec | `sum(rate(kafka_server_brokertopicmetrics_bytesoutpersec_total[5m]))` | Network out |

### Consumer Lag

| Metric | Query | Alert Threshold |
|--------|-------|-----------------|
| Consumer lag | `kafka_consumergroup_lag` | > 10000 |
| Lag rate | `deriv(kafka_consumergroup_lag[5m])` | > 0 sustained (lag is a gauge, so use `deriv`, not `rate`) |
| Lag by group | `sum by(consumergroup)(kafka_consumergroup_lag)` | Group-specific |
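
For reference, the lag these queries report is the gap between a partition's log end offset and the group's committed offset. A minimal sketch of that arithmetic with made-up offsets; `lag_by_group` is a hypothetical helper, not an exporter API.

```python
from collections import defaultdict


def lag_by_group(end_offsets: dict, committed: dict) -> dict:
    """Sum per-partition lag (log end offset - committed offset) per group.

    end_offsets: {(topic, partition): log_end_offset}
    committed:   {(group, topic, partition): committed_offset}
    """
    totals = defaultdict(int)
    for (group, topic, partition), offset in committed.items():
        end = end_offsets[(topic, partition)]
        totals[group] += max(end - offset, 0)  # never report negative lag
    return dict(totals)


end_offsets = {("orders", 0): 1500, ("orders", 1): 900}
committed = {
    ("billing", "orders", 0): 1200,
    ("billing", "orders", 1): 900,
    ("audit", "orders", 0): 1500,
}
print(lag_by_group(end_offsets, committed))  # {'billing': 300, 'audit': 0}
```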

### Latency

| Metric | Query | Alert Threshold |
|--------|-------|-----------------|
| Produce latency | `kafka_network_requestmetrics_totaltimems{request="Produce",quantile="0.99"}` | p99 > 100ms |
| Fetch latency | `kafka_network_requestmetrics_totaltimems{request="Fetch",quantile="0.99"}` | p99 > 100ms |
| Request queue time | `kafka_network_requestmetrics_requestqueuetimems{quantile="0.99"}` | > 10ms |

---

## Prometheus PodMonitor

Strimzi exposes the metrics port on the broker pods rather than on a Service, so scrape the pods with a PodMonitor:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-metrics
  namespace: kafka
  labels:
    release: prometheus  # Match Prometheus selector
spec:
  selector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/kind: Kafka
  podMetricsEndpoints:
    - port: tcp-prometheus
      interval: 30s
      path: /metrics
```

---

## Grafana Dashboards

### Strimzi Dashboard

Import official Strimzi dashboards:
- Kafka: ID `11285`
- ZooKeeper: ID `11287` (not needed for KRaft)
- Kafka Exporter: ID `11288`

### Custom Consumer Lag Dashboard

```json
{
  "title": "Consumer Lag",
  "panels": [
    {
      "title": "Lag by Consumer Group",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum by(consumergroup)(kafka_consumergroup_lag)",
          "legendFormat": "{{consumergroup}}"
        }
      ]
    },
    {
      "title": "Lag by Topic",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum by(topic)(kafka_consumergroup_lag)",
          "legendFormat": "{{topic}}"
        }
      ]
    },
    {
      "title": "Lag Heatmap",
      "type": "heatmap",
      "targets": [
        {
          "expr": "sum by(consumergroup, topic)(kafka_consumergroup_lag)"
        }
      ]
    }
  ]
}
```

---

## Alerting Rules

### PrometheusRule

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: kafka-alerts
  namespace: kafka
spec:
  groups:
    - name: kafka.rules
      rules:
        # Broker alerts
        - alert: KafkaUnderReplicatedPartitions
          expr: kafka_server_replicamanager_underreplicatedpartitions > 0
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: Kafka under-replicated partitions
            description: "{{ $value }} partitions are under-replicated"

        - alert: KafkaOfflinePartitions
          expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
          for: 1m
          labels:
            severity: critical
          annotations:
            summary: Kafka offline partitions
            description: "{{ $value }} partitions are offline"

        - alert: KafkaNoActiveController
          expr: sum(kafka_controller_kafkacontroller_activecontrollercount) != 1
          for: 1m
          labels:
            severity: critical
          annotations:
            summary: No active Kafka controller

        # Consumer alerts
        - alert: KafkaConsumerLagHigh
          expr: sum by(consumergroup)(kafka_consumergroup_lag) > 10000
          for: 10m
          labels:
            severity: warning
          annotations:
            summary: "Consumer group {{ $labels.consumergroup }} lag is high"
            description: "Lag is {{ $value }} messages"

        - alert: KafkaConsumerLagIncreasing
          expr: |
            deriv(kafka_consumergroup_lag[5m]) > 0
            and
            kafka_consumergroup_lag > 1000
          for: 15m
          labels:
            severity: warning
          annotations:
            summary: "Consumer lag increasing for {{ $labels.consumergroup }}"

        # Disk alerts
        - alert: KafkaDiskUsageHigh
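          # Kafka exposes no "max log size" metric; this assumes kubelet volume
          # metrics are scraped and that broker PVC names start with "data-"
          expr: |
            kubelet_volume_stats_used_bytes{namespace="kafka", persistentvolumeclaim=~"data-.*"}
              / kubelet_volume_stats_capacity_bytes{namespace="kafka", persistentvolumeclaim=~"data-.*"} > 0.85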
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: Kafka disk usage above 85%
```

---

## Kafka Exporter

Dedicated exporter for consumer group metrics.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafkaExporter:
    # image: left unset so the operator uses its bundled Kafka Exporter image
    groupRegex: ".*"
    topicRegex: ".*"
    resources:
      requests:
        memory: 64Mi
        cpu: 50m
      limits:
        memory: 128Mi
        cpu: 100m
    logging: warn
    readinessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:
      initialDelaySeconds: 15
      timeoutSeconds: 5
```

---

## Application Metrics

### Producer Metrics

```python
from prometheus_client import Counter, Histogram, start_http_server

# Metrics
messages_produced = Counter(
    'kafka_messages_produced_total',
    'Total messages produced',
    ['topic']
)
produce_latency = Histogram(
    'kafka_produce_latency_seconds',
    'Produce latency',
    ['topic'],
    buckets=[.001, .005, .01, .025, .05, .1, .25, .5, 1]
)

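# Usage (`producer` is assumed to be an async wrapper with an awaitable produce())
async def produce_message(topic, value):
    # Time the call inside the function: the topic label is only known per call
    with produce_latency.labels(topic=topic).time():
        result = await producer.produce(topic, value)
    messages_produced.labels(topic=topic).inc()
    return result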

# Expose metrics
start_http_server(8000)
```

### Consumer Metrics

```python
from prometheus_client import Counter, Gauge, Histogram

messages_consumed = Counter(
    'kafka_messages_consumed_total',
    'Total messages consumed',
    ['topic', 'group']
)
processing_time = Histogram(
    'kafka_message_processing_seconds',
    'Message processing time',
    ['topic']
)
consumer_lag = Gauge(
    'kafka_consumer_lag_messages',
    'Consumer lag in messages',
    ['topic', 'partition', 'group']
)
```

---

## Health Checks

### FastAPI Health Endpoint

```python
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from confluent_kafka.admin import AdminClient

app = FastAPI()

@app.get("/health")
def health():
    # Plain `def`: FastAPI runs it in a thread pool, so the blocking
    # list_topics() call does not stall the event loop
    try:
        admin = AdminClient({'bootstrap.servers': 'kafka:9092'})
        cluster_meta = admin.list_topics(timeout=5)
        return {
            "status": "healthy",
            "brokers": len(cluster_meta.brokers),
            "topics": len(cluster_meta.topics)
        }
    except Exception as e:
        # JSONResponse escapes the error text, so the body stays valid JSON
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "error": str(e)}
        )
```

---

## Log Aggregation

### Fluent Bit Config

```ini
[INPUT]
    Name              tail
    Path              /var/log/kafka/*.log
    Tag               kafka.*

[FILTER]
    Name              parser
    Match             kafka.*
    Key_Name          log
    Parser            kafka_log

[OUTPUT]
    Name              es
    Match             kafka.*
    Host              elasticsearch
    Port              9200
    Index             kafka-logs
```

```

### references/gotchas.md

```markdown
# Gotchas

Common Kafka mistakes and how to prevent them.

---

## Producer Gotchas

### 1. Fire and Forget (Data Loss)

**Problem:**
```python
producer.produce(topic, value)
# No callback, no flush
# App exits → messages lost
```

**Fix:**
```python
producer.produce(topic, value, callback=delivery_callback)
producer.flush()  # ALWAYS flush before exit
```

**Prevention:** Set `acks=all`, use callbacks, flush on shutdown.
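
The `delivery_callback` used in the fix is not defined above. A minimal sketch follows, assuming the `(err, msg)` signature that confluent-kafka passes to delivery callbacks; the `failed` list and `FakeMessage` class are hypothetical, the latter only so the snippet runs without a broker.

```python
import logging

logger = logging.getLogger("producer")
failed = []  # hypothetical in-memory record of failed deliveries


def delivery_callback(err, msg):
    """Invoked once per message from poll()/flush() with the delivery result."""
    if err is not None:
        failed.append((msg.topic(), str(err)))
        logger.error("Delivery to %s failed: %s", msg.topic(), err)
    else:
        logger.debug(
            "Delivered to %s [%d] @ %d", msg.topic(), msg.partition(), msg.offset()
        )


class FakeMessage:
    """Stand-in exposing only the accessors used above."""

    def topic(self):
        return "orders"

    def partition(self):
        return 0

    def offset(self):
        return 7


delivery_callback(None, FakeMessage())          # success path
delivery_callback("queue full", FakeMessage())  # failure path
print(failed)  # [('orders', 'queue full')]
```

Callbacks only fire while the producer is being polled or flushed, which is another reason the flush before exit matters.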

---

### 2. Wrong acks Setting

**Problem:**
```python
# acks=0: No acknowledgment (fastest, loses messages)
# acks=1: Leader only (loses on leader failure)
producer = Producer({'acks': '1'})
```

**Fix:**
```python
producer = Producer({
    'acks': 'all',                # Wait for all ISR
    'enable.idempotence': True,   # Prevent duplicates
})
```

---

### 3. Blocking the Event Loop

**Problem:**
```python
# In FastAPI async endpoint
producer.flush(timeout=30)  # BLOCKS event loop for 30s
```

**Fix:**
```python
# Use thread pool for blocking calls
await asyncio.get_running_loop().run_in_executor(
    None, producer.flush, 10
)

# Or use proper async wrapper
await aio_producer.flush()
```

---

### 4. No Error Handling

**Problem:**
```python
producer.produce(topic, value)  # Raises BufferError when the local queue is full; unhandled here
```

**Fix:**
```python
try:
    producer.produce(topic, value, callback=on_delivery)
except BufferError:
    # Local queue full: serve delivery callbacks for up to 0.5s to free space,
    # then retry once
    producer.poll(0.5)
    producer.produce(topic, value, callback=on_delivery)
```

---

## Consumer Gotchas

### 5. Auto-Commit with At-Least-Once

**Problem:**
```python
consumer = Consumer({
    'enable.auto.commit': True,  # DEFAULT IS TRUE!
})

msg = consumer.poll()
process(msg)  # Crash here → message committed but not processed
```

**Fix:**
```python
consumer = Consumer({
    'enable.auto.commit': False,  # Manual commit
})

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    try:
        process(msg)
        consumer.commit(message=msg)  # Only after successful processing
    except Exception:
        pass  # Don't commit; the message is redelivered after a restart or rebalance
```

---

### 6. Blocking Poll Too Long

**Problem:**
```python
consumer = Consumer({
    'max.poll.interval.ms': 300000,  # 5 min
})

while True:
    msg = consumer.poll(1.0)
    heavy_processing(msg)  # Takes 10 minutes → kicked from group
```

**Fix:**
```python
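# Option 1: Increase timeout
consumer = Consumer({'max.poll.interval.ms': 900000})  # 15 min

# Option 2: Keep polling while the work runs in the background
async def process_batch():
    loop = asyncio.get_running_loop()
    while True:
        msg = await loop.run_in_executor(None, consumer.poll, 1.0)
        if msg is None or msg.error():
            continue
        # Commit the offset only once this task has finished
        asyncio.create_task(process_async(msg))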
```

---

### 7. Not Handling Rebalance

**Problem:**
```python
consumer.subscribe(['orders'])
# No rebalance callback
# Partitions revoked → uncommitted offsets lost → duplicates
```

**Fix:**
```python
def on_revoke(consumer, partitions):
    # Synchronous commit: it must complete before the partitions are released
    consumer.commit(asynchronous=False)

consumer.subscribe(['orders'], on_revoke=on_revoke)
```

---

### 8. Single Consumer for Many Partitions

**Problem:**
```python
# Topic has 100 partitions
# Only 1 consumer → can't keep up
```

**Fix:**
```python
# Scale consumers to match partitions
# Or at least: consumers <= partitions
```
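
The relationship between partition count and useful consumers is easy to see with a toy assignment. This sketch spreads partitions round-robin and is only an illustration; Kafka's actual assignors differ in detail, and `assign_round_robin` is a hypothetical helper.

```python
def assign_round_robin(partitions: int, consumers: int) -> dict:
    """Spread partition numbers over consumers; surplus consumers get nothing."""
    assignment = {c: [] for c in range(consumers)}
    for p in range(partitions):
        assignment[p % consumers].append(p)
    return assignment


print(assign_round_robin(partitions=4, consumers=2))  # {0: [0, 2], 1: [1, 3]}
print(assign_round_robin(partitions=3, consumers=5))  # {0: [0], 1: [1], 2: [2], 3: [], 4: []}
```

In the second call two consumers sit idle, which is why adding consumers beyond the partition count does not add throughput.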

---

## Schema Gotchas

### 9. Breaking Schema Changes

**Problem:**
```json
// Version 1
{"name": "order_id", "type": "string"}

// Version 2 - BREAKS CONSUMERS
{"name": "orderId", "type": "string"}  // Renamed field
```

**Fix:**
```json
// Add new field, keep old
{"name": "order_id", "type": "string"}
{"name": "orderId", "type": ["null", "string"], "default": null}
// Migrate consumers, then deprecate old field
```

---

### 10. No Default Values

**Problem:**
```json
// Add required field
{"name": "new_field", "type": "string"}  // No default
// Old messages can't be read → consumer crashes
```

**Fix:**
```json
{"name": "new_field", "type": "string", "default": ""}
// Or make optional
{"name": "new_field", "type": ["null", "string"], "default": null}
```

---

### 11. Wrong Compatibility Level

**Problem:**
```bash
# BACKWARD compatibility
# Add field without default → rejected
# But you want producers to add field freely
```

**Fix:**
```bash
# Use FORWARD for producer flexibility
# Use FULL for maximum safety
curl -X PUT -H "Content-Type: application/json" \
  --data '{"compatibility": "FULL"}' \
  http://localhost:8081/config/orders-value
```
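
To make the BACKWARD rule concrete, here is a deliberately simplified check that covers only added fields. It is not the Schema Registry's implementation; `backward_compatible` and the sample schemas are hypothetical.

```python
def backward_compatible(old_fields: list, new_fields: list) -> bool:
    """Simplified BACKWARD check: can the new schema read data written with the old one?

    Only covers added fields: every field missing from the old schema needs a default.
    """
    old_names = {f["name"] for f in old_fields}
    return all("default" in f for f in new_fields if f["name"] not in old_names)


v1 = [{"name": "order_id", "type": "string"}]
v2_bad = v1 + [{"name": "new_field", "type": "string"}]
v2_ok = v1 + [{"name": "new_field", "type": ["null", "string"], "default": None}]

print(backward_compatible(v1, v2_bad), backward_compatible(v1, v2_ok))  # False True
```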

---

## Configuration Gotchas

### 12. Wrong Partition Count

**Problem:**
```yaml
# Too few partitions
partitions: 3  # At most 3 active consumers per group
# Adding partitions later changes which partition each key maps to
```

**Fix:**
```yaml
# Start with more partitions than you need
partitions: 12  # Or more based on expected throughput

# Note: Can't reduce partitions, only increase
```
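
Increasing the partition count later is possible, but keyed messages are placed by hashing the key modulo the partition count, so existing keys can move. A small sketch of that effect, using CRC32 as a stand-in hash (real clients use their own hash function); `partition_for` is a hypothetical helper.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in partitioner: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


keys = [f"order-{i}" for i in range(1000)]
moved = sum(partition_for(k, 3) != partition_for(k, 12) for k in keys)
print(f"{moved} of {len(keys)} keys map to a different partition after 3 -> 12")
```

Any key that moves loses its ordering guarantee relative to messages already written to the old partition.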

---

### 13. Short Retention

**Problem:**
```yaml
retention.ms: 3600000  # 1 hour
# Consumer down for 2 hours → misses messages
```

**Fix:**
```yaml
retention.ms: 604800000  # 7 days (default)
# Or use compaction for state topics
cleanup.policy: compact
```

---

### 14. Low Replication Factor

**Problem:**
```yaml
replication-factor: 1  # Single copy
# Broker dies → data lost
```

**Fix:**
```yaml
replication-factor: 3  # Data survives 2 broker failures
min.insync.replicas: 2  # acks=all writes need 2 in-sync replicas (tolerates 1 failure)
```
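
The two settings answer different questions: how many replicas can be lost before data is gone, and how many before `acks=all` writes are rejected. A short sketch of the arithmetic; `tolerated_failures` is a hypothetical helper.

```python
def tolerated_failures(replication_factor: int, min_insync_replicas: int) -> dict:
    """Broker failures a partition absorbs for stored data and for acks=all writes."""
    return {
        "data_survives": replication_factor - 1,
        "acks_all_writes_continue": replication_factor - min_insync_replicas,
    }


print(tolerated_failures(3, 2))  # {'data_survives': 2, 'acks_all_writes_continue': 1}
print(tolerated_failures(1, 1))  # {'data_survives': 0, 'acks_all_writes_continue': 0}
```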

---

## Strimzi/K8s Gotchas

### 15. Wrong Bootstrap Server

**Problem:**
```python
'bootstrap.servers': 'kafka:9092'  # Wrong!
# Should use Strimzi service name
```

**Fix:**
```python
'bootstrap.servers': 'my-cluster-kafka-bootstrap.kafka:9092'
# Or from same namespace:
'bootstrap.servers': 'my-cluster-kafka-bootstrap:9092'
```

---

### 16. Hardcoded Bootstrap Servers

**Problem:**
```python
# Code only works in one environment
consumer = Consumer({
    'bootstrap.servers': 'dev-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092',
})
# Run on Mac → "Failed to resolve"
# Run in K8s → works
```

**Fix:** Use environment variable with sensible default:
```python
import os

consumer = Consumer({
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS',
        'localhost:30092'  # Default for local dev
    ),
})
```

**Deployment:**
```bash
# Local (uses default)
python consumer.py

# Kubernetes (set in ConfigMap or Pod env)
KAFKA_BOOTSTRAP_SERVERS=dev-cluster-kafka-bootstrap:9092 python consumer.py
```

---

### 17. Port-Forward Doesn't Work for Local Development

**Problem:**
```bash
# Try to connect from local machine
kubectl port-forward svc/dev-cluster-kafka-bootstrap 9092:9092 -n kafka

# Then in Python
producer = Producer({'bootstrap.servers': 'localhost:9092'})
# FAILS! Broker advertises internal K8s DNS, client can't resolve
```

**Why:** Kafka clients fetch broker addresses from the cluster. Even with port-forward, the broker advertises `dev-cluster-kafka-0.dev-cluster-kafka-brokers.kafka.svc:9092` which your local machine can't resolve.

**Fix:** Use NodePort listener with `advertisedHost` on brokers (NOT on bootstrap):
```yaml
listeners:
  - name: external
    port: 9094
    type: nodeport
    tls: false
    configuration:
      bootstrap:
        nodePort: 30092
        # NOTE: advertisedHost NOT supported on bootstrap level
      brokers:
        - broker: 0
          nodePort: 30093
          advertisedHost: localhost    # CRITICAL for Docker Desktop
          advertisedPort: 30093
```

Then connect with:
```python
'bootstrap.servers': 'localhost:30092'  # Works!
```

| Location | Port | Why |
|----------|------|-----|
| Local machine | `30092` | NodePort + advertisedHost |
| Inside K8s pod | `9092` | Internal service works |

---

### 17b. Docker Desktop VM IP Unreachable

**Problem:**
```
Connect to ipv4#192.168.65.3:30093 failed: Network is unreachable
```

**Why:** Without `advertisedHost: localhost`, the broker advertises Docker Desktop's internal VM IP (`192.168.65.3`). Your Mac can't reach this IP directly.

**Fix:** Add `advertisedHost: localhost` and `advertisedPort` to broker configs (see above). Note: Strimzi only supports these on brokers, not on bootstrap.

**Verify fix:**
```bash
kubectl exec -n kafka <pod-name> -- cat /tmp/strimzi.properties | grep advertised
# Should show localhost, not 192.168.65.3
```

---

### 18. Missing Network Policy

**Problem:**
```yaml
# No network policy
# Any pod can access Kafka → security risk
```

**Fix:**
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: kafka-access
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: my-cluster
  ingress:
    - from:
        - podSelector:
            matchLabels:
              kafka-access: "true"
```

---

### 19. Ignoring Resource Limits

**Problem:**
```yaml
# No limits → OOM kills, resource starvation
resources: {}
```

**Fix:**
```yaml
resources:
  requests:
    memory: 4Gi
    cpu: 1
  limits:
    memory: 8Gi
    cpu: 2
```

---

## Async/FastAPI Gotchas

### 20. Not Closing Consumer on Shutdown

**Problem:**
```python
# App shutdown → consumer still in group
# Group rebalances slowly
```

**Fix:**
```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    yield
    consumer.close()  # Leave group cleanly
```

---

### 21. Shared Consumer Across Requests

**Problem:**
```python
# Global consumer used by multiple requests
# poll() not thread-safe → corruption
```

**Fix:**
```python
# One consumer per background task
# Or use async consumer wrapper with proper locking
```

---

## Prevention Checklist

```markdown
## Producer
- [ ] acks=all
- [ ] enable.idempotence=true
- [ ] Delivery callbacks
- [ ] flush() before shutdown
- [ ] Error handling for BufferError

## Consumer
- [ ] enable.auto.commit=false
- [ ] Commit after processing
- [ ] Rebalance callback
- [ ] Appropriate max.poll.interval.ms
- [ ] close() on shutdown

## Schema
- [ ] Default values on new fields
- [ ] Compatibility level set
- [ ] Schema validated before deploy

## Operations
- [ ] replication-factor >= 3
- [ ] min.insync.replicas >= 2
- [ ] Retention appropriate
- [ ] Resource limits set
- [ ] Monitoring enabled
```

```

### references/security-patterns.md

```markdown
# Security Patterns

Authentication and encryption configurations per environment.

---

## Security Progression

```
Development         Staging             Production
    │                  │                    │
    ▼                  ▼                    ▼
No Auth (plain) → SASL/SCRAM-SHA-512 → mTLS (certificates)
No TLS          → TLS (one-way)  → TLS (mutual)
```

---

## Development (No Auth)

Simple setup for local development.

### Strimzi Kafka

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: dev-cluster
spec:
  kafka:
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false  # No encryption
        # No authentication
```

### Client Config

```python
producer = Producer({
    'bootstrap.servers': 'dev-cluster-kafka-bootstrap:9092',
})
```

---

## Staging (SASL/SCRAM)

Username/password authentication with TLS encryption.

### Strimzi Kafka

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: staging-cluster
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: scram-sha-512
```

### Create User

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: order-service
  labels:
    strimzi.io/cluster: staging-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: orders
          patternType: prefix
        operations:
          - Read
          - Write
          - Describe
      - resource:
          type: group
          name: order-service
          patternType: literal
        operations:
          - Read
```

### Get Credentials

```bash
# Get password from secret
kubectl get secret order-service -n kafka \
  -o jsonpath='{.data.password}' | base64 -d

# Get JAAS config
kubectl get secret order-service -n kafka \
  -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d
```
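
The same decoding can be done in application or tooling code. A minimal sketch, assuming the Secret has already been fetched as JSON (for instance via `kubectl get secret ... -o json`); the sample secret below is fabricated for illustration and `decode_secret_field` is a hypothetical helper.

```python
import base64
import json


def decode_secret_field(secret: dict, key: str) -> str:
    """Return the decoded string stored under `key` in a Secret's `data` map."""
    return base64.b64decode(secret["data"][key]).decode("utf-8")


# Fabricated stand-in for the JSON form of the order-service Secret
secret_json = json.dumps(
    {"data": {"password": base64.b64encode(b"s3cr3t-example").decode("ascii")}}
)
print(decode_secret_field(json.loads(secret_json), "password"))  # s3cr3t-example
```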

### Client Config

```python
import os

producer = Producer({
    'bootstrap.servers': 'staging-cluster-kafka-bootstrap:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'order-service',
    'sasl.password': os.environ['KAFKA_PASSWORD'],
    'ssl.ca.location': '/certs/ca.crt',
})
```

### Environment Variables

```yaml
# Kubernetes Deployment
env:
  - name: KAFKA_PASSWORD
    valueFrom:
      secretKeyRef:
        name: order-service
        key: password
  - name: KAFKA_BOOTSTRAP_SERVERS
    value: staging-cluster-kafka-bootstrap:9093
```

---

## Production (mTLS)

Mutual TLS with client certificates.

### Strimzi Kafka

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: prod-cluster
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls  # Client certificate auth
```

### Create User (TLS)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: payment-service
  labels:
    strimzi.io/cluster: prod-cluster
spec:
  authentication:
    type: tls  # Certificate-based
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: payments
          patternType: prefix
        operations:
          - All
```

### Get Certificates

```bash
# Strimzi creates secrets automatically:
# - payment-service (user cert + key)
# - prod-cluster-cluster-ca-cert (CA cert)

# Extract to files
kubectl get secret payment-service -n kafka \
  -o jsonpath='{.data.user\.crt}' | base64 -d > user.crt
kubectl get secret payment-service -n kafka \
  -o jsonpath='{.data.user\.key}' | base64 -d > user.key
kubectl get secret prod-cluster-cluster-ca-cert -n kafka \
  -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt
```

### Client Config

```python
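import os

from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'prod-cluster-kafka-bootstrap:9093',
    'security.protocol': 'SSL',
    'ssl.ca.location': '/certs/ca.crt',
    'ssl.certificate.location': '/certs/user.crt',
    'ssl.key.location': '/certs/user.key',
}

# Optional: only needed if the private key is encrypted
# (the user.key issued by Strimzi is not password-protected)
if os.environ.get('SSL_KEY_PASSWORD'):
    config['ssl.key.password'] = os.environ['SSL_KEY_PASSWORD']

producer = Producer(config)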
```

### Mount Certificates

```yaml
# Kubernetes Deployment
spec:
  containers:
    - name: app
      volumeMounts:
        - name: kafka-certs
          mountPath: /certs
          readOnly: true
  volumes:
    - name: kafka-certs
      projected:
        sources:
          - secret:
              name: payment-service
              items:
                - key: user.crt
                  path: user.crt
                - key: user.key
                  path: user.key
          - secret:
              name: prod-cluster-cluster-ca-cert
              items:
                - key: ca.crt
                  path: ca.crt
```
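
A missing or empty mounted file tends to surface later as an opaque SSL error, so it can help to check the mount before creating a client. A minimal sketch, assuming the `/certs` mount path and file names from the manifest above; `missing_cert_files` is a hypothetical helper name.

```python
import os

# File names projected into the volume by the Deployment above
REQUIRED_CERT_FILES = ("ca.crt", "user.crt", "user.key")


def missing_cert_files(cert_dir="/certs", required=REQUIRED_CERT_FILES):
    """Return the required files that are absent or empty in cert_dir."""
    missing = []
    for name in required:
        path = os.path.join(cert_dir, name)
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            missing.append(name)
    return missing
```

At startup, an application could call `missing_cert_files()` and exit with a clear message if the returned list is not empty.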

---

## ACL Patterns

### Service Account Pattern

```yaml
# One user per service, scoped permissions
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: order-service
  labels:
    strimzi.io/cluster: staging-cluster  # Required: must match your Kafka cluster name
spec:
  authorization:
    type: simple
    acls:
      # Produce to own topics
      - resource:
          type: topic
          name: orders.
          patternType: prefix
        operations: [Write, Describe]

      # Consume from own + shared topics
      - resource:
          type: topic
          name: orders.
          patternType: prefix
        operations: [Read, Describe]
      - resource:
          type: topic
          name: shared.notifications
          patternType: literal
        operations: [Read, Describe]

      # Own consumer group
      - resource:
          type: group
          name: order-service
          patternType: prefix
        operations: [Read]
```

### Producer-Only

```yaml
acls:
  - resource:
      type: topic
      name: events
      patternType: literal
    operations: [Write, Describe]
  - resource:
      type: topic
      name: events
      patternType: literal
    operations: [Create]  # For auto-topic creation
```

### Consumer-Only

```yaml
acls:
  - resource:
      type: topic
      name: events
      patternType: literal
    operations: [Read, Describe]
  - resource:
      type: group
      name: my-consumer-group
      patternType: literal
    operations: [Read]
```

### Admin

```yaml
acls:
  - resource:
      type: topic
      name: "*"
      patternType: literal
    operations: [All]
  - resource:
      type: group
      name: "*"
      patternType: literal
    operations: [All]
  - resource:
      type: cluster  # Cluster resources take no name or patternType
    operations: [All]
```
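
To reason about which requests a set of rules permits, the matching logic can be modeled in a few lines. This is a simplified sketch and not the broker's authorizer: it covers only allow rules, treats `*` as the literal wildcard, and assumes that `Read`, `Write`, `Delete`, and `Alter` each imply `Describe`. The names `resource_matches` and `is_allowed` are hypothetical.

```python
# Operations assumed to also grant Describe on the same resource
IMPLIES_DESCRIBE = {"Read", "Write", "Delete", "Alter"}


def resource_matches(rule, resource_type, resource_name):
    """Check whether one ACL rule applies to the given resource."""
    resource = rule["resource"]
    if resource["type"] != resource_type:
        return False
    if resource_type == "cluster":
        return True  # cluster rules carry no name
    name = resource.get("name", "")
    if resource.get("patternType", "literal") == "prefix":
        return resource_name.startswith(name)
    # literal: exact match, or the "*" wildcard
    return name == "*" or name == resource_name


def is_allowed(acls, resource_type, resource_name, operation):
    """Return True if any allow rule grants the operation."""
    for rule in acls:
        if not resource_matches(rule, resource_type, resource_name):
            continue
        operations = set(rule["operations"])
        if "All" in operations or operation in operations:
            return True
        if operation == "Describe" and operations & IMPLIES_DESCRIBE:
            return True
    return False


# A subset of the order-service rules above, written as plain dicts
order_service_acls = [
    {"resource": {"type": "topic", "name": "orders.", "patternType": "prefix"},
     "operations": ["Write", "Describe"]},
    {"resource": {"type": "topic", "name": "shared.notifications", "patternType": "literal"},
     "operations": ["Read", "Describe"]},
    {"resource": {"type": "group", "name": "order-service", "patternType": "prefix"},
     "operations": ["Read"]},
]
```

Note that the prefix `orders.` covers `orders.created` but not a topic named `orders`, and the literal rule for `shared.notifications` does not extend to `shared.notifications.eu`.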

---

## Secrets Management

### External Secrets Operator

```yaml
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: kafka-credentials
spec:
  secretStoreRef:
    name: vault-backend
    kind: ClusterSecretStore
  target:
    name: kafka-credentials
    template:
      data:
        username: "{{ .username }}"
        password: "{{ .password }}"
  data:
    - secretKey: username
      remoteRef:
        key: kafka/order-service
        property: username
    - secretKey: password
      remoteRef:
        key: kafka/order-service
        property: password
```

### Sealed Secrets

```yaml
apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
  name: kafka-credentials
spec:
  encryptedData:
    password: AgBy8hC...encrypted...
```

---

## Configuration Matrix

| Setting | Dev | Staging | Prod |
|---------|-----|---------|------|
| `security.protocol` | `PLAINTEXT` | `SASL_SSL` | `SSL` |
| `sasl.mechanism` | - | `SCRAM-SHA-512` | - |
| `ssl.ca.location` | - | `/certs/ca.crt` | `/certs/ca.crt` |
| `ssl.certificate.location` | - | - | `/certs/user.crt` |
| `ssl.key.location` | - | - | `/certs/user.key` |
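
The matrix can be kept in code as a lookup table so that one setting selects the whole security profile. A minimal sketch: `SECURITY_MATRIX` and `security_settings` are hypothetical names, and the values are copied from the table above. Bootstrap servers and credentials are intentionally left out and would be merged in separately.

```python
# One entry per column of the configuration matrix
SECURITY_MATRIX = {
    "dev": {
        "security.protocol": "PLAINTEXT",
    },
    "staging": {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "ssl.ca.location": "/certs/ca.crt",
    },
    "prod": {
        "security.protocol": "SSL",
        "ssl.ca.location": "/certs/ca.crt",
        "ssl.certificate.location": "/certs/user.crt",
        "ssl.key.location": "/certs/user.key",
    },
}


def security_settings(environment):
    """Return a copy of the security settings for dev, staging, or prod."""
    try:
        return dict(SECURITY_MATRIX[environment])
    except KeyError:
        known = ", ".join(sorted(SECURITY_MATRIX))
        raise ValueError(f"Unknown environment {environment!r}; expected one of: {known}") from None
```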

---

## Network Policies

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: kafka-access
  namespace: kafka
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: prod-cluster
  policyTypes:
    - Ingress
  ingress:
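    - from:
        # Entries under `from` are ORed: either selector grants access
        # Any pod in a namespace labeled kafka-access=true
        - namespaceSelector:
            matchLabels:
              kafka-access: "true"
        # Any pod labeled kafka-client=true in this policy's namespace (kafka)
        - podSelector:
            matchLabels:
              kafka-client: "true"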
      ports:
        - port: 9093
          protocol: TCP
```
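
Whether a policy actually admits a client can be checked from inside the client pod with a plain TCP probe. This is a sketch using only the standard library; `can_connect` is a hypothetical helper, and it tests network reachability only, not the TLS handshake or authentication.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures
        return False
```

From a pod that carries the expected labels, `can_connect("prod-cluster-kafka-bootstrap", 9093)` should return `True`; a `False` from an unlabeled pod suggests the policy is blocking traffic as intended.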

---

## Troubleshooting

| Error | Cause | Fix |
|-------|-------|-----|
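| `SASL authentication failed` | Wrong username or password, or mechanism mismatch | Compare client credentials with the KafkaUser secret; confirm `sasl.mechanism` |
| `SSL handshake failed` | Client does not trust the broker certificate | Verify `ssl.ca.location` points to the cluster CA cert |
| `Not authorized` | Missing ACL | Add the required operation to the KafkaUser `acls` |
| `Connection refused` | Wrong port or protocol | Match the listener port and `security.protocol` |
| `Certificate expired` | Client still uses a certificate issued before Strimzi renewed it | Reload the renewed secret and restart clients |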

```