authoring-dags
Workflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.
Packaged view
This page reorganizes the original catalog entry so that fit, installability, and workflow context come first. The original raw source appears below.
Install command
npx @skill-hub/cli install astronomer-agents-authoring-dags
Repository
Skill path: skills/authoring-dags
Best for
Primary workflow: Write Technical Docs.
Technical facets: Full Stack, Tech Writer, Testing.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: astronomer.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install authoring-dags into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/astronomer/agents before adding authoring-dags to shared team environments
- Use authoring-dags for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: authoring-dags
description: Workflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.
hooks:
  Stop:
    - hooks:
        - type: command
          command: "echo 'Remember to test your DAG with the testing-dags skill'"
---
# DAG Authoring Skill
This skill guides you through creating and validating Airflow DAGs using best practices and MCP tools.
> **For testing and debugging DAGs**, see the **testing-dags** skill which covers the full test → debug → fix → retest workflow.
---
## ⚠️ CRITICAL WARNING: Use MCP Tools, NOT CLI Commands ⚠️
> **STOP! Before running ANY Airflow-related command, read this.**
>
> You MUST use MCP tools for ALL Airflow interactions. CLI commands like `astro dev run`, `airflow dags`, or shell commands to read logs are **FORBIDDEN**.
>
> **Why?** MCP tools provide structured, reliable output. CLI commands are fragile, produce unstructured text, and often fail silently.
---
## CLI vs MCP Quick Reference
**ALWAYS use Airflow MCP tools. NEVER use CLI commands.**
| ❌ DO NOT USE | ✅ USE INSTEAD |
|---------------|----------------|
| `astro dev run dags list` | `list_dags` MCP tool |
| `airflow dags list` | `list_dags` MCP tool |
| `astro dev run dags test` | `trigger_dag_and_wait` MCP tool |
| `airflow tasks test` | `trigger_dag_and_wait` MCP tool |
| `cat` / `grep` on Airflow logs | `get_task_logs` MCP tool |
| `find` in dags folder | `list_dags` or `explore_dag` MCP tool |
| Any `astro dev run ...` | Equivalent MCP tool |
| Any `airflow ...` CLI | Equivalent MCP tool |
| `ls` on `/usr/local/airflow/dags/` | `list_dags` or `explore_dag` MCP tool |
| `cat ... \| jq` to filter MCP results | Read the JSON directly from MCP response |
**Remember:**
- ✅ Airflow is ALREADY running — the MCP server handles the connection
- ❌ Do NOT attempt to start, stop, or manage the Airflow environment
- ❌ Do NOT use shell commands to check DAG status, logs, or errors
- ❌ Do NOT use bash to parse or filter MCP tool results — read the JSON directly
- ❌ Do NOT use `ls`, `find`, or `cat` on Airflow container paths (`/usr/local/airflow/...`)
- ✅ ALWAYS use MCP tools — they return structured JSON you can read directly
## Workflow Overview
```
┌─────────────────────────────────────┐
│ 1. DISCOVER                         │
│    Understand codebase & environment│
└─────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────┐
│ 2. PLAN                             │
│    Propose structure, get approval  │
└─────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────┐
│ 3. IMPLEMENT                        │
│    Write DAG following patterns     │
└─────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────┐
│ 4. VALIDATE                         │
│    Check import errors, warnings    │
└─────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────┐
│ 5. TEST (with user consent)         │
│    Trigger, monitor, check logs     │
└─────────────────────────────────────┘
                  ↓
┌─────────────────────────────────────┐
│ 6. ITERATE                          │
│    Fix issues, re-validate          │
└─────────────────────────────────────┘
```
---
## Phase 1: Discover
Before writing code, understand the context.
### Explore the Codebase
Use file tools to find existing patterns:
- `Glob` for `**/dags/**/*.py` to find existing DAGs
- `Read` similar DAGs to understand conventions
- Check `requirements.txt` for available packages
### Query the Airflow Environment
Use MCP tools to understand what's available:
| Tool | Purpose |
|------|---------|
| `list_connections` | What external systems are configured |
| `list_variables` | What configuration values exist |
| `list_providers` | What operator packages are installed |
| `get_airflow_version` | Version constraints and features |
| `list_dags` | Existing DAGs and naming conventions |
| `list_pools` | Resource pools for concurrency |
**Example discovery questions:**
- "Is there a Snowflake connection?" → `list_connections`
- "What Airflow version?" → `get_airflow_version`
- "Are S3 operators available?" → `list_providers`
---
## Phase 2: Plan
Based on discovery, propose:
1. **DAG structure** - Tasks, dependencies, schedule
2. **Operators to use** - Based on available providers
3. **Connections needed** - Existing or to be created
4. **Variables needed** - Existing or to be created
5. **Packages needed** - Additions to requirements.txt
**Get user approval before implementing.**
---
## Phase 3: Implement
Write the DAG following best practices (see below). Key steps:
1. Create DAG file in appropriate location
2. Update `requirements.txt` if needed
3. Save the file
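As a rough starting point, a new DAG file might follow the minimal skeleton below. The file name, dag_id, schedule, and task bodies are placeholders; see reference/best-practices.md for fuller patterns.
```python
# dags/example_pipeline.py: placeholder skeleton, adjust to the project's conventions
from datetime import datetime

from airflow.decorators import dag, task  # AF3: from airflow.sdk import dag, task


@dag(
    dag_id="example_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    tags=["example"],
)
def example_pipeline():
    @task
    def extract():
        return {"rows": [1, 2, 3]}

    @task
    def load(payload: dict):
        print(f"Loaded {len(payload['rows'])} rows")

    load(extract())


example_pipeline()
```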
---
## Phase 4: Validate
**Use the Airflow MCP as a feedback loop. Do NOT use CLI commands.**
### Step 1: Check Import Errors
After saving, call the MCP tool (Airflow will have already parsed the file):
**MCP tool:** `list_import_errors`
- If your file appears → **fix and retry**
- If no errors → **continue**
Common causes: missing imports, syntax errors, missing packages.
### Step 2: Verify DAG Exists
**MCP tool:** `get_dag_details(dag_id="your_dag_id")`
Check: DAG exists, schedule correct, tags set, paused status.
### Step 3: Check Warnings
**MCP tool:** `list_dag_warnings`
Look for deprecation warnings or configuration issues.
### Step 4: Explore DAG Structure
**MCP tool:** `explore_dag(dag_id="your_dag_id")`
Returns in one call: metadata, tasks, dependencies, source code.
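Put together, a validation pass is a short sequence of MCP calls. The dag_id below is a placeholder.
```
# Sketch of the validation sequence; dag_id is a placeholder
list_import_errors()                         # your file must not appear here
get_dag_details(dag_id="example_pipeline")   # schedule, tags, paused status
list_dag_warnings()                          # deprecation / config warnings
explore_dag(dag_id="example_pipeline")       # tasks, dependencies, source code
```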
---
## Phase 5: Test
> **📘 See the testing-dags skill for comprehensive testing guidance.**
Once validation passes, test the DAG using the workflow in the **testing-dags** skill:
1. **Get user consent** — Always ask before triggering
2. **Trigger and wait** — Use `trigger_dag_and_wait(dag_id, timeout=300)`
3. **Analyze results** — Check success/failure status
4. **Debug if needed** — Use `diagnose_dag_run` and `get_task_logs`
### Quick Test (Minimal)
```
# Ask user first, then:
trigger_dag_and_wait(dag_id="your_dag_id", timeout=300)
```
For the full test → debug → fix → retest loop, see **testing-dags**.
---
## Phase 6: Iterate
If issues found:
1. Fix the code
2. Check for import errors with `list_import_errors` MCP tool
3. Re-validate using MCP tools (Phase 4)
4. Re-test using the **testing-dags** skill workflow (Phase 5)
**Never use CLI commands to check status or logs. Always use MCP tools.**
---
## MCP Tools Quick Reference
| Phase | Tool | Purpose |
|-------|------|---------|
| Discover | `list_connections` | Available connections |
| Discover | `list_variables` | Configuration values |
| Discover | `list_providers` | Installed operators |
| Discover | `get_airflow_version` | Version info |
| Validate | `list_import_errors` | Parse errors (check first!) |
| Validate | `get_dag_details` | Verify DAG config |
| Validate | `list_dag_warnings` | Configuration warnings |
| Validate | `explore_dag` | Full DAG inspection |
> **Testing tools** — See the **testing-dags** skill for `trigger_dag_and_wait`, `diagnose_dag_run`, `get_task_logs`, etc.
---
## Best Practices & Anti-Patterns
For detailed code examples and patterns, see **[reference/best-practices.md](reference/best-practices.md)**.
Key topics covered:
- TaskFlow API usage
- Credentials management (connections, variables)
- Provider operators
- Idempotency patterns
- Data intervals
- Task groups
- Setup/Teardown patterns
- Data quality checks
- Anti-patterns to avoid
---
## Related Skills
- **testing-dags**: For testing DAGs, debugging failures, and the test → fix → retest loop
- **debugging-dags**: For troubleshooting failed DAGs
- **migrating-airflow-2-to-3**: For migrating DAGs to Airflow 3
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### reference/best-practices.md
```markdown
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
- [DAG Authoring Best Practices](#dag-authoring-best-practices)
  - [Import Compatibility](#import-compatibility)
  - [Table of Contents](#table-of-contents)
  - [Use TaskFlow API](#use-taskflow-api)
  - [Never Hard-Code Credentials](#never-hard-code-credentials)
  - [Use Provider Operators](#use-provider-operators)
  - [Ensure Idempotency](#ensure-idempotency)
  - [Use Data Intervals](#use-data-intervals)
  - [Organize with Task Groups](#organize-with-task-groups)
  - [Use Setup/Teardown](#use-setupteardown)
  - [Include Data Quality Checks](#include-data-quality-checks)
  - [Anti-Patterns](#anti-patterns)
    - [DON'T: Access Metadata DB Directly](#dont-access-metadata-db-directly)
    - [DON'T: Use Deprecated Imports](#dont-use-deprecated-imports)
    - [DON'T: Use SubDAGs](#dont-use-subdags)
    - [DON'T: Use Deprecated Context Keys](#dont-use-deprecated-context-keys)
    - [DON'T: Hard-Code File Paths](#dont-hard-code-file-paths)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# DAG Authoring Best Practices
## Import Compatibility
**Airflow 2.x:**
```python
from airflow.decorators import dag, task, task_group, setup, teardown
from airflow.models import Variable
from airflow.hooks.base import BaseHook
```
**Airflow 3.x (Task SDK):**
```python
from airflow.sdk import dag, task, task_group, setup, teardown, Variable, Connection
```
The examples below use Airflow 2 imports for compatibility. On Airflow 3, these still work but are deprecated (AIR31x warnings). For new Airflow 3 projects, prefer `airflow.sdk` imports.
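If a single file needs to run cleanly on both major versions, a guarded import is one common approach (a sketch; the examples in this document do not require it):
```python
# Sketch: prefer the Airflow 3 Task SDK import, fall back to Airflow 2
try:
    from airflow.sdk import dag, task
except ImportError:  # Airflow 2.x
    from airflow.decorators import dag, task
```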
---
## Table of Contents
- [TaskFlow API](#use-taskflow-api)
- [Credentials Management](#never-hard-code-credentials)
- [Provider Operators](#use-provider-operators)
- [Idempotency](#ensure-idempotency)
- [Data Intervals](#use-data-intervals)
- [Task Groups](#organize-with-task-groups)
- [Setup/Teardown](#use-setupteardown)
- [Data Quality Checks](#include-data-quality-checks)
- [Anti-Patterns](#anti-patterns)
---
## Use TaskFlow API
```python
from airflow.decorators import dag, task  # AF3: from airflow.sdk import dag, task
from datetime import datetime


@dag(
    dag_id='my_pipeline',
    start_date=datetime(2025, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args={'owner': 'data-team', 'retries': 2},
    tags=['etl', 'production'],
)
def my_pipeline():
    @task
    def extract():
        return {"data": [1, 2, 3]}

    @task
    def transform(data: dict):
        return [x * 2 for x in data["data"]]

    @task
    def load(transformed: list):
        print(f"Loaded {len(transformed)} records")

    load(transform(extract()))


my_pipeline()
```
---
## Never Hard-Code Credentials
```python
# WRONG
conn_string = "postgresql://user:password@host:5432/db"
# CORRECT - Use connections
from airflow.hooks.base import BaseHook # AF3: from airflow.sdk import Connection
conn = BaseHook.get_connection("my_postgres_conn")
# CORRECT - Use variables
from airflow.models import Variable # AF3: from airflow.sdk import Variable
api_key = Variable.get("my_api_key")
# CORRECT - Templating
sql = "SELECT * FROM {{ var.value.table_name }}"
```
---
## Use Provider Operators
```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
```
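As a hedged usage sketch, one of these operators might be wired up as follows. The connection ID, table, and SQL are placeholders; pick a connection discovered via `list_connections`.
```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# conn_id, table, and SQL are illustrative placeholders
count_events = SQLExecuteQueryOperator(
    task_id="count_events",
    conn_id="my_warehouse_conn",
    sql="SELECT COUNT(*) FROM events",
)
```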
---
## Ensure Idempotency
```python
@task
def load_data(data_interval_start, data_interval_end):
    # Delete before insert
    delete_existing(data_interval_start, data_interval_end)
    insert_new(data_interval_start, data_interval_end)
```
---
## Use Data Intervals
```python
@task
def process(data_interval_start, data_interval_end):
    print(f"Processing {data_interval_start} to {data_interval_end}")


# In SQL
sql = """
SELECT * FROM events
WHERE event_time >= '{{ data_interval_start }}'
  AND event_time < '{{ data_interval_end }}'
"""
```
---
## Organize with Task Groups
```python
from airflow.decorators import task_group, task  # AF3: from airflow.sdk import task_group, task


@task_group
def extract_sources():
    @task
    def from_postgres(): ...

    @task
    def from_api(): ...

    return from_postgres(), from_api()
```
---
## Use Setup/Teardown
```python
from airflow.decorators import dag, task, setup, teardown  # AF3: from airflow.sdk import ...

@setup
def create_temp_table(): ...

@teardown
def drop_temp_table(): ...

@task
def process(): ...

create = create_temp_table()
process_task = process()
cleanup = drop_temp_table()

create >> process_task >> cleanup
cleanup.as_teardown(setups=[create])
```
---
## Include Data Quality Checks
```python
from airflow.providers.common.sql.operators.sql import (
    SQLColumnCheckOperator,
    SQLTableCheckOperator,
)

SQLColumnCheckOperator(
    task_id="check_columns",
    table="my_table",
    column_mapping={
        "id": {"null_check": {"equal_to": 0}},
    },
)

SQLTableCheckOperator(
    task_id="check_table",
    table="my_table",
    checks={"row_count": {"check_statement": "COUNT(*) > 0"}},
)
```
---
## Anti-Patterns
### DON'T: Access Metadata DB Directly
```python
# WRONG - Fails in Airflow 3
from airflow.settings import Session
session.query(DagModel).all()
```
### DON'T: Use Deprecated Imports
```python
# WRONG
from airflow.operators.dummy_operator import DummyOperator
# CORRECT
from airflow.providers.standard.operators.empty import EmptyOperator
```
### DON'T: Use SubDAGs
```python
# WRONG
from airflow.operators.subdag import SubDagOperator
# CORRECT - Use task groups instead
from airflow.decorators import task_group # AF3: from airflow.sdk import task_group
```
### DON'T: Use Deprecated Context Keys
```python
# WRONG
execution_date = context["execution_date"]
# CORRECT
logical_date = context["dag_run"].logical_date
data_start = context["data_interval_start"]
```
### DON'T: Hard-Code File Paths
```python
# WRONG
open("include/data.csv")
# CORRECT - Files in dags/
import os
dag_dir = os.path.dirname(__file__)
open(os.path.join(dag_dir, "data.csv"))
# CORRECT - Files in include/
open(f"{os.getenv('AIRFLOW_HOME')}/include/data.csv")
```
```