
authoring-dags

Workflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.

Packaged view

This page reorganizes the original catalog entry to put fit, installability, and workflow context first. The original raw source appears below.

Stars: 282

Hot score: 99

Updated: March 20, 2026

Overall rating: C (3.8)

Composite score: 3.8

Best-practice grade: B (81.2)

Install command

npx @skill-hub/cli install astronomer-agents-authoring-dags

Repository

astronomer/agents

Skill path: skills/authoring-dags



Best for

Primary workflow: Write Technical Docs.

Technical facets: Full Stack, Tech Writer, Testing.

Target audience: everyone.

License: Unknown.

Original source

Catalog source: SkillHub Club.

Repository owner: astronomer.

This is a mirrored public skill entry. Review the repository before installing it into production workflows.

What it helps with

  • Install authoring-dags into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
  • Review https://github.com/astronomer/agents before adding authoring-dags to shared team environments
  • Use authoring-dags when writing or reviewing Airflow DAG code in development workflows

Works across

Claude Code, Codex CLI, Gemini CLI, OpenCode

Favorites: 0.

Sub-skills: 0.

Aggregator: No.

Original source / Raw SKILL.md

---
name: authoring-dags
description: Workflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.
hooks:
  Stop:
    - hooks:
        - type: command
          command: "echo 'Remember to test your DAG with the testing-dags skill'"
---

# DAG Authoring Skill

This skill guides you through creating and validating Airflow DAGs using best practices and MCP tools.

> **For testing and debugging DAGs**, see the **testing-dags** skill which covers the full test → debug → fix → retest workflow.

---

## ⚠️ CRITICAL WARNING: Use MCP Tools, NOT CLI Commands ⚠️

> **STOP! Before running ANY Airflow-related command, read this.**
>
> You MUST use MCP tools for ALL Airflow interactions. CLI commands like `astro dev run`, `airflow dags`, or shell commands to read logs are **FORBIDDEN**.
>
> **Why?** MCP tools provide structured, reliable output. CLI commands are fragile, produce unstructured text, and often fail silently.

---

## CLI vs MCP Quick Reference

**ALWAYS use Airflow MCP tools. NEVER use CLI commands.**

| ❌ DO NOT USE | ✅ USE INSTEAD |
|---------------|----------------|
| `astro dev run dags list` | `list_dags` MCP tool |
| `airflow dags list` | `list_dags` MCP tool |
| `astro dev run dags test` | `trigger_dag_and_wait` MCP tool |
| `airflow tasks test` | `trigger_dag_and_wait` MCP tool |
| `cat` / `grep` on Airflow logs | `get_task_logs` MCP tool |
| `find` in dags folder | `list_dags` or `explore_dag` MCP tool |
| Any `astro dev run ...` | Equivalent MCP tool |
| Any `airflow ...` CLI | Equivalent MCP tool |
| `ls` on `/usr/local/airflow/dags/` | `list_dags` or `explore_dag` MCP tool |
| `cat ... \| jq` to filter MCP results | Read the JSON directly from MCP response |

**Remember:**
- ✅ Airflow is ALREADY running — the MCP server handles the connection
- ❌ Do NOT attempt to start, stop, or manage the Airflow environment
- ❌ Do NOT use shell commands to check DAG status, logs, or errors
- ❌ Do NOT use bash to parse or filter MCP tool results — read the JSON directly
- ❌ Do NOT use `ls`, `find`, or `cat` on Airflow container paths (`/usr/local/airflow/...`)
- ✅ ALWAYS use MCP tools — they return structured JSON you can read directly

## Workflow Overview

```
┌─────────────────────────────────────┐
│ 1. DISCOVER                         │
│    Understand codebase & environment│
└─────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────┐
│ 2. PLAN                             │
│    Propose structure, get approval  │
└─────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────┐
│ 3. IMPLEMENT                        │
│    Write DAG following patterns     │
└─────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────┐
│ 4. VALIDATE                         │
│    Check import errors, warnings    │
└─────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────┐
│ 5. TEST (with user consent)         │
│    Trigger, monitor, check logs     │
└─────────────────────────────────────┘
                 ↓
┌─────────────────────────────────────┐
│ 6. ITERATE                          │
│    Fix issues, re-validate          │
└─────────────────────────────────────┘
```

---

## Phase 1: Discover

Before writing code, understand the context.

### Explore the Codebase

Use file tools to find existing patterns:
- `Glob` for `**/dags/**/*.py` to find existing DAGs
- `Read` similar DAGs to understand conventions
- Check `requirements.txt` for available packages

### Query the Airflow Environment

Use MCP tools to understand what's available:

| Tool | Purpose |
|------|---------|
| `list_connections` | What external systems are configured |
| `list_variables` | What configuration values exist |
| `list_providers` | What operator packages are installed |
| `get_airflow_version` | Version constraints and features |
| `list_dags` | Existing DAGs and naming conventions |
| `list_pools` | Resource pools for concurrency |

**Example discovery questions:**
- "Is there a Snowflake connection?" → `list_connections`
- "What Airflow version?" → `get_airflow_version`
- "Are S3 operators available?" → `list_providers`
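
The same questions map to a short sequence of MCP calls. A minimal sketch (exact parameters and response shapes depend on your MCP server's tool schemas):

```
get_airflow_version()   # which version constraints and features apply
list_providers()        # is the Snowflake / S3 provider installed?
list_connections()      # is there an existing connection to reuse?
list_variables()        # which configuration values already exist
```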

---

## Phase 2: Plan

Based on discovery, propose:

1. **DAG structure** - Tasks, dependencies, schedule
2. **Operators to use** - Based on available providers
3. **Connections needed** - Existing or to be created
4. **Variables needed** - Existing or to be created
5. **Packages needed** - Additions to requirements.txt

**Get user approval before implementing.**

---

## Phase 3: Implement

Write the DAG following best practices (see below). Key steps:

1. Create DAG file in appropriate location
2. Update `requirements.txt` if needed
3. Save the file

---

## Phase 4: Validate

**Use the Airflow MCP as a feedback loop. Do NOT use CLI commands.**

### Step 1: Check Import Errors

After saving, call the MCP tool (Airflow will have already parsed the file):

**MCP tool:** `list_import_errors`

- If your file appears → **fix and retry**
- If no errors → **continue**

Common causes: missing imports, syntax errors, missing packages.

### Step 2: Verify DAG Exists

**MCP tool:** `get_dag_details(dag_id="your_dag_id")`

Check: DAG exists, schedule correct, tags set, paused status.

### Step 3: Check Warnings

**MCP tool:** `list_dag_warnings`

Look for deprecation warnings or configuration issues.

### Step 4: Explore DAG Structure

**MCP tool:** `explore_dag(dag_id="your_dag_id")`

Returns in one call: metadata, tasks, dependencies, source code.
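
Putting the four steps together, a typical validation pass is a short sequence of MCP calls. A sketch (only `dag_id` is shown; any other parameters depend on your MCP server):

```
list_import_errors()                    # Step 1: your file must not appear here
get_dag_details(dag_id="your_dag_id")   # Step 2: DAG exists, schedule/tags/paused state correct
list_dag_warnings()                     # Step 3: no deprecation or configuration warnings
explore_dag(dag_id="your_dag_id")       # Step 4: tasks, dependencies, and source look right
```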

---

## Phase 5: Test

> **📘 See the testing-dags skill for comprehensive testing guidance.**

Once validation passes, test the DAG using the workflow in the **testing-dags** skill:

1. **Get user consent** — Always ask before triggering
2. **Trigger and wait** — Use `trigger_dag_and_wait(dag_id, timeout=300)`
3. **Analyze results** — Check success/failure status
4. **Debug if needed** — Use `diagnose_dag_run` and `get_task_logs`

### Quick Test (Minimal)

```
# Ask user first, then:
trigger_dag_and_wait(dag_id="your_dag_id", timeout=300)
```

For the full test → debug → fix → retest loop, see **testing-dags**.

---

## Phase 6: Iterate

If issues found:
1. Fix the code
2. Check for import errors with `list_import_errors` MCP tool
3. Re-validate using MCP tools (Phase 4)
4. Re-test using the **testing-dags** skill workflow (Phase 5)

**Never use CLI commands to check status or logs. Always use MCP tools.**

---

## MCP Tools Quick Reference

| Phase | Tool | Purpose |
|-------|------|---------|
| Discover | `list_connections` | Available connections |
| Discover | `list_variables` | Configuration values |
| Discover | `list_providers` | Installed operators |
| Discover | `get_airflow_version` | Version info |
| Validate | `list_import_errors` | Parse errors (check first!) |
| Validate | `get_dag_details` | Verify DAG config |
| Validate | `list_dag_warnings` | Configuration warnings |
| Validate | `explore_dag` | Full DAG inspection |

> **Testing tools** — See the **testing-dags** skill for `trigger_dag_and_wait`, `diagnose_dag_run`, `get_task_logs`, etc.

---

## Best Practices & Anti-Patterns

For detailed code examples and patterns, see **[reference/best-practices.md](reference/best-practices.md)**.

Key topics covered:
- TaskFlow API usage
- Credentials management (connections, variables)
- Provider operators
- Idempotency patterns
- Data intervals
- Task groups
- Setup/Teardown patterns
- Data quality checks
- Anti-patterns to avoid

---

## Related Skills

- **testing-dags**: For testing DAGs, debugging failures, and the test → fix → retest loop
- **debugging-dags**: For troubleshooting failed DAGs
- **migrating-airflow-2-to-3**: For migrating DAGs to Airflow 3


---

## Referenced Files

> The following files are referenced in this skill and included for context.

### reference/best-practices.md

````markdown
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents**  *generated with [DocToc](https://github.com/thlorenz/doctoc)*

- [DAG Authoring Best Practices](#dag-authoring-best-practices)
  - [Import Compatibility](#import-compatibility)
  - [Table of Contents](#table-of-contents)
  - [Use TaskFlow API](#use-taskflow-api)
  - [Never Hard-Code Credentials](#never-hard-code-credentials)
  - [Use Provider Operators](#use-provider-operators)
  - [Ensure Idempotency](#ensure-idempotency)
  - [Use Data Intervals](#use-data-intervals)
  - [Organize with Task Groups](#organize-with-task-groups)
  - [Use Setup/Teardown](#use-setupteardown)
  - [Include Data Quality Checks](#include-data-quality-checks)
  - [Anti-Patterns](#anti-patterns)
    - [DON'T: Access Metadata DB Directly](#dont-access-metadata-db-directly)
    - [DON'T: Use Deprecated Imports](#dont-use-deprecated-imports)
    - [DON'T: Use SubDAGs](#dont-use-subdags)
    - [DON'T: Use Deprecated Context Keys](#dont-use-deprecated-context-keys)
    - [DON'T: Hard-Code File Paths](#dont-hard-code-file-paths)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# DAG Authoring Best Practices

## Import Compatibility

**Airflow 2.x:**
```python
from airflow.decorators import dag, task, task_group, setup, teardown
from airflow.models import Variable
from airflow.hooks.base import BaseHook
```

**Airflow 3.x (Task SDK):**
```python
from airflow.sdk import dag, task, task_group, setup, teardown, Variable, Connection
```

The examples below use Airflow 2 imports for compatibility. On Airflow 3, these still work but are deprecated (AIR31x warnings). For new Airflow 3 projects, prefer `airflow.sdk` imports.

---

## Table of Contents

- [TaskFlow API](#use-taskflow-api)
- [Credentials Management](#never-hard-code-credentials)
- [Provider Operators](#use-provider-operators)
- [Idempotency](#ensure-idempotency)
- [Data Intervals](#use-data-intervals)
- [Task Groups](#organize-with-task-groups)
- [Setup/Teardown](#use-setupteardown)
- [Data Quality Checks](#include-data-quality-checks)
- [Anti-Patterns](#anti-patterns)

---

## Use TaskFlow API

```python
from airflow.decorators import dag, task  # AF3: from airflow.sdk import dag, task
from datetime import datetime

@dag(
    dag_id='my_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args={'owner': 'data-team', 'retries': 2},
    tags=['etl', 'production'],
)
def my_pipeline():
    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data: dict):
        return [x * 2 for x in data["data"]]

    @task
    def load(transformed: list):
        print(f"Loaded {len(transformed)} records")

    load(transform(extract()))

my_pipeline()
```

---

## Never Hard-Code Credentials

```python
# WRONG
conn_string = "postgresql://user:password@host:5432/db"

# CORRECT - Use connections
from airflow.hooks.base import BaseHook  # AF3: from airflow.sdk import Connection
conn = BaseHook.get_connection("my_postgres_conn")

# CORRECT - Use variables
from airflow.models import Variable  # AF3: from airflow.sdk import Variable
api_key = Variable.get("my_api_key")

# CORRECT - Templating
sql = "SELECT * FROM {{ var.value.table_name }}"
```

---

## Use Provider Operators

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
```
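
Provider operators take a connection id rather than inline credentials. A minimal usage sketch (the connection name `my_postgres_conn` and the query are illustrative):

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

run_query = SQLExecuteQueryOperator(
    task_id="run_daily_rollup",
    conn_id="my_postgres_conn",  # illustrative; must exist in Airflow connections
    sql="SELECT COUNT(*) FROM events",
)
```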

---

## Ensure Idempotency

```python
@task
def load_data(data_interval_start, data_interval_end):
    # Delete-then-insert makes reruns for the same interval idempotent.
    # delete_existing / insert_new are placeholders for your own load logic.
    delete_existing(data_interval_start, data_interval_end)
    insert_new(data_interval_start, data_interval_end)
```

---

## Use Data Intervals

```python
@task
def process(data_interval_start, data_interval_end):
    print(f"Processing {data_interval_start} to {data_interval_end}")

# In SQL
sql = """
    SELECT * FROM events
    WHERE event_time >= '{{ data_interval_start }}'
      AND event_time < '{{ data_interval_end }}'
"""
```

---

## Organize with Task Groups

```python
from airflow.decorators import task_group, task  # AF3: from airflow.sdk import task_group, task

@task_group
def extract_sources():
    @task
    def from_postgres(): ...

    @task
    def from_api(): ...

    return from_postgres(), from_api()
```
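
A group is invoked inside a DAG like an ordinary task, and its return values are XComArgs you can feed downstream. A minimal sketch reusing `extract_sources` from above (the DAG id and `combine` task are illustrative):

```python
from datetime import datetime
from airflow.decorators import dag, task  # AF3: from airflow.sdk import dag, task

@dag(dag_id="grouped_pipeline", start_date=datetime(2025, 1, 1), schedule=None, catchup=False)
def grouped_pipeline():
    pg_data, api_data = extract_sources()  # group defined above

    @task
    def combine(a, b):
        return [a, b]

    combine(pg_data, api_data)

grouped_pipeline()
```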

---

## Use Setup/Teardown

```python
from airflow.decorators import dag, task, setup, teardown  # AF3: from airflow.sdk import ...

@setup
def create_temp_table(): ...

@teardown
def drop_temp_table(): ...

@task
def process(): ...

create = create_temp_table()
process_task = process()
cleanup = drop_temp_table()

create >> process_task >> cleanup
cleanup.as_teardown(setups=[create])
```

---

## Include Data Quality Checks

```python
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLTableCheckOperator,
)

SQLColumnCheckOperator(
    task_id="check_columns",
    conn_id="my_db_conn",  # illustrative connection id
    table="my_table",
    column_mapping={
        "id": {"null_check": {"equal_to": 0}},
    },
)

SQLTableCheckOperator(
    task_id="check_table",
    conn_id="my_db_conn",  # illustrative connection id
    table="my_table",
    checks={"row_count": {"check_statement": "COUNT(*) > 0"}},
)
```

---

## Anti-Patterns

### DON'T: Access Metadata DB Directly

```python
# WRONG - Fails in Airflow 3
from airflow.settings import Session
from airflow.models import DagModel

session = Session()
session.query(DagModel).all()
```

### DON'T: Use Deprecated Imports

```python
# WRONG
from airflow.operators.dummy_operator import DummyOperator

# CORRECT
from airflow.providers.standard.operators.empty import EmptyOperator
```

### DON'T: Use SubDAGs

```python
# WRONG
from airflow.operators.subdag import SubDagOperator

# CORRECT - Use task groups instead
from airflow.decorators import task_group  # AF3: from airflow.sdk import task_group
```

### DON'T: Use Deprecated Context Keys

```python
# WRONG
execution_date = context["execution_date"]

# CORRECT
logical_date = context["dag_run"].logical_date
data_start = context["data_interval_start"]
```

### DON'T: Hard-Code File Paths

```python
# WRONG
open("include/data.csv")

# CORRECT - Files in dags/
import os
dag_dir = os.path.dirname(__file__)
open(os.path.join(dag_dir, "data.csv"))

# CORRECT - Files in include/
open(f"{os.getenv('AIRFLOW_HOME')}/include/data.csv")
```

````
