
django-celery-expert

Expert Django and Celery guidance for asynchronous task processing. Use when designing background tasks, configuring workers, handling retries and errors, optimizing task performance, implementing periodic tasks, or setting up production monitoring. Follows Celery best practices with Django integration patterns.

Packaged view

This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.

Stars: 28
Hot score: 89
Updated: March 20, 2026
Overall rating: C
Composite score: 2.4
Best-practice grade: B (73.6)

Install command

npx @skill-hub/cli install vintasoftware-django-ai-plugins-skills

Repository

vintasoftware/django-ai-plugins

Skill path: plugins/django-celery-expert/skills



Best for

Primary workflow: Ship Full Stack.

Technical facets: Full Stack, Integration.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: vintasoftware.

This is still a mirrored public skill entry. Review the repository before installing into production workflows.

What it helps with

  • Install django-celery-expert into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/vintasoftware/django-ai-plugins before adding django-celery-expert to shared team environments
  • Use django-celery-expert for development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: django-celery-expert
description: Expert Django and Celery guidance for asynchronous task processing. Use when designing background tasks, configuring workers, handling retries and errors, optimizing task performance, implementing periodic tasks, or setting up production monitoring. Follows Celery best practices with Django integration patterns.
---

# Django Celery Expert

## Overview

This skill provides expert guidance for Django applications using Celery for asynchronous task processing. It covers task design patterns, worker configuration, error handling, monitoring, and production deployment strategies.

**Key Capabilities:**
- Task design and implementation patterns
- Worker configuration and scaling
- Error handling and retry strategies
- Periodic/scheduled task management
- Monitoring and observability
- Production deployment best practices

## When to Use

Invoke this skill when you encounter these triggers:

**Task Design:**
- "Create a Celery task for..."
- "Move this to a background job"
- "Process this asynchronously"
- "Handle this outside the request"

**Configuration & Setup:**
- "Configure Celery for Django"
- "Set up task queues"
- "Configure Celery workers"
- "Set up Celery Beat for scheduling"

**Error Handling:**
- "Handle task failures"
- "Implement retry logic"
- "Task keeps failing"
- "Set up dead letter queue"

**Performance & Scaling:**
- "Scale Celery workers"
- "Optimize task throughput"
- "Tasks are too slow"
- "Handle high task volume"

**Monitoring:**
- "Monitor Celery tasks"
- "Set up Flower"
- "Track task progress"
- "Debug stuck tasks"

## Instructions

Follow this workflow when handling Django Celery requests:

### 1. Analyze the Request

**Identify the task type:**
- Simple background task (fire-and-forget)
- Task with result tracking (need to poll for completion)
- Chained/grouped tasks (workflow orchestration)
- Periodic/scheduled tasks (cron-like behavior)
- Long-running tasks (need progress tracking)

**Key questions:**
- Does the caller need the result?
- Should failures be retried?
- Is idempotency required?
- What's the expected execution time?
- How critical is guaranteed execution?

### 2. Load Relevant Reference Documentation

Based on the task type, reference the appropriate bundled documentation:

- **Django-specific patterns** -> `references/django-integration.md`
- **Task implementation** -> `references/task-design-patterns.md`
- **Configuration & setup** -> `references/configuration-guide.md`
- **Error handling & retries** -> `references/error-handling.md`
- **Periodic tasks** -> `references/periodic-tasks.md`
- **Monitoring & debugging** -> `references/monitoring-observability.md`
- **Production deployment** -> `references/production-deployment.md`

### 3. Implement Following Best Practices

**Task design principles:**
- Keep tasks small and focused
- Design for idempotency when possible
- Use explicit task names
- Bind tasks for access to self
- Pass serializable arguments only (IDs, not objects)

**Error handling:**
- Configure appropriate retry behavior
- Use exponential backoff
- Set max retry limits
- Handle specific exceptions appropriately
- Log failures with context

**Performance:**
- Use appropriate serializers (JSON for safety, pickle for Python objects)
- Configure prefetch limits
- Use task routing for prioritization
- Batch operations when appropriate
- Monitor memory usage
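
In a Django settings module these knobs might look like the following (queue names and values are hypothetical):

```python
# settings.py -- illustrative tuning values
CELERY_TASK_SERIALIZER = 'json'            # safe default serializer
CELERY_WORKER_PREFETCH_MULTIPLIER = 1      # fair scheduling for long-running tasks
CELERY_TASK_ROUTES = {
    'payments.tasks.*': {'queue': 'high-priority'},
    'reports.tasks.*': {'queue': 'low-priority'},
}
```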

### 4. Validate Implementation

Before presenting the solution:
- Verify task is idempotent if retries enabled
- Check serialization of arguments
- Ensure proper error handling
- Verify monitoring/logging is in place
- Consider failure scenarios
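
A quick pre-flight check for the serialization point: anything that cannot round-trip through JSON will fail at the broker, so it can be caught earlier (helper name is hypothetical):

```python
import json

def assert_json_safe(*args, **kwargs):
    """Raise TypeError if any task argument would fail JSON serialization."""
    json.dumps([args, kwargs])

assert_json_safe(42, 'activate', notify=True)  # primitives pass
try:
    assert_json_safe(object())                 # model instances fail the same way
except TypeError:
    print('not JSON-serializable')
```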

## Bundled Resources

**references/** - Comprehensive Celery documentation loaded into context as needed

- **`references/django-integration.md`**
  - transaction.on_commit() for safe task queuing
  - Database as source of truth with recovery tasks
  - Request-task correlation with django-guid
  - Testing Django Celery tasks

- **`references/task-design-patterns.md`**
  - Task signatures and calling patterns
  - Binding and accessing task properties
  - Task inheritance and base classes
  - Workflow patterns (chains, groups, chords)
  - Idempotency and exactly-once delivery

- **`references/configuration-guide.md`**
  - Django-Celery integration setup
  - Broker configuration (Redis, RabbitMQ)
  - Result backend options
  - Worker settings and concurrency
  - Task routing and queues

- **`references/error-handling.md`**
  - Retry strategies and backoff
  - Exception handling patterns
  - Dead letter queues
  - Task rejection and requeue
  - Timeout handling

- **`references/periodic-tasks.md`**
  - Celery Beat configuration
  - Crontab and interval schedules
  - Django database scheduler
  - Dynamic schedule management
  - Timezone considerations

- **`references/monitoring-observability.md`**
  - Flower setup and usage
  - Prometheus/Grafana integration
  - Task event monitoring
  - Logging best practices
  - Debugging stuck tasks

- **`references/production-deployment.md`**
  - Worker deployment patterns
  - Process supervision (systemd, supervisor)
  - Containerized deployments
  - Scaling strategies
  - Health checks and graceful shutdown

## Examples

### Example 1: Basic Background Task

**User Request:**
> "Send welcome emails in the background after user registration"

**Implementation:**
```python
# tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
    from users.models import User

    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject="Welcome!",
            message=f"Hi {user.name}, welcome to our platform!",
            from_email="[email protected]",
            recipient_list=[user.email],
        )
    except User.DoesNotExist:
        pass  # User deleted, skip
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# views.py
def register(request):
    user = User.objects.create(...)
    send_welcome_email.delay(user.id)  # Fire and forget
    return redirect('dashboard')
```

### Example 2: Task with Progress Tracking

**User Request:**
> "Process a large CSV import with progress updates"

**Implementation:**
```python
@shared_task(bind=True)
def import_csv(self, file_path, total_rows):
    import csv
    from myapp.models import Record

    with open(file_path) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            Record.objects.create(**row)
            if i % 100 == 0:
                self.update_state(
                    state='PROGRESS',
                    meta={'current': i, 'total': total_rows}
                )

    return {'status': 'complete', 'processed': total_rows}

# Check progress
result = import_csv.AsyncResult(task_id)
if result.state == 'PROGRESS':
    progress = result.info.get('current', 0) / result.info.get('total', 1)
```

### Example 3: Workflow with Chains

**User Request:**
> "Process an order: validate inventory, charge payment, then send confirmation"

**Implementation:**
```python
from celery import chain, shared_task

@shared_task
def validate_inventory(order_id):
    # Returns order_id if valid, raises if not
    order = Order.objects.get(id=order_id)
    if not order.items_in_stock():
        raise ValueError("Items out of stock")
    return order_id

@shared_task
def charge_payment(order_id):
    order = Order.objects.get(id=order_id)
    order.charge()
    return order_id

@shared_task
def send_confirmation(order_id):
    order = Order.objects.get(id=order_id)
    order.send_confirmation_email()

def process_order(order_id):
    workflow = chain(
        validate_inventory.s(order_id),
        charge_payment.s(),
        send_confirmation.s()
    )
    workflow.delay()
```

## Additional Notes

**Common Pitfalls:**
- Passing Django model instances instead of IDs
- Not handling task idempotency with retries
- Missing timeout configuration for long tasks
- Not monitoring task queue depth
- Ignoring result backend cleanup

**Django Integration:**
- Use `django-celery-beat` for database-backed schedules
- Use `django-celery-results` for storing task results in Django
- Configure `CELERY_` settings in Django settings.py
- Use `@shared_task` for reusable apps

**Security:**
- Never pass sensitive data in task arguments
- Use signed serializers if pickle is required
- Restrict Flower access in production
- Validate task arguments


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### references/django-integration.md

```markdown
# Django Integration Patterns

Critical patterns for reliable Django and Celery integration.

**Sources:**
- [Celery in the Wild: Tips and Tricks to Run Async Tasks in the Real World](https://www.vintasoftware.com/blog/celery-wild-tips-and-tricks-run-async-tasks-real-world) - Vinta Software
- [A Guide on Django Celery Tasks That Actually Work](https://www.vintasoftware.com/blog/guide-django-celery-tasks) - Vinta Software

## Critical: Use transaction.on_commit()

Always use `transaction.on_commit()` when queuing tasks after database writes. This ensures the database transaction commits before the task is queued, preventing race conditions where the task runs before the data is available.

```python
from django.db import transaction

class SignUpView(CreateView):
    def form_valid(self, form):
        response = super().form_valid(form)
        user_pk = self.object.pk

        # GOOD: Task queued only after transaction commits
        transaction.on_commit(
            lambda: send_activation_email.delay(user_pk)
        )
        return response

# BAD: Task might run before user is committed to DB
def bad_signup(request):
    user = User.objects.create(...)
    send_activation_email.delay(user.pk)  # Race condition!
    return redirect('home')
```

### With Atomic Blocks

```python
from django.db import transaction

def process_order(order_id):
    with transaction.atomic():
        order = Order.objects.select_for_update().get(id=order_id)
        order.status = 'processing'
        order.save()

        # Queue task only after atomic block commits
        transaction.on_commit(
            lambda: notify_warehouse.delay(order.pk)
        )
```

### Nested Transactions

```python
from django.db import transaction

def complex_operation():
    with transaction.atomic():
        create_parent_record()

        with transaction.atomic():
            create_child_record()
            # This on_commit waits for OUTER transaction
            transaction.on_commit(lambda: child_task.delay())

        # This also waits for outer transaction
        transaction.on_commit(lambda: parent_task.delay())
```

## Database as Source of Truth

Don't rely solely on Celery for guaranteed delivery. Use the database as the source of truth and implement recovery tasks.

### Pattern: Flag-Based Recovery

```python
# models.py
class User(models.Model):
    email = models.EmailField()
    is_activation_email_sent = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

# tasks.py
@shared_task(bind=True, max_retries=3)
def send_activation_email(self, user_pk):
    try:
        user = User.objects.get(pk=user_pk)
        if user.is_activation_email_sent:
            return  # Already sent, idempotent

        send_email(user.email, 'Activate your account')

        # Mark as sent atomically
        User.objects.filter(pk=user_pk).update(is_activation_email_sent=True)

    except User.DoesNotExist:
        pass  # User deleted, skip
    except EmailException as exc:
        raise self.retry(exc=exc)

# Recovery task - runs periodically via Celery Beat
@shared_task
def recover_unsent_activation_emails():
    """Catch any missed activation emails."""
    cutoff = timezone.now() - timedelta(hours=24)
    unsent = User.objects.filter(
        is_activation_email_sent=False,
        created_at__lt=timezone.now() - timedelta(minutes=5),  # Grace period
        created_at__gt=cutoff,  # Don't process very old records
    )
    for user_pk in unsent.values_list('pk', flat=True):
        send_activation_email.delay(user_pk)
```

### Celery Beat Schedule for Recovery

```python
CELERY_BEAT_SCHEDULE = {
    'recover-activation-emails': {
        'task': 'users.tasks.recover_unsent_activation_emails',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
}
```

## Atomicity: External Calls Before DB Writes

Execute external API calls before modifying the database. This prevents inconsistent states where the DB is updated but the external call fails.

```python
# GOOD: External call first, then DB update
@shared_task(bind=True, max_retries=3)
def sync_user_to_crm(self, user_id):
    user = User.objects.get(id=user_id)

    try:
        # External call FIRST
        crm_response = crm_api.create_contact(
            email=user.email,
            name=user.name,
        )

        # DB update AFTER external call succeeds
        user.crm_id = crm_response['id']
        user.crm_synced_at = timezone.now()
        user.save()

    except CRMAPIError as exc:
        raise self.retry(exc=exc)

# BAD: DB updated before external call
@shared_task
def bad_sync_user_to_crm(user_id):
    user = User.objects.get(id=user_id)
    user.crm_sync_started = True
    user.save()  # DB updated

    crm_api.create_contact(...)  # This might fail!
    # Now DB is in inconsistent state
```

## Request-Task Correlation with django-guid

Use `django-guid` to trace requests through to Celery tasks for debugging.

```bash
pip install django-guid
```

```python
# settings.py
INSTALLED_APPS = [
    ...
    'django_guid',
]

MIDDLEWARE = [
    'django_guid.middleware.guid_middleware',
    ...
]

from django_guid.integrations import CeleryIntegration

DJANGO_GUID = {
    'GUID_HEADER_NAME': 'X-Correlation-ID',
    'VALIDATE_GUID': False,
    'RETURN_HEADER': True,
    'INTEGRATIONS': [
        CeleryIntegration(use_django_logging=True),
    ],
}

# celery.py
from celery import Celery

app = Celery('myproject')

# Logging format includes correlation ID
LOGGING = {
    'formatters': {
        'correlation': {
            'format': '[%(correlation_id)s] %(levelname)s %(name)s: %(message)s',
        },
    },
}
```

### Manual Correlation Without django-guid

```python
import uuid
from celery import shared_task

@shared_task(bind=True)
def task_with_correlation(self, data):
    correlation_id = self.request.headers.get('correlation_id', 'unknown')
    logger.info(f"[{correlation_id}] Processing task")
    # ... task logic

# Pass correlation ID when calling
def my_view(request):
    correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
    task_with_correlation.apply_async(
        args=[data],
        headers={'correlation_id': correlation_id}
    )
```

## Task-Specific Logging

Use Celery's task logger for proper task context in logs.

```python
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(bind=True)
def process_order(self, order_id):
    logger.info('Starting order processing', extra={
        'order_id': order_id,
        'task_id': self.request.id,
    })

    try:
        order = Order.objects.get(id=order_id)
        # ... process order
        logger.info('Order processed successfully')

    except Order.DoesNotExist:
        logger.warning(f'Order not found: {order_id}')

    except Exception as exc:
        logger.exception('Order processing failed')
        raise

## Idempotent ORM Operations

Use Django ORM methods that are naturally idempotent for safe retries.

```python
from django.db.models import F

@shared_task(bind=True, max_retries=5)
def update_user_stats(self, user_id, points_earned):
    try:
        # GOOD: Idempotent with get_or_create
        stats, created = UserStats.objects.get_or_create(
            user_id=user_id,
            defaults={'points': points_earned}
        )

        if not created:
            # Caution: an F() increment is NOT idempotent across retries;
            # prefer the unique-constraint pattern below for exactly-once updates
            UserStats.objects.update_or_create(
                user_id=user_id,
                defaults={'points': F('points') + points_earned}
            )

    except Exception as exc:
        raise self.retry(exc=exc)

# For truly idempotent operations, use unique constraints
@shared_task
def record_event(event_id, user_id, event_type):
    # Idempotency via unique constraint
    Event.objects.get_or_create(
        idempotency_key=event_id,
        defaults={
            'user_id': user_id,
            'event_type': event_type,
        }
    )
```

## Handling Model Changes in Tasks

Tasks may run with stale model data. Always fetch fresh data.

```python
# BAD: Passing model state that might be stale
@shared_task
def bad_task(user_email, user_name):
    send_email(user_email, f"Hello {user_name}")
    # What if user updated their email before task ran?

# GOOD: Fetch fresh data by ID
@shared_task
def good_task(user_id):
    user = User.objects.get(id=user_id)  # Fresh from DB
    send_email(user.email, f"Hello {user.name}")
```

## Testing Django Celery Tasks

### Unit Testing with Eager Mode

```python
# settings/test.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True

# test_tasks.py
from django.test import TestCase, override_settings

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
class TaskTestCase(TestCase):
    def test_send_email_task(self):
        user = User.objects.create(email='[email protected]')
        result = send_activation_email.delay(user.pk)

        # Task runs synchronously in tests
        self.assertTrue(result.successful())
        user.refresh_from_db()
        self.assertTrue(user.is_activation_email_sent)
```

### Testing with Mocked Tasks

```python
from unittest.mock import patch

class ViewTestCase(TestCase):
    @patch('myapp.tasks.send_activation_email.delay')
    def test_signup_queues_email(self, mock_task):
        response = self.client.post('/signup/', {
            'email': '[email protected]',
            'password': 'securepass123',
        })

        self.assertEqual(response.status_code, 302)
        mock_task.assert_called_once()
```

### Testing transaction.on_commit

```python
from django.test import TestCase, TransactionTestCase

class OnCommitTestCase(TransactionTestCase):
    """Use TransactionTestCase for on_commit testing."""

    @patch('myapp.tasks.send_email.delay')
    def test_email_sent_after_commit(self, mock_task):
        # Task is queued after transaction commits
        User.objects.create(email='[email protected]')

        # In TransactionTestCase, on_commit callbacks execute
        mock_task.assert_called_once()
```

```

### references/task-design-patterns.md

```markdown
# Task Design Patterns

## Task Fundamentals

### Basic Task Definition

```python
from celery import shared_task

# Simple task - use for reusable Django apps
@shared_task
def add(x, y):
    return x + y

# Bound task - access to self for retries, logging
@shared_task(bind=True)
def process_data(self, data_id):
    self.update_state(state='PROCESSING')
    # ...
```

### Task Naming

Always use explicit names for production tasks:

```python
# Explicit name - recommended for production
@shared_task(name='orders.tasks.process_order')
def process_order(order_id):
    pass

# Auto-generated name (module.function_name)
# Can break if you refactor or rename
@shared_task
def process_order(order_id):
    pass
```

### Task Signatures

```python
# Different ways to call tasks
task.delay(arg1, arg2)              # Shortcut for apply_async
task.apply_async(args=[arg1, arg2]) # Full control
task.apply_async(
    args=[arg1],
    kwargs={'key': 'value'},
    countdown=60,                    # Delay execution by 60 seconds
    eta=datetime(2024, 1, 1, 12, 0), # Execute at specific time
    expires=3600,                    # Expire if not started in 1 hour
    queue='high-priority',           # Route to specific queue
    priority=9,                      # Task priority (0-9, higher = more priority)
)

# Signature objects for building workflows
from celery import signature
sig = signature('tasks.add', args=(2, 2))
sig = add.s(2, 2)  # Shortcut
sig = add.si(2, 2) # Immutable signature (ignores return from previous task)
```

## Argument Best Practices

### Pass IDs, Not Objects

```python
# BAD - Django objects aren't JSON serializable
@shared_task
def process_order(order):  # Don't pass model instances!
    order.process()

# GOOD - Pass primary keys
@shared_task
def process_order(order_id):
    from orders.models import Order
    order = Order.objects.get(id=order_id)
    order.process()
```

### Handle Missing Objects

```python
@shared_task(bind=True)
def send_notification(self, user_id):
    from users.models import User

    try:
        user = User.objects.get(id=user_id)
    except User.DoesNotExist:
        # Object was deleted between queuing and execution
        # Log and skip - don't retry
        logger.warning(f"User {user_id} not found, skipping notification")
        return None

    user.send_notification()
```

### Serializable Arguments Only

```python
# GOOD - primitive types
@shared_task
def process(user_id, action, options=None):
    pass

process.delay(123, 'activate', {'notify': True})

# BAD - non-serializable types
@shared_task
def process(user, callback_func):  # Can't serialize!
    pass
```

## Idempotency

### Why Idempotency Matters

Tasks may execute multiple times due to:
- Network issues causing duplicate messages
- Worker crashes mid-execution followed by retry
- Manual task replay for debugging

### Idempotent Task Patterns

```python
# BAD - Not idempotent, will increment multiple times on retry
@shared_task
def increment_counter(counter_id):
    counter = Counter.objects.get(id=counter_id)
    counter.value += 1
    counter.save()

# GOOD - Idempotent using state check
@shared_task
def increment_counter(counter_id, expected_value):
    Counter.objects.filter(
        id=counter_id,
        value=expected_value  # Only update if value hasn't changed
    ).update(value=F('value') + 1)

# GOOD - Idempotent using unique constraint
@shared_task
def create_order_item(order_id, product_id, quantity, idempotency_key):
    OrderItem.objects.get_or_create(
        idempotency_key=idempotency_key,
        defaults={
            'order_id': order_id,
            'product_id': product_id,
            'quantity': quantity,
        }
    )
```

### Deduplication with Locks

```python
from django.core.cache import cache
from contextlib import contextmanager

@contextmanager
def task_lock(lock_id, timeout=60*10):
    lock_acquired = cache.add(lock_id, 'locked', timeout)
    try:
        yield lock_acquired
    finally:
        if lock_acquired:
            cache.delete(lock_id)

@shared_task(bind=True)
def deduplicated_task(self, resource_id):
    lock_id = f'task-lock-{resource_id}'

    with task_lock(lock_id) as acquired:
        if not acquired:
            # Another task is processing this resource
            logger.info(f"Task for {resource_id} already running, skipping")
            return

        # Safe to process
        process_resource(resource_id)
```

## Workflow Patterns

### Chains - Sequential Execution

```python
from celery import chain

# Each task passes its result to the next
workflow = chain(
    fetch_data.s(url),        # Returns data
    parse_data.s(),           # Receives data, returns parsed
    store_data.s(),           # Receives parsed, stores
    notify_complete.si()      # si() = immutable, ignores input
)
result = workflow.delay()
```

### Groups - Parallel Execution

```python
from celery import group

# Execute tasks in parallel
parallel = group(
    process_item.s(item_id) for item_id in item_ids
)
result = parallel.delay()

# Get all results (blocks until all complete)
results = result.get()
```

### Chords - Parallel with Callback

```python
from celery import chord

# Run tasks in parallel, then call callback with all results
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    aggregate_results.s()  # Called with list of all results
)
result = workflow.delay()
```

### Complex Workflows

```python
from celery import chain, group, chord

# Example: Process order with parallel item processing
workflow = chain(
    validate_order.s(order_id),
    chord(
        [process_item.s(item_id) for item_id in item_ids],
        finalize_order.s(order_id)
    ),
    send_confirmation.s()
)
```

## Task Inheritance

### Base Task Classes

```python
from celery import Task

class DatabaseTask(Task):
    """Base task that ensures database connection."""

    def __call__(self, *args, **kwargs):
        from django.db import connection
        connection.ensure_connection()
        return super().__call__(*args, **kwargs)


class LoggedTask(Task):
    """Base task with automatic logging."""

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f'Task {self.name}[{task_id}] succeeded')

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f'Task {self.name}[{task_id}] failed: {exc}')

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f'Task {self.name}[{task_id}] retrying: {exc}')


@shared_task(base=LoggedTask, bind=True)
def my_logged_task(self, data):
    pass
```

### Abstract Base Tasks

```python
from celery import Task, shared_task

class NotificationTaskMixin:
    """Mixin for notification tasks."""
    max_retries = 3
    default_retry_delay = 60

    def get_user(self, user_id):
        from users.models import User
        return User.objects.get(id=user_id)


class EmailNotificationTask(NotificationTaskMixin, Task):
    pass


@shared_task(base=EmailNotificationTask, bind=True)
def send_email_notification(self, user_id, template):
    user = self.get_user(user_id)
    # ...
```

## Task Context and Metadata

### Accessing Task Properties

```python
@shared_task(bind=True)
def task_with_context(self, data):
    # Task metadata
    task_id = self.request.id
    task_name = self.name
    retries = self.request.retries

    # Execution context
    hostname = self.request.hostname
    delivery_info = self.request.delivery_info
    called_directly = self.request.called_directly

    # Parent task (if called from another task)
    parent_id = self.request.parent_id
    root_id = self.request.root_id  # Original task that started the chain

    # Headers (custom metadata)
    custom_header = self.request.headers.get('my_header')
```

### Passing Context Through Chains

```python
# Using headers for context that persists through chain
@shared_task(bind=True)
def step_one(self, data):
    correlation_id = self.request.headers.get('correlation_id')
    logger.info(f"[{correlation_id}] Processing step one")
    return data

# Apply with headers
chain(
    step_one.s(data),
    step_two.s(),
).apply_async(headers={'correlation_id': 'abc-123'})
```

## Long-Running Tasks

### Progress Tracking

```python
@shared_task(bind=True)
def long_running_task(self, items):
    total = len(items)

    for i, item in enumerate(items):
        process_item(item)

        # Update progress
        self.update_state(
            state='PROGRESS',
            meta={
                'current': i + 1,
                'total': total,
                'percent': int((i + 1) / total * 100)
            }
        )

    return {'status': 'complete', 'processed': total}

# Check progress from Django view
def check_progress(request, task_id):
    result = long_running_task.AsyncResult(task_id)

    if result.state == 'PENDING':
        response = {'state': 'PENDING', 'progress': 0}
    elif result.state == 'PROGRESS':
        response = {
            'state': 'PROGRESS',
            'progress': result.info.get('percent', 0)
        }
    elif result.state == 'SUCCESS':
        response = {'state': 'SUCCESS', 'result': result.result}
    else:
        response = {'state': result.state, 'error': str(result.info)}

    return JsonResponse(response)
```

### Chunking Large Datasets

```python
from celery import chord, group

@shared_task
def process_chunk(item_ids):
    items = Item.objects.filter(id__in=item_ids)
    for item in items:
        item.process()
    return len(item_ids)

@shared_task
def aggregate_results(results):
    total = sum(results)
    logger.info(f"Processed {total} items total")
    return total

def process_all_items():
    all_ids = list(Item.objects.values_list('id', flat=True))
    chunk_size = 100

    # Split into chunks
    chunks = [
        all_ids[i:i + chunk_size]
        for i in range(0, len(all_ids), chunk_size)
    ]

    # Process chunks in parallel, aggregate at end
    workflow = chord(
        [process_chunk.s(chunk) for chunk in chunks],
        aggregate_results.s()
    )
    return workflow.delay()
```

## Task Revocation

### Revoking Tasks

```python
from celery.result import AsyncResult

# Revoke a single task
result = my_task.delay(data)
result.revoke()

# Revoke with termination (kills running task)
result.revoke(terminate=True)

# Revoke by task ID
from myproject.celery import app
app.control.revoke(task_id, terminate=True)

# Revoke multiple tasks
app.control.revoke([task_id1, task_id2])
```

### Handling Revocation in Tasks

```python
from celery import shared_task
from celery.contrib.abortable import AbortableTask

@shared_task(bind=True, base=AbortableTask)
def cancelable_task(self, items):
    for item in items:
        # is_aborted() requires the AbortableTask base and a result backend;
        # trigger it from the caller with result.abort(), not revoke()
        if self.is_aborted():
            logger.info("Task was aborted, cleaning up")
            cleanup()
            return

        process_item(item)

## Rate Limiting

### Task-Level Rate Limits

```python
# Limit to 10 tasks per minute
@shared_task(rate_limit='10/m')
def rate_limited_task(data):
    pass

# Limit to 100 tasks per hour
@shared_task(rate_limit='100/h')
def hourly_limited_task(data):
    pass

# Limit to 1 task per second
@shared_task(rate_limit='1/s')
def slow_task(data):
    pass
```

### Dynamic Rate Limiting

```python
from celery import shared_task
from time import sleep

@shared_task(bind=True)
def api_call_task(self, endpoint):
    try:
        response = call_api(endpoint)
        return response
    except RateLimitError as exc:
        # Back off and retry
        retry_after = exc.retry_after or 60
        raise self.retry(exc=exc, countdown=retry_after)
```

```

### references/configuration-guide.md

```markdown
# Configuration Guide

## Django-Celery Integration Setup

### Project Structure

```
myproject/
├── myproject/
│   ├── __init__.py
│   ├── celery.py      # Celery app configuration
│   ├── settings.py
│   └── urls.py
├── myapp/
│   ├── __init__.py
│   ├── models.py
│   └── tasks.py       # App-specific tasks
└── manage.py
```

### Celery Application Setup

```python
# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# Load config from Django settings with CELERY_ prefix
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks in all installed apps
app.autodiscover_tasks()

@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
```

```python
# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)
```

### Django Settings

```python
# settings.py

# Broker Configuration (choose one)
# Redis
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# RabbitMQ
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# Result Backend (optional, needed if you track results)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# Or use Django database
CELERY_RESULT_BACKEND = 'django-db'

# Serialization
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Timezone
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task settings
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # 25 minutes soft limit
```

## Broker Configuration

### Redis Broker

```python
# Basic Redis
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# Redis with password
CELERY_BROKER_URL = 'redis://:password@localhost:6379/0'

# Redis Sentinel for HA (list multiple sentinels separated by ';')
CELERY_BROKER_URL = 'sentinel://localhost:26379'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'master_name': 'mymaster',
    'sentinel_kwargs': {'password': 'sentinel_password'},
}

# Redis Cluster
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'cluster': True,
}

# Connection pool settings
CELERY_BROKER_POOL_LIMIT = 10
CELERY_BROKER_CONNECTION_TIMEOUT = 10
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
```

### RabbitMQ Broker

```python
# Basic RabbitMQ
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# With virtual host
CELERY_BROKER_URL = 'amqp://user:password@localhost:5672/myvhost'

# RabbitMQ with SSL
import ssl

CELERY_BROKER_URL = 'amqp://user:password@localhost:5671//'
CELERY_BROKER_USE_SSL = {
    'keyfile': '/path/to/key.pem',
    'certfile': '/path/to/cert.pem',
    'ca_certs': '/path/to/ca.pem',
    'cert_reqs': ssl.CERT_REQUIRED,
}

# Connection pool
CELERY_BROKER_POOL_LIMIT = 10
CELERY_BROKER_HEARTBEAT = 10

# Confirm message delivery (recommended for reliability)
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'confirm_publish': True,
}
```

## Broker Transport Options

Critical settings for message delivery reliability.

### Redis Visibility Timeout

When using Redis, set `visibility_timeout` to control how long a task remains invisible after being picked up. If a worker crashes before acknowledging, the task becomes visible again after this timeout.

```python
# Redis visibility timeout (default: 1 hour)
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': 3600,  # 1 hour in seconds
}
```

**Important:** Set visibility timeout longer than your longest task. If a task takes longer than the timeout, it will be redelivered to another worker while still running.
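One way to keep that invariant is to derive the visibility timeout from the hard time limit already configured, plus some headroom. A sketch (the five-minute headroom is an illustrative choice):

```python
# Derive the visibility timeout from the hard task time limit so a
# slow-but-legal task can never outlive its redelivery window
CELERY_TASK_TIME_LIMIT = 30 * 60  # longest allowed task: 30 minutes

CELERY_BROKER_TRANSPORT_OPTIONS = {
    # hard limit plus headroom for shutdown and ack latency
    'visibility_timeout': CELERY_TASK_TIME_LIMIT + 5 * 60,
}
```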

### RabbitMQ Publisher Confirms

Enable publisher confirms to ensure messages are actually delivered to the broker.

```python
# RabbitMQ: confirm message delivery
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'confirm_publish': True,
}
```

### SQS Configuration

```python
# AWS SQS
CELERY_BROKER_URL = 'sqs://aws_access_key:aws_secret_key@'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',
    'visibility_timeout': 3600,
    'polling_interval': 1,
    'queue_name_prefix': 'celery-',
}
```

## Separating Redis Instances

**Critical:** Use separate Redis instances for different purposes to prevent cascade failures.

```python
# BAD: Single Redis for everything
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CACHES = {'default': {'BACKEND': '...', 'LOCATION': 'redis://localhost:6379/0'}}

# GOOD: Separate Redis instances/databases
CELERY_BROKER_URL = 'redis://localhost:6379/0'      # Task queue
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'  # Results
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://localhost:6379/2',     # Cache
    }
}

# BEST: Separate Redis servers in production
CELERY_BROKER_URL = 'redis://redis-queue:6379/0'
CELERY_RESULT_BACKEND = 'redis://redis-results:6379/0'
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://redis-cache:6379/0',
    }
}
```

Why separate?
- Cache eviction won't affect task queue
- Result backend memory pressure won't block new tasks
- Easier to scale each component independently
- Isolates failures

## Result Backend Configuration

### Redis Result Backend

```python
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# Result expiration (default: 1 day)
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # 24 hours

# Extended result info
CELERY_RESULT_EXTENDED = True
```

### Django Database Result Backend

```python
# Install: pip install django-celery-results
INSTALLED_APPS = [
    ...
    'django_celery_results',
]

CELERY_RESULT_BACKEND = 'django-db'

# Run migrations
# python manage.py migrate django_celery_results

# Optional: Cache results in addition to database
CELERY_CACHE_BACKEND = 'django-cache'
```

### Disable Results (Fire-and-Forget)

```python
# Global: disable result tracking
CELERY_TASK_IGNORE_RESULT = True

# Per-task: disable result
@shared_task(ignore_result=True)
def fire_and_forget_task(data):
    pass
```

## Worker Configuration

### Concurrency Settings

```python
# Number of worker processes/threads
CELERY_WORKER_CONCURRENCY = 4  # Default: CPU count

# Prefetch multiplier (tasks to prefetch per worker)
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # For fair distribution
# Use 4 (default) for throughput, 1 for latency-sensitive tasks

# Max tasks per worker before restart (memory leak prevention)
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
```

### Task Execution Settings

```python
# Task time limits
CELERY_TASK_TIME_LIMIT = 300  # Hard kill after 5 min
CELERY_TASK_SOFT_TIME_LIMIT = 240  # Raise exception after 4 min

# Task acknowledgment
CELERY_TASK_ACKS_LATE = True  # Ack after task completes (for reliability)
CELERY_TASK_REJECT_ON_WORKER_LOST = True  # Requeue if worker dies

# Task result compression
CELERY_RESULT_COMPRESSION = 'gzip'
```

## Task Routing and Queues

### Basic Queue Configuration

```python
# Define queues
from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('high-priority', routing_key='high'),
    Queue('low-priority', routing_key='low'),
    Queue('emails', routing_key='emails'),
)

CELERY_TASK_DEFAULT_QUEUE = 'default'
```

### Task Routing

```python
# Route tasks to specific queues
CELERY_TASK_ROUTES = {
    # By task name
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.critical_task': {'queue': 'high-priority'},

    # By pattern
    'myapp.tasks.report_*': {'queue': 'low-priority'},

    # By module
    'myapp.email_tasks.*': {'queue': 'emails'},
}

# Or use a function for dynamic routing
def route_task(name, args, kwargs, options, task=None, **kw):
    if kwargs.get('priority') == 'urgent':
        return {'queue': 'high-priority'}
    return {'queue': 'default'}

CELERY_TASK_ROUTES = (route_task,)
```

### Running Workers for Specific Queues

```bash
# Process only high-priority queue
celery -A myproject worker -Q high-priority

# Process multiple queues with priority
celery -A myproject worker -Q high-priority,default,low-priority

# Dedicated email worker
celery -A myproject worker -Q emails -c 2 -n email-worker@%h
```

## Serialization

### Serializer Options

```python
# JSON (default, safe, recommended)
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Pickle (supports Python objects, security risk!)
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']

# MessagePack (binary, faster than JSON)
# pip install msgpack
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_ACCEPT_CONTENT = ['msgpack', 'json']
```

### Per-Task Serializer

```python
@shared_task(serializer='pickle')
def task_with_complex_objects(data):
    # Can receive Python objects
    pass
```

## Security Settings

### Secure Serialization

```python
# Only accept JSON (never use pickle in production with untrusted data)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Message signing with the auth serializer (requires pyOpenSSL and a
# call to app.setup_security() at startup)
CELERY_TASK_SERIALIZER = 'auth'
CELERY_SECURITY_KEY = '/path/to/private.key'
CELERY_SECURITY_CERTIFICATE = '/path/to/certificate.pem'
CELERY_SECURITY_CERT_STORE = '/path/to/certs/'
```

### Rate Limiting

```python
# Global rate limit (tasks/second)
CELERY_TASK_DEFAULT_RATE_LIMIT = '100/s'

# Per-task rate limit
@shared_task(rate_limit='10/m')  # 10 per minute
def rate_limited_task():
    pass
```

## Environment-Specific Configuration

### Development Settings

```python
# settings/development.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# Eager mode: execute tasks synchronously (for debugging)
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
```

### Production Settings

```python
# settings/production.py
import os

CELERY_BROKER_URL = os.environ['CELERY_BROKER_URL']
CELERY_RESULT_BACKEND = os.environ['CELERY_RESULT_BACKEND']

# Never use eager mode in production
CELERY_TASK_ALWAYS_EAGER = False

# Connection resilience
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

# Task settings
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Result expiration
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # 24 hours

# Memory management
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
```

### Testing Settings

```python
# settings/testing.py
CELERY_TASK_ALWAYS_EAGER = True
CELERY_TASK_EAGER_PROPAGATES = True
CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'cache+memory://'
```

## Complete Configuration Example

```python
# settings.py - Complete Celery configuration

from kombu import Queue
import os

# Broker
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

# Result backend
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'django-db')
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # 24 hours

# Serialization
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Time and timezone
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True

# Task execution
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 min hard limit
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # 25 min soft limit
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Worker
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000

# Queues
CELERY_TASK_QUEUES = (
    Queue('default'),
    Queue('high-priority'),
    Queue('low-priority'),
)
CELERY_TASK_DEFAULT_QUEUE = 'default'

# Routing
CELERY_TASK_ROUTES = {
    'myapp.tasks.urgent_*': {'queue': 'high-priority'},
    'myapp.tasks.report_*': {'queue': 'low-priority'},
}

# Beat scheduler (for periodic tasks)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```

```

### references/error-handling.md

```markdown
# Error Handling and Retry Strategies

## Basic Retry Configuration

### Simple Retry

```python
from celery import shared_task

@shared_task(bind=True, max_retries=3)
def task_with_retry(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # Retry with default delay (3 minutes)
        raise self.retry(exc=exc)
```

### Custom Retry Delay

```python
@shared_task(bind=True, max_retries=5)
def task_with_custom_retry(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # Retry after 60 seconds
        raise self.retry(exc=exc, countdown=60)
```

### Exponential Backoff

```python
@shared_task(bind=True, max_retries=5)
def task_with_backoff(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # Exponential backoff: 1min, 2min, 4min, 8min, 16min
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown)
```

### Retry with Jitter

```python
import random

@shared_task(bind=True, max_retries=5)
def task_with_jitter(self, data):
    try:
        process(data)
    except TemporaryError as exc:
        # Add randomness to prevent thundering herd
        base_delay = 60 * (2 ** self.request.retries)
        jitter = random.uniform(0, base_delay * 0.1)
        raise self.retry(exc=exc, countdown=base_delay + jitter)
```

## Auto-Retry Decorator

### Using autoretry_for

```python
@shared_task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
    retry_backoff_max=600,  # Max 10 minutes between retries
    retry_jitter=True,
    max_retries=5,
)
def auto_retry_task(self, data):
    # Will automatically retry on ConnectionError or TimeoutError
    response = call_external_api(data)
    return response
```

### Multiple Exception Types

```python
@shared_task(
    autoretry_for=(
        ConnectionError,
        TimeoutError,
        requests.exceptions.RequestException,
    ),
    retry_backoff=60,  # Start with 60 second delay
    retry_backoff_max=3600,  # Max 1 hour
    max_retries=10,
)
def robust_api_call(url):
    return requests.get(url, timeout=30).json()
```

## Exception Handling Patterns

### Handle Specific Exceptions Differently

```python
from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=3)
def task_with_exception_handling(self, order_id):
    try:
        order = Order.objects.get(id=order_id)
        order.process()

    except Order.DoesNotExist:
        # Permanent failure - don't retry
        logger.error(f"Order {order_id} not found")
        return None

    except PaymentGatewayError as exc:
        # Temporary - retry with backoff. Note: self.retry() raises
        # MaxRetriesExceededError once retries are exhausted, and an
        # exception raised inside this handler is NOT caught by a
        # later except clause on the same try - so catch it here.
        logger.warning(f"Payment gateway error: {exc}")
        try:
            raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
        except MaxRetriesExceededError:
            # All retries exhausted
            order.mark_requires_manual_review()
            notify_admin(f"Order {order_id} requires manual review")
            raise

    except InsufficientFundsError:
        # Business logic failure - don't retry
        order.mark_payment_failed()
        return {'status': 'failed', 'reason': 'insufficient_funds'}
```

### Cleanup on Failure

```python
@shared_task(bind=True, max_retries=3)
def task_with_cleanup(self, resource_id):
    resource = None
    try:
        resource = acquire_resource(resource_id)
        process(resource)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Final attempt failed - clean up and give up
            if resource:
                cleanup_resource(resource)
            mark_as_failed(resource_id)
            raise
        raise self.retry(exc=exc, countdown=60)
    finally:
        if resource:
            release_resource(resource)
```

## Timeout Handling

### Soft Time Limit

```python
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(
    bind=True,
    soft_time_limit=300,  # Raise exception after 5 min
    time_limit=360,       # Kill process after 6 min
)
def task_with_timeout(self, large_dataset):
    try:
        for item in large_dataset:
            process_item(item)
    except SoftTimeLimitExceeded:
        # Graceful cleanup
        logger.warning("Task timeout approaching, saving progress")
        save_progress()
        # Optionally retry with remaining items
        raise self.retry(countdown=60)
```

### Per-Call Time Limits

Time limits can be overridden per call with `apply_async` options (assigning to `self.request.timelimit` inside a running task has no effect):

```python
my_task.apply_async(
    args=[data],
    soft_time_limit=300,  # Raise SoftTimeLimitExceeded after 5 min
    time_limit=360,       # Kill the worker process after 6 min
)
```

## Dead Letter Queue Pattern

### Manual Dead Letter Queue

```python
import traceback

from kombu import Queue

CELERY_TASK_QUEUES = (
    Queue('default'),
    Queue('dead_letter'),
)

@shared_task(bind=True, max_retries=3)
def task_with_dlq(self, data):
    try:
        process(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Send to dead letter queue for manual processing
            handle_dead_letter.delay(
                task_name=self.name,
                args=self.request.args,
                kwargs=self.request.kwargs,
                exception=str(exc),
                traceback=traceback.format_exc(),
            )
            return
        raise self.retry(exc=exc, countdown=60)


@shared_task(queue='dead_letter')
def handle_dead_letter(task_name, args, kwargs, exception, traceback):
    # Store for manual review
    DeadLetterMessage.objects.create(
        task_name=task_name,
        args=args,
        kwargs=kwargs,
        exception=exception,
        traceback=traceback,
    )
    notify_admin(f"Dead letter: {task_name}")
```

### RabbitMQ Dead Letter Exchange

```python
from kombu import Exchange, Queue

# Configure dead letter exchange in RabbitMQ
dlx = Exchange('dlx', type='direct')
dead_letter_queue = Queue(
    'dead_letters',
    exchange=dlx,
    routing_key='dead_letter',
)

main_queue = Queue(
    'default',
    queue_arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead_letter',
    },
)

CELERY_TASK_QUEUES = (main_queue, dead_letter_queue)
```

## Error Callbacks

### Using on_failure

```python
from celery import Task

class TaskWithErrorCallback(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(
            f'Task {self.name}[{task_id}] failed: {exc}',
            exc_info=einfo.exc_info,
        )
        # Send notification
        send_alert(
            title=f"Task Failed: {self.name}",
            message=str(exc),
            task_id=task_id,
        )


@shared_task(base=TaskWithErrorCallback, bind=True)
def task_with_error_callback(self, data):
    process(data)
```

### Link Error Callback

```python
@shared_task
def error_handler(request, exc, traceback):
    logger.error(
        f'Task {request.id} raised exception: {exc}',
        extra={
            'task_id': request.id,
            'task_name': request.task,
            'args': request.args,
            'kwargs': request.kwargs,
        }
    )

# Attach error callback when calling
my_task.apply_async(
    args=[data],
    link_error=error_handler.s(),
)
```

## Idempotent Error Handling

### Safe Retry Pattern

```python
@shared_task(bind=True, max_retries=5)
def idempotent_payment_task(self, order_id, idempotency_key):
    # Check if already processed
    if PaymentRecord.objects.filter(idempotency_key=idempotency_key).exists():
        logger.info(f"Payment {idempotency_key} already processed")
        return {'status': 'already_processed'}

    try:
        result = process_payment(order_id)

        # Record successful payment
        PaymentRecord.objects.create(
            idempotency_key=idempotency_key,
            order_id=order_id,
            result=result,
        )
        return result

    except PaymentGatewayError as exc:
        # Safe to retry - payment won't be duplicated
        raise self.retry(exc=exc, countdown=60)
```
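How the `idempotency_key` is produced is left open above. One deterministic option (an illustrative sketch; `make_idempotency_key` is not from the original) hashes the operation name with the order id, so every retry of the same logical payment reuses the same key:

```python
import hashlib

def make_idempotency_key(operation, order_id):
    """Deterministic key: identical inputs always yield the same key."""
    raw = f"{operation}:{order_id}".encode()
    return hashlib.sha256(raw).hexdigest()

key = make_idempotency_key("charge", 42)
# Same inputs always produce the same 64-character hex digest
```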

### Transactional Safety

```python
from django.db import transaction

@shared_task(bind=True, max_retries=3)
def transactional_task(self, order_id):
    try:
        with transaction.atomic():
            order = Order.objects.select_for_update().get(id=order_id)

            if order.status == 'processed':
                return {'status': 'already_processed'}

            # All-or-nothing processing
            process_order(order)
            update_inventory(order)
            order.status = 'processed'
            order.save()

    except Order.DoesNotExist:
        return None
    except Exception as exc:
        # Transaction rolled back, safe to retry
        raise self.retry(exc=exc, countdown=60)
```

## Monitoring Failed Tasks

### Logging Configuration

```python
# celery.py
import logging

from celery import signals

logger = logging.getLogger('celery.task')

@signals.task_failure.connect
def log_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
    logger.error(
        f'Task {sender.name}[{task_id}] failed',
        exc_info=einfo.exc_info,
        extra={
            'task_name': sender.name,
            'task_id': task_id,
            'args': args,
            'kwargs': kwargs,
        }
    )

@signals.task_retry.connect
def log_task_retry(sender, request, reason, einfo, **kw):
    logger.warning(
        f'Task {sender.name}[{request.id}] retrying: {reason}',
        extra={
            'task_name': sender.name,
            'task_id': request.id,
            'retry_count': request.retries,
        }
    )
```

### Storing Failed Tasks

```python
# models.py
from django.db import models

class FailedTask(models.Model):
    task_id = models.CharField(max_length=255, unique=True)
    task_name = models.CharField(max_length=255)
    args = models.JSONField()
    kwargs = models.JSONField()
    exception = models.TextField()
    traceback = models.TextField()
    created_at = models.DateTimeField(auto_now_add=True)
    retried = models.BooleanField(default=False)

# signals (e.g. in celery.py)
from celery import signals

@signals.task_failure.connect
def store_failed_task(sender, task_id, exception, args, kwargs, traceback, einfo, **kw):
    FailedTask.objects.update_or_create(
        task_id=task_id,
        defaults={
            'task_name': sender.name,
            'args': list(args),
            'kwargs': dict(kwargs),
            'exception': str(exception),
            'traceback': einfo.traceback if einfo else '',
        }
    )
```

## Retry Best Practices Summary

| Scenario | Retry Strategy |
|----------|---------------|
| Network errors | Exponential backoff with jitter, 5+ retries |
| Rate limiting | Respect Retry-After header, or fixed backoff |
| Database locks | Short delay (1-5s), few retries |
| External API down | Long backoff (minutes), many retries |
| Invalid data | Don't retry, log and alert |
| Resource not found | Don't retry (usually permanent) |
| Authentication errors | Don't retry, investigate |
| Timeout | Retry with same/increased timeout |

### Configuration Recommendations

```python
# Production retry configuration
@shared_task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,        # Exponential backoff
    retry_backoff_max=600,     # Max 10 min between retries
    retry_jitter=True,         # Add randomness
    max_retries=5,             # Limit total attempts
    acks_late=True,            # Ack after completion
    reject_on_worker_lost=True # Requeue if worker dies
)
def production_task(self, data):
    pass
```

```

### references/periodic-tasks.md

```markdown
# Periodic Tasks with Celery Beat

## Celery Beat Overview

Celery Beat is a scheduler that kicks off tasks at regular intervals. Tasks are executed by available workers.

### Architecture

```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  Celery Beat    │────▶│  Message Broker │────▶│  Celery Worker  │
│  (scheduler)    │     │  (Redis/RMQ)    │     │  (executor)     │
└─────────────────┘     └─────────────────┘     └─────────────────┘
```

**Important:** Only run ONE Beat scheduler instance to avoid duplicate task execution.

## Basic Configuration

### Static Schedule in Settings

```python
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Execute every 30 seconds
    'check-heartbeat': {
        'task': 'myapp.tasks.check_heartbeat',
        'schedule': 30.0,
    },

    # Execute every minute
    'process-queue': {
        'task': 'myapp.tasks.process_queue',
        'schedule': 60.0,
        'args': (),
    },

    # Execute at midnight every day
    'daily-cleanup': {
        'task': 'myapp.tasks.daily_cleanup',
        'schedule': crontab(hour=0, minute=0),
    },

    # Execute Monday at 7:30am
    'weekly-report': {
        'task': 'myapp.tasks.send_weekly_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },

    # Every hour on the hour
    'hourly-sync': {
        'task': 'myapp.tasks.sync_data',
        'schedule': crontab(minute=0),
    },
}
```

### Schedule Types

```python
from celery.schedules import crontab, solar
from datetime import timedelta

CELERY_BEAT_SCHEDULE = {
    # Timedelta - simple intervals
    'every-10-seconds': {
        'task': 'myapp.tasks.quick_check',
        'schedule': timedelta(seconds=10),
    },

    'every-5-minutes': {
        'task': 'myapp.tasks.periodic_check',
        'schedule': timedelta(minutes=5),
    },

    # Crontab - cron-like schedule
    'weekday-mornings': {
        'task': 'myapp.tasks.morning_task',
        'schedule': crontab(hour=8, minute=0, day_of_week='mon-fri'),
    },

    # First of every month
    'monthly-report': {
        'task': 'myapp.tasks.monthly_report',
        'schedule': crontab(day_of_month=1, hour=0, minute=0),
    },

    # Every quarter (1st of Jan, Apr, Jul, Oct)
    'quarterly-report': {
        'task': 'myapp.tasks.quarterly_report',
        'schedule': crontab(day_of_month=1, month_of_year='1,4,7,10', hour=0, minute=0),
    },

    # Solar schedule - based on sunrise/sunset
    'at-sunrise': {
        'task': 'myapp.tasks.sunrise_task',
        'schedule': solar('sunrise', -37.81753, 144.96715),  # Melbourne
    },
}
```

## Crontab Reference

```python
from celery.schedules import crontab

# Crontab arguments:
# minute, hour, day_of_week, day_of_month, month_of_year

# Every minute
crontab()

# Every hour at minute 0
crontab(minute=0)

# Every day at midnight
crontab(minute=0, hour=0)

# Every Monday
crontab(minute=0, hour=0, day_of_week=1)

# Every 15 minutes
crontab(minute='*/15')

# Every hour between 9am-5pm on weekdays
crontab(minute=0, hour='9-17', day_of_week='mon-fri')

# At 12:30 on the 15th of every month
crontab(minute=30, hour=12, day_of_month=15)

# Multiple specific values
crontab(minute=0, hour='0,12')  # Midnight and noon
crontab(day_of_week='mon,wed,fri')  # MWF
```

## Django Database Scheduler

### Setup

```bash
pip install django-celery-beat
```

```python
# settings.py
INSTALLED_APPS = [
    ...
    'django_celery_beat',
]

# Use database scheduler
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```

```bash
python manage.py migrate django_celery_beat
```

### Managing Schedules via Admin

```python
# admin.py
from django_celery_beat.models import (
    PeriodicTask,
    IntervalSchedule,
    CrontabSchedule,
)

# These are auto-registered, but you can customize:
from django.contrib import admin
from django_celery_beat.admin import PeriodicTaskAdmin
from django_celery_beat.models import PeriodicTask

class CustomPeriodicTaskAdmin(PeriodicTaskAdmin):
    list_display = ['name', 'task', 'enabled', 'last_run_at']
    list_filter = ['enabled', 'task']

admin.site.unregister(PeriodicTask)
admin.site.register(PeriodicTask, CustomPeriodicTaskAdmin)
```

### Creating Schedules Programmatically

```python
from django_celery_beat.models import (
    PeriodicTask,
    IntervalSchedule,
    CrontabSchedule,
)
import json

# Create interval schedule (every 10 minutes)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.MINUTES,
)

# Create periodic task with interval
PeriodicTask.objects.create(
    interval=schedule,
    name='Sync Data Every 10 Minutes',
    task='myapp.tasks.sync_data',
    args=json.dumps([]),
    kwargs=json.dumps({}),
)

# Create crontab schedule (daily at 6am)
cron_schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0',
    hour='6',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)

# Create periodic task with crontab
PeriodicTask.objects.create(
    crontab=cron_schedule,
    name='Daily Report',
    task='myapp.tasks.generate_daily_report',
    args=json.dumps(['pdf']),
    kwargs=json.dumps({'include_charts': True}),
    enabled=True,
)
```

### Dynamic Schedule Management

```python
from django_celery_beat.models import PeriodicTask, IntervalSchedule
import json

def create_user_notification_schedule(user_id, interval_minutes):
    """Create a periodic task for user-specific notifications."""
    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=interval_minutes,
        period=IntervalSchedule.MINUTES,
    )

    task_name = f'notify-user-{user_id}'

    PeriodicTask.objects.update_or_create(
        name=task_name,
        defaults={
            'interval': schedule,
            'task': 'notifications.tasks.send_user_digest',
            'kwargs': json.dumps({'user_id': user_id}),
            'enabled': True,
        }
    )

def disable_user_notifications(user_id):
    """Disable notifications for a user."""
    task_name = f'notify-user-{user_id}'
    PeriodicTask.objects.filter(name=task_name).update(enabled=False)

def delete_user_notifications(user_id):
    """Remove notification schedule for a user."""
    task_name = f'notify-user-{user_id}'
    PeriodicTask.objects.filter(name=task_name).delete()
```

## Timezone Handling

### Configuration

```python
# settings.py
CELERY_TIMEZONE = 'America/New_York'
CELERY_ENABLE_UTC = True  # Store in UTC, display in timezone

# Or use Django's timezone
CELERY_TIMEZONE = TIME_ZONE
```

### Timezone-Aware Crontabs

```python
from celery.schedules import crontab
import pytz

CELERY_BEAT_SCHEDULE = {
    # This runs at 9am in the configured CELERY_TIMEZONE
    'morning-email': {
        'task': 'myapp.tasks.send_morning_email',
        'schedule': crontab(hour=9, minute=0),
    },
}

# For database scheduler, set timezone on CrontabSchedule
from django_celery_beat.models import CrontabSchedule

schedule = CrontabSchedule.objects.create(
    minute='0',
    hour='9',
    timezone=pytz.timezone('America/New_York'),
)
```

## Task Arguments and Options

### Passing Arguments

```python
CELERY_BEAT_SCHEDULE = {
    'daily-report': {
        'task': 'myapp.tasks.generate_report',
        'schedule': crontab(hour=6, minute=0),
        'args': ('daily', 'pdf'),
        'kwargs': {'include_charts': True, 'recipients': ['[email protected]']},
    },
}
```

### Task Options

```python
CELERY_BEAT_SCHEDULE = {
    'priority-task': {
        'task': 'myapp.tasks.important_task',
        'schedule': crontab(minute='*/5'),
        'options': {
            'queue': 'high-priority',
            'priority': 9,
            'expires': 300,  # Expire if not started in 5 min
        },
    },
}
```

## Preventing Duplicate Executions

### Using Locks

```python
from django.core.cache import cache
from celery import shared_task
from contextlib import contextmanager

@contextmanager
def beat_lock(lock_id, timeout=60*60):
    """Prevent duplicate periodic task execution."""
    lock_acquired = cache.add(lock_id, 'locked', timeout)
    try:
        yield lock_acquired
    finally:
        if lock_acquired:
            cache.delete(lock_id)

@shared_task
def hourly_sync():
    with beat_lock('hourly-sync-lock') as acquired:
        if not acquired:
            return  # Another instance is running

        # Safe to execute
        perform_sync()
```
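One subtlety with the cache-based lock above: if a task overruns the lock timeout, the lock expires, a second run acquires it, and the first run's `cache.delete` then frees a lock it no longer owns. Storing a unique token and checking ownership before release avoids this. A stdlib sketch; `FakeCache` is a minimal stand-in for Django's cache API, for illustration only:

```python
import time
import uuid

class FakeCache:
    """Minimal stand-in for Django's cache API (illustration only)."""
    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout):
        entry = self._data.get(key)
        now = time.monotonic()
        if entry and entry[1] > now:
            return False  # key exists and has not expired
        self._data[key] = (value, now + timeout)
        return True

    def get(self, key):
        entry = self._data.get(key)
        if entry and entry[1] > time.monotonic():
            return entry[0]
        return None

    def delete(self, key):
        self._data.pop(key, None)

def release_lock(cache, lock_id, token):
    # Only release if we still own the lock; a run that outlived the
    # timeout must not free a lock now held by another run.
    if cache.get(lock_id) == token:
        cache.delete(lock_id)

cache = FakeCache()
token = uuid.uuid4().hex
acquired = cache.add("hourly-sync-lock", token, timeout=3600)
```

With Django's real cache the same pattern applies: `cache.add(lock_id, token, timeout)` to acquire, then compare `cache.get(lock_id)` to your token before deleting.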

### Using celery-once

```bash
pip install celery-once
```

```python
from celery import shared_task
from celery_once import QueueOnce

@shared_task(base=QueueOnce, once={'graceful': True})
def slow_task():
    # Only one instance runs at a time
    perform_slow_operation()
```

## Running Celery Beat

### Development

```bash
# Run beat with worker (for development only)
celery -A myproject worker --beat --loglevel=info

# Run beat separately
celery -A myproject beat --loglevel=info
```

### Production

```bash
# Run beat as separate process
celery -A myproject beat --loglevel=info --pidfile=/var/run/celery/beat.pid

# With database scheduler
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```

## Best Practices

### Schedule Design

```python
# GOOD: Offset schedules to avoid thundering herd
CELERY_BEAT_SCHEDULE = {
    'task-a': {'task': 'app.task_a', 'schedule': crontab(minute=0)},  # :00
    'task-b': {'task': 'app.task_b', 'schedule': crontab(minute=5)},  # :05
    'task-c': {'task': 'app.task_c', 'schedule': crontab(minute=10)}, # :10
}

# BAD: All tasks at the same time
CELERY_BEAT_SCHEDULE = {
    'task-a': {'task': 'app.task_a', 'schedule': crontab(minute=0)},
    'task-b': {'task': 'app.task_b', 'schedule': crontab(minute=0)},
    'task-c': {'task': 'app.task_c', 'schedule': crontab(minute=0)},
}
```
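With many schedules, offsets can be derived deterministically from the task name instead of maintained by hand. A sketch (the helper name is illustrative):

```python
import zlib

def staggered_minute(task_name: str, slots: int = 12) -> int:
    """Map a task name to one of `slots` evenly spaced minutes in the hour.

    Deterministic: the same task always lands on the same offset, while
    different tasks spread out instead of all firing at :00.
    """
    step = 60 // slots
    return (zlib.crc32(task_name.encode()) % slots) * step

# Usage: crontab(minute=staggered_minute('app.task_a'))
```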

### Handling Long-Running Tasks

```python
@shared_task(
    soft_time_limit=3600,  # 1 hour soft limit
    time_limit=3660,       # Hard limit slightly longer
)
def daily_report():
    """Generate daily report - may take up to 1 hour."""
    # Set schedule to run with enough gap
    pass

CELERY_BEAT_SCHEDULE = {
    'daily-report': {
        'task': 'myapp.tasks.daily_report',
        'schedule': crontab(hour=2, minute=0),  # 2am, plenty of time before business hours
        'options': {'expires': 3600 * 3},  # Expire if not started in 3 hours
    },
}
```

### Monitoring Beat Health

```python
from celery import shared_task
from django.core.cache import cache
from django.utils import timezone

@shared_task
def beat_health_check():
    """Record that beat is running."""
    cache.set('celery-beat-heartbeat', timezone.now(), timeout=120)

CELERY_BEAT_SCHEDULE = {
    'beat-health-check': {
        'task': 'myapp.tasks.beat_health_check',
        'schedule': 60.0,  # Every minute
    },
}

# Check beat health
def is_beat_healthy():
    last_heartbeat = cache.get('celery-beat-heartbeat')
    if not last_heartbeat:
        return False
    return (timezone.now() - last_heartbeat).total_seconds() < 120
```

### Cleanup Old Task Results

```python
# Clean up task results periodically
from datetime import timedelta

from celery import shared_task
from django.utils import timezone

@shared_task
def cleanup_task_results():
    """Remove old task results from database."""
    from django_celery_results.models import TaskResult

    cutoff = timezone.now() - timedelta(days=7)
    TaskResult.objects.filter(date_created__lt=cutoff).delete()

CELERY_BEAT_SCHEDULE = {
    'cleanup-results': {
        'task': 'myapp.tasks.cleanup_task_results',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3am
    },
}
```

## Common Patterns

### Conditional Execution

```python
@shared_task
def conditional_task():
    """Only execute if conditions are met."""
    if not should_run_today():
        return

    perform_task()

def should_run_today():
    # Skip weekends
    if timezone.now().weekday() >= 5:
        return False
    # Skip holidays
    if Holiday.objects.filter(date=timezone.now().date()).exists():
        return False
    return True
```

### Chained Periodic Tasks

```python
from celery import chain

@shared_task
def start_daily_pipeline():
    """Kick off daily processing pipeline."""
    workflow = chain(
        fetch_data.s(),
        process_data.s(),
        generate_report.s(),
        send_notifications.s(),
    )
    workflow.delay()

CELERY_BEAT_SCHEDULE = {
    'daily-pipeline': {
        'task': 'myapp.tasks.start_daily_pipeline',
        'schedule': crontab(hour=6, minute=0),
    },
}
```
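In a `chain`, each task's return value is passed as the first argument of the next signature. A synchronous stdlib sketch of that data flow; the functions are stand-ins for the chained tasks above:

```python
from functools import reduce

# Stand-ins for the chained tasks: each receives the previous result
def fetch_data():
    return [3, 1, 2]

def process_data(rows):
    return sorted(rows)

def generate_report(rows):
    return {"count": len(rows), "rows": rows}

def run_chain():
    # chain(fetch_data.s(), process_data.s(), generate_report.s())
    # behaves like this left-to-right composition
    return reduce(lambda acc, f: f(acc),
                  [process_data, generate_report],
                  fetch_data())

print(run_chain())  # → {'count': 3, 'rows': [1, 2, 3]}
```

This is also why intermediate tasks in a chain must accept the previous task's result as their first positional argument.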

```

### references/monitoring-observability.md

```markdown
# Monitoring and Observability

## Flower - Real-Time Monitor

### Installation and Setup

```bash
pip install flower
```

```bash
# Basic usage
celery -A myproject flower

# With options
celery -A myproject flower \
    --port=5555 \
    --broker=redis://localhost:6379/0 \
    --basic_auth=admin:password

# Persistent storage
celery -A myproject flower --persistent=True --db=flower.db
```

### Flower Configuration

```python
# flower_config.py
broker_api = 'redis://localhost:6379/0'
port = 5555
basic_auth = ['admin:secure_password']
persistent = True
db = '/var/lib/flower/flower.db'
max_tasks = 10000
purge_offline_workers = 300  # Remove offline workers after 5 min
```

```bash
celery -A myproject flower --conf=flower_config.py
```

### Flower in Production

```yaml
# docker-compose.yml
services:
  flower:
    image: mher/flower
    command: celery flower --broker=redis://redis:6379/0
    ports:
      - "5555:5555"
    environment:
      - FLOWER_BASIC_AUTH=admin:password
    depends_on:
      - redis
      - worker
```

### Flower API

```python
import requests

# List workers
response = requests.get('http://localhost:5555/api/workers')
workers = response.json()

# Get task info
response = requests.get(f'http://localhost:5555/api/task/info/{task_id}')
task_info = response.json()

# Revoke task
requests.post(f'http://localhost:5555/api/task/revoke/{task_id}')

# Queue length
response = requests.get('http://localhost:5555/api/queues/length')
queue_lengths = response.json()
```

## Logging Best Practices

### Task Logging Configuration

```python
# settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'celery': {
            # TaskFormatter fills in task_name/task_id (or '???' outside a task)
            '()': 'celery.app.log.TaskFormatter',
            'fmt': '[%(asctime)s: %(levelname)s/%(processName)s] [%(task_name)s(%(task_id)s)] %(message)s',
        },
    },
    'handlers': {
        'celery': {
            'level': 'INFO',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/celery/tasks.log',
            'maxBytes': 1024 * 1024 * 100,  # 100MB
            'backupCount': 10,
            'formatter': 'celery',
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery.task': {
            'handlers': ['celery'],
            'level': 'INFO',
            'propagate': False,
        },
    },
}
```

### Structured Logging

```python
import structlog
from celery import shared_task

logger = structlog.get_logger()

@shared_task(bind=True)
def process_order(self, order_id):
    log = logger.bind(
        task_id=self.request.id,
        task_name=self.name,
        order_id=order_id,
    )

    log.info('starting_order_processing')

    try:
        order = Order.objects.get(id=order_id)
        log = log.bind(user_id=order.user_id)

        result = process(order)
        log.info('order_processed', result=result)
        return result

    except Order.DoesNotExist:
        log.error('order_not_found')
        raise
    except Exception as e:
        log.exception('order_processing_failed', error=str(e))
        raise
```

### Celery Signals for Logging

```python
from celery import signals
import logging

logger = logging.getLogger('celery.task')

@signals.task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):
    logger.info(f'Task starting: {task.name}[{task_id}]')

@signals.task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):
    logger.info(f'Task completed: {task.name}[{task_id}] state={state}')

@signals.task_failure.connect
def task_failure_handler(task_id, exception, args, kwargs, traceback, einfo, **kw):
    logger.error(
        f'Task failed: {task_id}',
        exc_info=True,
        extra={'args': args, 'kwargs': kwargs}
    )

@signals.task_retry.connect
def task_retry_handler(request, reason, einfo, **kw):
    logger.warning(f'Task retrying: {request.id} reason={reason}')
```

## Prometheus Metrics

### Setup with celery-exporter

```bash
pip install celery-exporter
```

```bash
celery-exporter --broker-url=redis://localhost:6379/0 --port=9808
```

### Queue Monitoring

```python
from celery import current_app
from prometheus_client import Gauge

QUEUE_LENGTH = Gauge('celery_queue_length', 'Queue length', ['queue'])

def update_queue_metrics():
    """Update queue length metrics."""
    with current_app.connection() as conn:
        for queue in ['default', 'high-priority', 'low-priority']:
            try:
                length = conn.default_channel.queue_declare(
                    queue=queue, passive=True
                ).message_count
                QUEUE_LENGTH.labels(queue=queue).set(length)
            except Exception:
                pass

# Call periodically via Beat
@shared_task
def collect_queue_metrics():
    update_queue_metrics()
```

## Health Checks

### Worker Health Check

```python
from celery import current_app

def check_celery_health():
    """Check if Celery workers are healthy."""
    try:
        # Ping workers
        result = current_app.control.ping(timeout=5)
        if not result:
            return {'healthy': False, 'error': 'No workers responded'}

        # Check active workers
        active = current_app.control.inspect().active()
        if not active:
            return {'healthy': False, 'error': 'No active workers'}

        return {
            'healthy': True,
            'workers': len(active),
            'worker_names': list(active.keys()),
        }
    except Exception as e:
        return {'healthy': False, 'error': str(e)}
```

### Django Health Check Integration

```python
# healthchecks.py
from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import HealthCheckException
from celery import current_app

class CeleryHealthCheck(BaseHealthCheckBackend):
    critical_service = True

    def check_status(self):
        try:
            result = current_app.control.ping(timeout=3)
            if not result:
                raise HealthCheckException('No Celery workers available')
        except Exception as e:
            raise HealthCheckException(str(e))

    def identifier(self):
        return 'Celery Workers'
```

```python
# settings.py
INSTALLED_APPS = [
    ...
    'health_check',
    'health_check.contrib.celery',
]
```

### Beat Health Check

```python
from django.core.cache import cache
from django.utils import timezone

@shared_task
def beat_heartbeat():
    """Record Beat scheduler heartbeat."""
    cache.set('celery_beat_heartbeat', timezone.now().isoformat(), timeout=120)

def check_beat_health():
    """Check if Beat is running."""
    heartbeat = cache.get('celery_beat_heartbeat')
    if not heartbeat:
        return {'healthy': False, 'error': 'No heartbeat recorded'}

    last_beat = timezone.datetime.fromisoformat(heartbeat)
    age = int((timezone.now() - last_beat).total_seconds())

    if age > 120:
        return {'healthy': False, 'error': f'Heartbeat is {age}s old'}

    return {'healthy': True, 'last_heartbeat': heartbeat}
```

## Debugging Stuck Tasks

### Inspecting Workers

```python
from celery import current_app

def inspect_workers():
    """Get detailed worker information."""
    inspect = current_app.control.inspect()

    return {
        'active': inspect.active(),      # Currently executing tasks
        'reserved': inspect.reserved(),   # Prefetched tasks
        'scheduled': inspect.scheduled(), # ETA/countdown tasks
        'stats': inspect.stats(),         # Worker statistics
    }

def get_active_tasks():
    """List all currently running tasks."""
    inspect = current_app.control.inspect()
    active = inspect.active()

    tasks = []
    for worker, task_list in (active or {}).items():
        for task in task_list:
            tasks.append({
                'worker': worker,
                'task_id': task['id'],
                'task_name': task['name'],
                'args': task['args'],
                'started': task.get('time_start'),
            })
    return tasks
```

### Finding Long-Running Tasks

```python
import time

from celery import current_app

def find_long_running_tasks(threshold_seconds=300):
    """Find tasks running longer than threshold."""
    inspect = current_app.control.inspect()
    active = inspect.active() or {}

    long_running = []
    now = time.time()

    for worker, tasks in active.items():
        for task in tasks:
            start_time = task.get('time_start')
            if start_time:
                duration = now - start_time
                if duration > threshold_seconds:
                    long_running.append({
                        'worker': worker,
                        'task_id': task['id'],
                        'task_name': task['name'],
                        'duration_seconds': int(duration),
                    })

    return long_running
```

### Queue Inspection

```python
def inspect_queues():
    """Get queue information."""
    from kombu import Connection

    with Connection(current_app.conf.broker_url) as conn:
        queues = {}
        for queue_name in ['default', 'high-priority', 'low-priority']:
            try:
                queue = conn.SimpleQueue(queue_name)
                queues[queue_name] = {
                    'length': len(queue),
                }
                queue.close()
            except Exception as e:
                queues[queue_name] = {'error': str(e)}

    return queues
```

## Alerting

### Slack Alerts

```python
import requests
from celery import signals

SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXX/YYY/ZZZ'

def send_slack_alert(message, level='warning'):
    color = {'info': '#36a64f', 'warning': '#ff9800', 'error': '#f44336'}
    requests.post(SLACK_WEBHOOK_URL, json={
        'attachments': [{
            'color': color.get(level, '#808080'),
            'text': message,
        }]
    })

@signals.task_failure.connect
def alert_on_failure(sender, task_id, exception, **kwargs):
    message = f"Task `{sender.name}` failed: {exception}"
    send_slack_alert(message, level='error')

@signals.task_retry.connect
def alert_on_retry(sender, request, reason, **kwargs):
    if request.retries >= 3:  # Only alert after multiple retries
        message = f"Task `{sender.name}` retrying ({request.retries}): {reason}"
        send_slack_alert(message, level='warning')
```

### PagerDuty Integration

```python
import pypd
from celery import signals

pypd.api_key = 'your-api-key'

@signals.task_failure.connect
def pagerduty_alert(sender, task_id, exception, **kwargs):
    # Only page for critical tasks
    critical_tasks = ['process_payment', 'send_critical_notification']

    if sender.name in critical_tasks:
        pypd.Event.create(data={
            'service_key': 'your-service-key',
            'event_type': 'trigger',
            'description': f'Critical task failed: {sender.name}',
            'details': {
                'task_id': task_id,
                'exception': str(exception),
            }
        })
```

## Dashboard Queries

### Key Metrics to Monitor

| Metric | Alert Threshold | Description |
|--------|-----------------|-------------|
| Queue length | > 1000 | Tasks piling up |
| Worker count | < expected | Workers died |
| Task failure rate | > 5% | High failure rate |
| Task duration p95 | > SLA | Tasks taking too long |
| Retry rate | > 10% | Transient issues |
| Beat heartbeat age | > 2 min | Beat stopped |
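These thresholds can be encoded as a small evaluation helper fed from whatever metrics source you use (the metric names and structure here are illustrative):

```python
def evaluate_alerts(metrics: dict) -> list[str]:
    """Return alert messages for any metric breaching its threshold."""
    alerts = []
    if metrics.get("queue_length", 0) > 1000:
        alerts.append("queue backlog")
    if metrics.get("worker_count", 0) < metrics.get("expected_workers", 1):
        alerts.append("workers missing")
    if metrics.get("failure_rate", 0.0) > 0.05:
        alerts.append("high failure rate")
    if metrics.get("retry_rate", 0.0) > 0.10:
        alerts.append("high retry rate")
    if metrics.get("beat_heartbeat_age", 0) > 120:
        alerts.append("beat stalled")
    return alerts
```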

```

### references/production-deployment.md

```markdown
# Production Deployment

## Worker Deployment Patterns

### Basic Worker Command

```bash
celery -A myproject worker --loglevel=info
```

### Worker Options

```bash
celery -A myproject worker \
    --loglevel=info \
    --concurrency=4 \
    --hostname=worker1@%h \
    --queues=default,high-priority \
    --prefetch-multiplier=1 \
    --max-tasks-per-child=1000 \
    --time-limit=300 \
    --soft-time-limit=240
```

| Option | Description |
|--------|-------------|
| `-c, --concurrency` | Number of worker processes |
| `-n, --hostname` | Worker hostname (use %h for machine hostname) |
| `-Q, --queues` | Queues to consume from |
| `--prefetch-multiplier` | Tasks to prefetch (1 for fair, higher for throughput) |
| `--max-tasks-per-child` | Restart worker after N tasks (prevents memory leaks) |
| `--time-limit` | Hard time limit per task |
| `--soft-time-limit` | Soft limit (raises exception) |

## Process Supervision

### Systemd Configuration

```ini
# /etc/systemd/system/celery.service
[Unit]
Description=Celery Service
After=network.target

[Service]
Type=forking
User=celery
Group=celery
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/myproject
ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} multi start ${CELERYD_NODES} \
    --pidfile=${CELERYD_PID_FILE} \
    --logfile=${CELERYD_LOG_FILE} \
    --loglevel=${CELERYD_LOG_LEVEL} \
    ${CELERYD_OPTS}'
ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \
    --pidfile=${CELERYD_PID_FILE}'
ExecReload=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} multi restart ${CELERYD_NODES} \
    --pidfile=${CELERYD_PID_FILE} \
    --logfile=${CELERYD_LOG_FILE} \
    --loglevel=${CELERYD_LOG_LEVEL} \
    ${CELERYD_OPTS}'
Restart=always

[Install]
WantedBy=multi-user.target
```

```bash
# /etc/conf.d/celery
CELERY_BIN="/opt/myproject/venv/bin/celery"
CELERY_APP="myproject"
CELERYD_NODES="worker1 worker2"
CELERYD_OPTS="--time-limit=300 --concurrency=4"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_LOG_LEVEL="INFO"
```

### Systemd for Beat

```ini
# /etc/systemd/system/celerybeat.service
[Unit]
Description=Celery Beat Service
After=network.target

[Service]
Type=simple
User=celery
Group=celery
EnvironmentFile=/etc/conf.d/celery
WorkingDirectory=/opt/myproject
ExecStart=/bin/sh -c '${CELERY_BIN} -A ${CELERY_APP} beat \
    --pidfile=${CELERYBEAT_PID_FILE} \
    --logfile=${CELERYBEAT_LOG_FILE} \
    --loglevel=${CELERYD_LOG_LEVEL} \
    --scheduler django_celery_beat.schedulers:DatabaseScheduler'
Restart=always

[Install]
WantedBy=multi-user.target
```

### Supervisor Configuration

```ini
# /etc/supervisor/conf.d/celery.conf
[program:celery-worker]
command=/opt/myproject/venv/bin/celery -A myproject worker --loglevel=INFO --concurrency=4
directory=/opt/myproject
user=celery
numprocs=1
stdout_logfile=/var/log/celery/worker.log
stderr_logfile=/var/log/celery/worker.log
autostart=true
autorestart=true
startsecs=10
stopwaitsecs=600
stopasgroup=true
priority=1000

[program:celery-beat]
command=/opt/myproject/venv/bin/celery -A myproject beat --loglevel=INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
directory=/opt/myproject
user=celery
numprocs=1
stdout_logfile=/var/log/celery/beat.log
stderr_logfile=/var/log/celery/beat.log
autostart=true
autorestart=true
startsecs=10
priority=999
```

## Containerized Deployment

### Dockerfile

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m celery
USER celery

# Default command (override in compose)
CMD ["celery", "-A", "myproject", "worker", "--loglevel=info"]
```

### Docker Compose

```yaml
# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  web:
    build: .
    command: gunicorn myproject.wsgi:application --bind 0.0.0.0:8000
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  celery-worker:
    build: .
    command: celery -A myproject worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M

  celery-beat:
    build: .
    command: celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis
    deploy:
      replicas: 1  # Only one beat instance!

  flower:
    build: .
    command: celery -A myproject flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on:
      - redis

volumes:
  redis-data:
```

### Kubernetes Deployment

```yaml
# celery-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
  template:
    metadata:
      labels:
        app: celery-worker
    spec:
      containers:
      - name: celery-worker
        image: myproject:latest
        command: ["celery", "-A", "myproject", "worker", "--loglevel=info"]
        env:
        - name: CELERY_BROKER_URL
          valueFrom:
            secretKeyRef:
              name: celery-secrets
              key: broker-url
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
          requests:
            memory: "256Mi"
            cpu: "250m"
        livenessProbe:
          exec:
            command:
            - celery
            - -A
            - myproject
            - inspect
            - ping
          initialDelaySeconds: 30
          periodSeconds: 60
---
# celery-beat-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
spec:
  replicas: 1  # Must be exactly 1
  strategy:
    type: Recreate  # Prevent duplicate beats during deploy
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
      - name: celery-beat
        image: myproject:latest
        command: ["celery", "-A", "myproject", "beat", "--loglevel=info"]
        env:
        - name: CELERY_BROKER_URL
          valueFrom:
            secretKeyRef:
              name: celery-secrets
              key: broker-url
```

## Scaling Strategies

### Horizontal Scaling

```yaml
# Scale workers based on queue length
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: External
    external:
      metric:
        name: celery_queue_length
      target:
        type: AverageValue
        averageValue: 100  # Scale up when > 100 tasks per worker
```

### Queue-Based Scaling

```bash
# Dedicated workers for different queues
# High-priority: More workers, less concurrency (faster response)
celery -A myproject worker -Q high-priority -c 2 -n high@%h

# Default: Balanced
celery -A myproject worker -Q default -c 4 -n default@%h

# Low-priority: Fewer workers, more concurrency (batch processing)
celery -A myproject worker -Q low-priority -c 8 -n low@%h
```

## Deployment Warnings

### Task Signature Changes

**Critical:** Drain all queues before deploying changes to task signatures (adding, removing, or renaming parameters).

```python
# BEFORE: Task with two parameters
@shared_task
def process_order(order_id, notify=True):
    pass

# AFTER: Task with changed signature
@shared_task
def process_order(order_id, notify=True, priority='normal'):  # New param
    pass
```

**Problem:** Old messages in the queue have the old signature. When new workers pick them up, they may fail or behave unexpectedly.

**Solution:**

```bash
# 1. Stop producers (or pause task creation)
# 2. Wait for queues to drain
celery -A myproject inspect reserved
celery -A myproject inspect active

# 3. Verify queues are empty on the broker itself (inspect only shows
#    tasks already claimed by workers), e.g. for Redis:
redis-cli LLEN celery

# 4. Deploy new code
# 5. Restart workers
# 6. Resume producers
```

**Alternative: Backwards-Compatible Changes**

```python
# Make new parameters optional with defaults
@shared_task
def process_order(order_id, notify=True, priority=None):
    if priority is None:
        priority = 'normal'  # Handle old messages
    pass
```
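A further defensive option is to accept and ignore unknown keyword arguments, so messages enqueued by a different code version (an older producer with since-removed parameters, or a newer producer mid-rolling-deploy) don't crash the worker. A sketch, shown as a plain function; with Celery it would carry the `@shared_task` decorator:

```python
import warnings

def process_order(order_id, notify=True, priority="normal", **legacy_kwargs):
    # Tolerate stray kwargs from messages produced by another code version,
    # but log them so signature drift is visible.
    if legacy_kwargs:
        warnings.warn(f"ignoring unknown task kwargs: {sorted(legacy_kwargs)}")
    return {"order_id": order_id, "notify": notify, "priority": priority}
```

The trade-off is that genuine typos in callers are silently tolerated, so the warning (or a log line) matters.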

### Avoid Long ETAs and Countdowns

Tasks with long `eta` or `countdown` values can cause issues:

```python
# BAD: Task scheduled far in the future
my_task.apply_async(eta=datetime.now() + timedelta(days=30))
my_task.apply_async(countdown=60*60*24*30)  # 30 days

# Problems:
# - Task signature might change before execution
# - Broker restarts may lose the task
# - Visibility timeout issues with Redis/SQS
```

**Better alternatives:**
- Use Celery Beat for scheduled tasks
- Store future tasks in database with a periodic recovery task
- Use a dedicated scheduling service

## Soft Shutdown (Celery 5.5+)

Celery 5.5+ introduced soft shutdown for cleaner worker termination.

```python
# settings.py
CELERY_WORKER_ENABLE_SOFT_SHUTDOWN_ON_IDLE = True
CELERY_WORKER_SOFT_SHUTDOWN_TIMEOUT = 60  # Wait up to 60s for tasks to complete
```

```bash
# With the settings above, a cold shutdown (SIGQUIT, or a second Ctrl-C)
# first enters a time-limited soft shutdown before terminating tasks
kill -QUIT <worker-pid>
```

Benefits:
- Workers finish current tasks before shutting down
- No task loss during deployments
- Cleaner rolling updates in Kubernetes

## Graceful Shutdown

### Signal Handling

```python
# celery.py
from celery.signals import worker_shutting_down

@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, **kwargs):
    logger.info(f'Worker shutting down: signal={sig}, how={how}')
    # Perform cleanup
    cleanup_connections()
```

### Kubernetes Graceful Shutdown

```yaml
spec:
  # terminationGracePeriodSeconds is a pod-level field. Kubernetes sends
  # SIGTERM on pod deletion, which triggers Celery's warm shutdown; avoid
  # `celery control shutdown` in a preStop hook, since it broadcasts to
  # every worker on the broker, not just this pod.
  terminationGracePeriodSeconds: 300  # 5 minutes for tasks to complete
  containers:
  - name: celery-worker
```

### Docker Compose Graceful Shutdown

```yaml
services:
  celery-worker:
    stop_grace_period: 5m  # Wait up to 5 minutes
    stop_signal: SIGTERM   # Send SIGTERM for graceful shutdown
```

## Health Checks

### Liveness Probe

```python
# health.py
from celery import current_app

def celery_liveness():
    """Check if worker can receive tasks."""
    try:
        result = current_app.control.ping(timeout=5)
        return bool(result)
    except Exception:
        return False
```

```yaml
# Kubernetes liveness probe
livenessProbe:
  exec:
    command:
    - python
    - -c
    - "from myproject.health import celery_liveness; exit(0 if celery_liveness() else 1)"
  initialDelaySeconds: 30
  periodSeconds: 30
  timeoutSeconds: 10
```

### Readiness Probe

```python
def celery_readiness():
    """Check if worker is ready to accept tasks."""
    try:
        inspect = current_app.control.inspect()
        stats = inspect.stats()
        return bool(stats)
    except Exception:
        return False
```

## Production Settings Checklist

```python
# settings/production.py

# Broker
CELERY_BROKER_URL = os.environ['CELERY_BROKER_URL']
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

# Result backend
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'django-db')
CELERY_RESULT_EXPIRES = 60 * 60 * 24  # 24 hours

# Serialization (JSON only for security)
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# Task execution
CELERY_TASK_ACKS_LATE = True
CELERY_TASK_REJECT_ON_WORKER_LOST = True
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60

# Worker
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_WORKER_DISABLE_RATE_LIMITS = False

# Beat (single instance only)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Monitoring
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
```

## Security Considerations

### Network Security

```yaml
# docker-compose - internal network
services:
  redis:
    networks:
      - internal
    # No ports exposed to host

  celery-worker:
    networks:
      - internal
    # Access redis via internal network

networks:
  internal:
    driver: bridge
```

### Secrets Management

```python
# Never hardcode credentials
# Use environment variables or secrets manager
import os

CELERY_BROKER_URL = os.environ['CELERY_BROKER_URL']

# Or use AWS Secrets Manager, HashiCorp Vault, etc.
from myproject.secrets import get_secret
CELERY_BROKER_URL = get_secret('celery/broker-url')
```

### Task Argument Validation

```python
@shared_task(bind=True)
def secure_task(self, user_id, action):
    # Validate inputs
    if not isinstance(user_id, int):
        raise ValueError("Invalid user_id type")
    if action not in ['activate', 'deactivate']:
        raise ValueError("Invalid action")

    # Process safely
    process_action(user_id, action)
```

## Deployment Checklist

- [ ] Use process supervisor (systemd, supervisor, K8s)
- [ ] Only one Beat instance running
- [ ] Graceful shutdown configured
- [ ] Health checks in place
- [ ] Logging to persistent storage
- [ ] Monitoring and alerting configured
- [ ] Secrets properly managed
- [ ] Resource limits set
- [ ] Backup strategy for broker/backend
- [ ] Auto-scaling configured (if needed)
- [ ] Security groups/firewall rules configured
- [ ] SSL/TLS for broker connections (production)

```