dlt-skill
Creates and maintains dlt (data load tool) pipelines from APIs, databases, and other sources. Use when the user wants to build or debug pipelines; use verified sources (e.g. Salesforce, GitHub, Stripe) or declarative REST API or custom Python; configure destinations (e.g. DuckDB, BigQuery, Snowflake); implement incremental loading; or edit .dlt config and secrets. Use when the user mentions data ingestion, dlt pipeline, dlt init, rest_api_source, incremental load, or pipeline dashboard.
Packaged view
This page reorganizes the original catalog entry around fit, installability, and workflow context first. The original raw source lives below.
Install command
npx @skill-hub/cli install untitled-data-company-data-skills-dlt-skill
Repository
Skill path: skills/dlt-skill
Best for
Primary workflow: Analyze Data & AI.
Technical facets: Full Stack, Backend, Data / AI, Testing.
Target audience: everyone.
License: Unknown.
Original source
Catalog source: SkillHub Club.
Repository owner: untitled-data-company.
This is a mirrored public skill entry. Review the repository before installing it into production workflows.
What it helps with
- Install dlt-skill into Claude Code, Codex CLI, Gemini CLI, or OpenCode workflows
- Review https://github.com/untitled-data-company/data-skills before adding dlt-skill to shared team environments
- Use dlt-skill for development workflows
Works across
Favorites: 0.
Sub-skills: 0.
Aggregator: No.
Original source / Raw SKILL.md
---
name: dlt-skill
description: >
  Creates and maintains dlt (data load tool) pipelines from APIs, databases, and other sources.
  Use when the user wants to build or debug pipelines; use verified sources (e.g. Salesforce, GitHub, Stripe)
  or declarative REST API or custom Python; configure destinations (e.g. DuckDB, BigQuery, Snowflake);
  implement incremental loading; or edit .dlt config and secrets.
  Use when the user mentions data ingestion, dlt pipeline, dlt init, rest_api_source, incremental load, or pipeline dashboard.
---
# dlt Pipeline Creator
Choose a pipeline type with the decision tree below, then follow the Core Workflow, pulling in patterns and references as needed.
## Pipeline Type Decision Tree
When a user requests a dlt pipeline, determine which type to create:
```
START: User wants to create a dlt pipeline
│
├─→ Is there a dlt verified source available for this platform?
│ (Check: https://dlthub.com/docs/dlt-ecosystem/verified-sources)
│ │
│ YES → Use VERIFIED SOURCE approach
│ │ Examples: Salesforce, GitHub, Stripe, HubSpot, Slack
│ │ Action: Guide user through `dlt init <source> <destination>`
│ │
│ NO → Continue to next question
│
├─→ Is this a REST API with standard patterns?
│ (Standard auth, pagination, JSON responses)
│ │
│ YES → Use DECLARATIVE REST API approach
│ │ Examples: Pokemon API, simple REST APIs with clear endpoints
│ │ Action: Create config-based pipeline with rest_api_source
│ │
│ NO → Continue to next question
│
└─→ Does this require custom logic or Python packages?
│
YES → Use CUSTOM PYTHON approach
Examples: Python packages (simple-salesforce), complex transformations,
non-standard APIs, custom data sources
Action: Create custom source with @dlt.source and @dlt.resource decorators
```
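The tree above can be sketched as a small helper. This is only an illustration of the ordering (verified source first, then declarative REST API, then custom Python); the boolean inputs are assumptions supplied by the caller, not anything dlt computes:

```python
def choose_pipeline_type(has_verified_source: bool,
                         is_standard_rest_api: bool) -> str:
    """Mirror the decision tree: verified source first, then declarative
    REST API, falling back to custom Python for everything else."""
    if has_verified_source:
        return "verified_source"       # e.g. Salesforce, GitHub, Stripe
    if is_standard_rest_api:
        return "declarative_rest_api"  # standard auth, pagination, JSON
    return "custom_python"             # custom logic or Python packages

print(choose_pipeline_type(True, False))   # verified_source
print(choose_pipeline_type(False, True))   # declarative_rest_api
print(choose_pipeline_type(False, False))  # custom_python
```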
## Core Workflow
### 1. Understand Requirements
Ask clarifying questions:
- **Source**: What is the data source? (API URL, platform name, database, etc.)
- **Source type**: Does this match a verified source, REST API, or require custom code?
- **Destination**: Where should data be loaded? (DuckDB, BigQuery, Snowflake, etc.)
- **Resources**: What specific data/endpoints are needed?
- **Incremental**: Should the pipeline load incrementally or do full refreshes?
- **Authentication**: What credentials are required?
### 2. Choose Pipeline Approach
Based on the decision tree above, select:
- **Verified source** - Pre-built, tested connector
- **Declarative REST API** - Config-based REST API pipeline
- **Custom Python** - Full control with Python code
### 3. Initialize or Create Pipeline
#### Verified source
```bash
dlt init <source_name> <destination_name>
```
Examples:
- `dlt init salesforce bigquery`
- `dlt init github duckdb`
- `dlt init stripe snowflake`
#### Declarative REST API or Custom Python
Use templates from this skill's [assets/templates/](assets/templates/) (copy into the project if needed):
- `declarative_rest_pipeline.py` - For REST APIs
- `custom_python_pipeline.py` - For custom sources
### 4. Install Required Packages
**Recommended:** Use the helper script (detects pip/uv/poetry):
```bash
python scripts/install_packages.py --destination <destination_name>
```
**Manual:** `pip install "dlt[<destination>,workspace]"` (e.g. `bigquery`, `snowflake`). For DuckDB use `dlt[workspace]` only. The `workspace` extra is required for `dlt pipeline <name> show` and the dashboard.
### 5. Configure Credentials
Create or update `.dlt/secrets.toml`:
**Structure:**
```toml
[sources.<source_name>]
# Source credentials here
[destination.<destination_name>]
# Destination credentials here
```
Use the template: [assets/templates/.dlt/secrets.toml](assets/templates/.dlt/secrets.toml)
**Important**: Remind user to add `.dlt/secrets.toml` to `.gitignore`!
**Note for DuckDB**: DuckDB doesn't require credentials in secrets.toml. Just specify the database file path in the pipeline or config.toml.
### 6. Configure Pipeline Settings
Create or update `.dlt/config.toml` for non-sensitive settings:
```toml
[sources.<source_name>]
base_url = "https://api.example.com"
timeout = 30
[destination.<destination_name>]
location = "US"
```
Use the template: [assets/templates/.dlt/config.toml](assets/templates/.dlt/config.toml)
### 7. Implement Pipeline Logic
Flesh out the pipeline code based on requirements:
**For verified sources**:
- Customize resource selection with `.with_resources()`
- Configure incremental loading with `.apply_hints()`
- See: [references/verified-sources.md](references/verified-sources.md)
**For Declarative REST API**:
- Define client configuration (base_url, auth)
- Configure resources and endpoints
- Set up pagination and incremental loading
- Resource-level options (e.g. `max_table_nesting`, `table_name`) are set directly in the resource dict; see the Resource configuration section of [references/rest-api-source.md](references/rest-api-source.md)
**For Custom Python**:
- Implement `@dlt.source` and `@dlt.resource` functions
- Use generators and yield patterns
- Configure write dispositions and primary keys
- See: [references/custom-sources.md](references/custom-sources.md)
### 8. Configure Incremental Loading (If Needed)
For pipelines that should load only new/changed data:
- Identify cursor field (timestamp, ID)
- Set write disposition to `merge`
- Define primary key for deduplication
- Configure incremental parameters
See: [references/incremental-loading.md](references/incremental-loading.md)
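Conceptually, incremental loading keeps a cursor (the highest value seen so far) and only emits rows past it; dlt manages this state for you, but the mechanics look roughly like this sketch (data and field names are invented):

```python
def incremental_filter(rows, cursor_path, last_value):
    """Yield only rows whose cursor field is newer than last_value,
    and return the new high-water mark alongside them."""
    new_rows = [r for r in rows if r[cursor_path] > last_value]
    new_cursor = max((r[cursor_path] for r in new_rows), default=last_value)
    return new_rows, new_cursor

rows = [
    {"id": 1, "updated_at": "2024-01-02"},
    {"id": 2, "updated_at": "2024-03-05"},
]
# Only the row updated after the stored cursor is loaded
loaded, cursor = incremental_filter(rows, "updated_at", "2024-02-01")
print(loaded)  # [{'id': 2, 'updated_at': '2024-03-05'}]
print(cursor)  # 2024-03-05
```

On the next run the stored cursor (`2024-03-05`) would be passed back in, so already-loaded rows are skipped.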
### 9. Test and Run Pipeline
```bash
python <pipeline_file>.py
```
Check for errors and verify data is loaded correctly.
### 10. Inspect Results
**Prerequisite**: Ensure `dlt[workspace]` is installed (included by default when using `install_packages.py`).
Open the dlt dashboard to inspect loaded data:
```bash
dlt pipeline <pipeline_name> show
```
Or use the helper script:
```bash
python scripts/open_dashboard.py <pipeline_name>
```
## Pipeline Patterns
### Pattern 1: Verified source — Select specific resources
```python
import dlt
from salesforce import salesforce_source

source = salesforce_source()
pipeline = dlt.pipeline(
    pipeline_name='salesforce_pipeline',
    destination='bigquery',
    dataset_name='salesforce_data'
)
# Load only specific Salesforce objects
pipeline.run(source.with_resources("Account", "Opportunity", "Contact"))
```
### Pattern 2: Declarative REST API - Simple Endpoints
```python
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://pokeapi.co/api/v2/",
    },
    "resources": [
        "pokemon",
        {
            "name": "pokemon_details",
            "endpoint": "pokemon/{name}",
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}
pipeline = dlt.pipeline(
    pipeline_name="pokemon",
    destination="duckdb",
    dataset_name="pokemon_data"
)
pipeline.run(rest_api_source(config))
```
### Pattern 3: Custom Python - Using Python Package
```python
import dlt
from simple_salesforce import Salesforce

@dlt.source
def salesforce_custom(username=dlt.secrets.value, password=dlt.secrets.value):
    sf = Salesforce(username=username, password=password)

    @dlt.resource(write_disposition='merge', primary_key='Id')
    def accounts():
        records = sf.query_all("SELECT Id, Name FROM Account")
        yield records['records']

    return accounts

pipeline = dlt.pipeline(
    pipeline_name='salesforce_custom',
    destination='duckdb',
    dataset_name='salesforce'
)
pipeline.run(salesforce_custom())
```
### Pattern 4: Incremental Loading with REST API
```python
import dlt

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
        "auth": {"token": dlt.secrets["github_token"]}
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "state": "all",
                    "since": "{incremental.start_value}"
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": "2024-01-01T00:00:00Z"
                }
            },
            "write_disposition": "merge",
            "primary_key": "id"
        }
    ]
}
```
### Pattern 5: Non-endpoint (seed) resources for REST API sources
Use a non-endpoint seed resource to drive REST API calls from a database, file, or other non-API source. Pre-fetch the data **outside** the dlt pipeline context to avoid `dlt.attach()` / context conflicts. The seed resource must **yield a list** of dicts so each row drives one API request.
```python
import duckdb
import dlt
from dlt.sources.rest_api import rest_api_source

# 1. Pre-fetch data from database (outside dlt context)
def get_locations():
    conn = duckdb.connect("locations.duckdb", read_only=True)
    result = conn.execute("SELECT id, lat, lng FROM locations").fetchall()
    conn.close()
    return [{"id": r[0], "lat": r[1], "lng": r[2]} for r in result]

# 2. Create seed resource
@dlt.resource(selected=False)
def locations():
    yield get_locations()  # Yield as LIST

# 3. Configure REST API with resolve
config = {
    "client": {"base_url": "https://api.weather.com/"},
    "resources": [
        locations(),
        {
            "name": "weather",
            "endpoint": {
                "path": "forecast",
                "params": {
                    "lat": "{resources.locations.lat}",
                    "lng": "{resources.locations.lng}"
                },
                "data_selector": "$",
                "paginator": "single_page"
            },
            "include_from_parent": ["id"],
            "primary_key": "_locations_id"
        }
    ]
}
source = rest_api_source(config)
pipeline = dlt.pipeline(
    pipeline_name="weather",
    destination="duckdb",
    dataset_name="weather_data"
)
pipeline.run(source)
```
See: [references/rest-api-source.md](references/rest-api-source.md) (Non-REST Endpoint Resources, Query/Path Params, Single-Object Responses, include_from_parent).
## Best Practices (Data Engineering)
- **Secrets**: Use `.dlt/secrets.toml`; never hardcode; add to `.gitignore`
- **Primary keys**: Set for merge operations and deduplication
- **Write dispositions**: `append` (events), `merge` (stateful), `replace` (snapshots)
- **Performance**: Yield pages not rows; use incremental loading when possible
See [references/performance-tuning.md](references/performance-tuning.md), [references/incremental-loading.md](references/incremental-loading.md), and [references/troubleshooting.md](references/troubleshooting.md) for more.
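As a mental model for the three write dispositions (plain Python, not dlt internals), a new batch meets the existing table like this:

```python
def apply_batch(table, batch, disposition, primary_key="id"):
    """Toy model of dlt write dispositions on an in-memory 'table'."""
    if disposition == "replace":   # snapshot: drop old data entirely
        return list(batch)
    if disposition == "append":    # events: keep everything ever loaded
        return table + list(batch)
    if disposition == "merge":     # stateful: upsert rows by primary key
        merged = {row[primary_key]: row for row in table}
        for row in batch:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown disposition: {disposition}")

table = [{"id": 1, "v": "old"}]
batch = [{"id": 1, "v": "new"}, {"id": 2, "v": "x"}]
print(len(apply_batch(table, batch, "append")))  # 3 rows (duplicate id kept)
print(len(apply_batch(table, batch, "merge")))   # 2 rows (id 1 upserted)
print(apply_batch(table, batch, "replace"))      # just the new batch
```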
## Common Challenges and Solutions
**Auth (OAuth2):** In REST config use `"auth": {"type": "oauth2_client_credentials", ...}`. For custom Python use `dlt.sources.helpers.rest_client.auth.OAuth2ClientCredentials` with `paginate()`. See [references/rest-api-source.md](references/rest-api-source.md).
**Custom pagination / nested data / performance:** See [references/rest-api-source.md](references/rest-api-source.md), [references/custom-sources.md](references/custom-sources.md), [references/performance-tuning.md](references/performance-tuning.md).
## Reference Documentation — When to Read What
- **Full workflow / step-by-step example** → [references/examples.md](references/examples.md)
- **Verified source** → [references/verified-sources.md](references/verified-sources.md)
- **Declarative REST API** → [references/rest-api-source.md](references/rest-api-source.md)
- **Custom Python source** → [references/custom-sources.md](references/custom-sources.md)
- **Incremental loading** → [references/incremental-loading.md](references/incremental-loading.md)
- **Performance** → [references/performance-tuning.md](references/performance-tuning.md)
- **Errors / debugging** → [references/troubleshooting.md](references/troubleshooting.md)
- **dlt basics** → [references/core-concepts.md](references/core-concepts.md)
## Templates and Scripts
### Templates (assets/templates/)
- **[custom_python_pipeline.py](assets/templates/custom_python_pipeline.py)** - Custom Python pipeline skeleton
- **[verified_source_pipeline.py](assets/templates/verified_source_pipeline.py)** - Verified source pipeline skeleton
- **[declarative_rest_pipeline.py](assets/templates/declarative_rest_pipeline.py)** - Declarative REST API pipeline skeleton
- **[.dlt/config.toml](assets/templates/.dlt/config.toml)** - Configuration file template
- **[.dlt/secrets.toml](assets/templates/.dlt/secrets.toml)** - Secrets file template
- **[.gitignore](assets/templates/.gitignore)** - Git ignore template for dlt projects
### Scripts (scripts/)
- **[install_packages.py](scripts/install_packages.py)** - Install dlt + destination extras (includes `workspace`). Run when setting up a new project or adding a destination.
- **[open_dashboard.py](scripts/open_dashboard.py)** - Open pipeline dashboard (`dlt pipeline <name> show`). Run after a pipeline run to inspect loaded data.
## Key Reminders
- **Always ask about destination** - Don't assume
- **Security first** - Never commit secrets; use `.dlt/secrets.toml` and provide `.gitignore`
- **Start simple** - Use verified sources when available; test incrementally
- **Read references** - Load detailed docs only when needed
---
## Referenced Files
> The following files are referenced in this skill and included for context.
### assets/templates/.dlt/config.toml
```toml
# dlt Configuration File
#
# This file contains non-sensitive configuration for your dlt pipeline.
# Add sensitive credentials to secrets.toml instead.
#
# Configuration hierarchy:
# [sources.<source_name>] - Source-specific configuration
# [destination.<destination_name>] - Destination-specific configuration
# [extract/normalize/load] - Pipeline stage configuration
# [runtime] - Runtime configuration
# Example: Source configuration
# [sources.my_source]
# base_url = "https://api.example.com/v1"
# timeout = 30
# batch_size = 100
# Example: Destination configuration
# [destination.bigquery]
# location = "US"
# project_id = "my-project"
# [destination.duckdb]
# credentials = "data/database.duckdb" # Just the file path, no authentication needed
# [destination.snowflake]
# database = "ANALYTICS"
# warehouse = "COMPUTE_WH"
# role = "LOADER"
# Example: Performance tuning
# [extract]
# workers = 10
# max_parallel_items = 50
# [normalize]
# workers = 4
# [load]
# workers = 20
# Example: Data writer configuration
# [data_writer]
# buffer_max_items = 5000
# file_max_items = 100000
# Example: Runtime configuration
# [runtime]
# log_level = "INFO"
# data_dir = "/path/to/data/directory"
```
### references/verified-sources.md
```markdown
# dlt Verified Sources
## Table of Contents
- [Overview](#overview)
- [Key Characteristics](#key-characteristics)
- [Difference from Core Sources](#difference-from-core-sources)
- [Available Verified Sources](#available-verified-sources)
- [Using Verified Sources](#using-verified-sources)
- [Customizing Verified Sources](#customizing-verified-sources)
- [Best Practices](#best-practices)
- [Common Patterns](#common-patterns)
- [Requesting New Verified Sources](#requesting-new-verified-sources)
- [Troubleshooting](#troubleshooting)
## Overview
Verified sources are pre-built, rigorously tested connectors for specific platforms and services. They're developed and maintained by the dlt team and community, providing production-ready pipelines with minimal setup.
## Key Characteristics
- **Pre-built**: Ready-to-use source code for specific platforms
- **Tested**: Rigorously tested against real APIs
- **Customizable**: Provided as Python code that can be modified
- **Maintained**: Regularly updated by dlt team and community
## Difference from Core Sources
- **Core sources** (REST API, SQL, cloud storage) are generic and built into dlt
- **Verified sources** are platform-specific and initialized separately
- Verified sources target individual platforms (Salesforce, GitHub, Stripe, etc.)
## Available Verified Sources
### CRM & Sales
- Salesforce
- HubSpot
- Pipedrive
- Zendesk Sell
### Analytics & Marketing
- Google Analytics
- Google Ads
- Facebook Ads
- Matomo
### Communication
- Slack
- Zendesk
- Intercom
### Development & Productivity
- GitHub
- GitLab
- Jira
- Asana
### Finance & Payments
- Stripe
- Shopify
### Databases
- MongoDB
- PostgreSQL
- MySQL
**Note**: See https://dlthub.com/docs/dlt-ecosystem/verified-sources for the complete list of 30+ verified sources.
## Using Verified Sources
### 1. Discover Available Sources
Browse the verified sources catalog at:
https://dlthub.com/docs/dlt-ecosystem/verified-sources
Or list available sources:
```bash
dlt init --list-sources
```
### 2. Initialize Source
```bash
dlt init <source_name> <destination_name>
```
**Examples:**
```bash
# Initialize Salesforce to BigQuery
dlt init salesforce bigquery
# Initialize GitHub to DuckDB
dlt init github duckdb
# Initialize Stripe to Snowflake
dlt init stripe snowflake
```
This command:
- Downloads verified source code to your working directory
- Creates pipeline Python file
- Generates `.dlt/` configuration directory
- Creates `requirements.txt` with dependencies
### 3. Configure Credentials
Add credentials to `.dlt/secrets.toml`:
```toml
[sources.salesforce]
username = "your-username"
password = "your-password"
security_token = "your-token"
[destination.bigquery]
credentials = "path/to/credentials.json"
project_id = "your-project"
```
### 4. Run Pipeline
```python
import dlt
from salesforce import salesforce_source

if __name__ == '__main__':
    pipeline = dlt.pipeline(
        pipeline_name='salesforce_pipeline',
        destination='bigquery',
        dataset_name='salesforce_data'
    )
    # Load all resources
    load_info = pipeline.run(salesforce_source())
    print(load_info)
```
### 5. Select Specific Resources
Most verified sources provide multiple resources. Select specific ones:
```python
# Load only specific Salesforce objects
source = salesforce_source()
pipeline.run(source.with_resources("Account", "Opportunity", "Contact"))
```
## Customizing Verified Sources
Verified sources are provided as Python code, allowing full customization:
### Modify Resources
```python
import dlt
from salesforce import salesforce_source

# Get the source
source = salesforce_source()

# Define a custom resource
@dlt.resource(write_disposition='merge', primary_key='Id')
def custom_salesforce_query():
    # Your custom SOQL query
    pass

# Add the custom resource to the source
source.resources.update({"custom_query": custom_salesforce_query()})
```
### Configure Resource Behavior
```python
# Change write disposition
source = salesforce_source()
source.resources["Account"].write_disposition = "replace"

# Set incremental loading
source.resources["Opportunity"].apply_hints(
    incremental=dlt.sources.incremental("LastModifiedDate")
)
```
## Best Practices
1. **Start with verified sources** - When available, prefer verified over custom
2. **Review generated code** - Understand what data is being extracted
3. **Configure incremental loading** - Use incremental loading for large datasets
4. **Select necessary resources** - Don't load all resources if you only need some
5. **Check documentation** - Each verified source has specific documentation
6. **Keep sources updated** - Regularly update to get bug fixes and improvements
7. **Test in development** - Validate data before production deployment
8. **Monitor API limits** - Be aware of source platform rate limits
## Common Patterns
### Incremental Loading with Verified Sources
```python
import dlt
from salesforce import salesforce_source

source = salesforce_source()
# Configure incremental loading on specific resources
source.resources["Opportunity"].apply_hints(
    incremental=dlt.sources.incremental("LastModifiedDate"),
    write_disposition="merge",
    primary_key="Id"
)
pipeline.run(source)
```
### Filtering Data
```python
# Some verified sources support filtering parameters
source = salesforce_source(
    objects=["Account", "Contact"],  # Only load these objects
    start_date="2024-01-01"          # Only load data from this date
)
```
### Multiple Sources in One Pipeline
```python
import dlt
from salesforce import salesforce_source
from hubspot import hubspot_source

pipeline = dlt.pipeline(
    pipeline_name='crm_pipeline',
    destination='bigquery',
    dataset_name='crm_data'
)
# Load from multiple sources
pipeline.run(salesforce_source())
pipeline.run(hubspot_source())
```
## Requesting New Verified Sources
If you need a source that doesn't exist:
1. Check the GitHub repository for open requests
2. Submit a feature request: https://github.com/dlt-hub/dlt
3. Join the Slack community to discuss
4. Consider contributing your own verified source
## Troubleshooting
### Source Not Found
```bash
# Ensure dlt is up to date
pip install --upgrade dlt
# Check available sources
dlt init --list-sources
```
### Credential Issues
- Verify `.dlt/secrets.toml` has correct credentials
- Check credential format in source-specific documentation
- Ensure secret names match source function parameters
### Missing Dependencies
```bash
# Install all requirements
pip install -r requirements.txt
# Or install source-specific dependencies
pip install "dlt[salesforce]"
```
### API Rate Limits
- Use incremental loading to reduce API calls
- Configure appropriate delays in source code
- Consider running pipelines during off-peak hours
```
### references/rest-api-source.md
```markdown
# Declarative REST API Source
## Table of Contents
- [Overview](#overview)
- [Configuration Structure](#configuration-structure)
- [Client Configuration](#client-configuration)
- [Authentication Methods](#authentication-methods)
- [Resource Configuration](#resource-configuration)
- [dlt resource parameters in the resource dict](#dlt-resource-parameters-in-the-resource-dict)
- [Endpoint Configuration](#endpoint-configuration)
- [Pagination Patterns](#pagination-patterns)
- [Resource Relationships (Parent-Child)](#resource-relationships-parent-child)
- [Non-REST Endpoint Resources (Seeding)](#non-rest-endpoint-resources-seeding)
- [Query Params vs Path Params Resolve Syntax](#query-params-vs-path-params-resolve-syntax)
- [Single-Object API Responses](#single-object-api-responses)
- [include_from_parent Field Naming](#include_from_parent-field-naming)
- [Incremental Loading](#incremental-loading)
- [Processing Steps](#processing-steps)
- [Resource Defaults](#resource-defaults)
- [Complete Examples](#complete-examples)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)
## Overview
The declarative REST API source enables data extraction from REST APIs using a configuration dictionary. It is simpler than writing custom Python code, yet still handles complex API patterns, including authentication, pagination, and nested resources.
**When to use:**
- REST APIs with standard patterns
- APIs with clear endpoint structures
- When you want configuration-based approach
- Rapid prototyping and development
**When to use custom Python instead:**
- Very complex custom logic required
- Non-standard API patterns
- Need fine-grained control over every request
## Configuration Structure
```python
from dlt.sources.rest_api import rest_api_source, RESTAPIConfig

config: RESTAPIConfig = {
    "client": {
        # Client configuration (base URL, auth, etc.)
    },
    "resource_defaults": {
        # Default settings for all resources
    },
    "resources": [
        # List of API endpoints/resources
    ]
}
source = rest_api_source(config)
```
## Client Configuration
### Basic Client
```python
"client": {
"base_url": "https://api.example.com/v1/",
}
```
### With Authentication
```python
"client": {
"base_url": "https://api.github.com/repos/dlt-hub/dlt/",
"auth": {
"token": dlt.secrets["github_token"] # Bearer token
}
}
```
### With Headers
```python
"client": {
"base_url": "https://api.example.com/",
"headers": {
"User-Agent": "MyApp/1.0",
"Accept": "application/json"
}
}
```
### With Paginator Default
```python
"client": {
"base_url": "https://api.example.com/",
"paginator": {
"type": "page_number",
"page_param": "page",
"page_size": 100
}
}
```
## Authentication Methods
### Bearer Token
```python
"auth": {
"type": "bearer",
"token": dlt.secrets["api_token"]
}
```
### Basic Authentication
```python
"auth": {
"type": "http_basic",
"username": dlt.secrets["username"],
"password": dlt.secrets["password"]
}
```
### API Key in Header
```python
"auth": {
"type": "api_key",
"name": "X-API-Key",
"api_key": dlt.secrets["api_key"],
"location": "header"
}
```
### API Key in Query Parameter
```python
"auth": {
"type": "api_key",
"name": "api_key",
"api_key": dlt.secrets["api_key"],
"location": "query"
}
```
### OAuth2 Client Credentials
```python
"auth": {
"type": "oauth2_client_credentials",
"client_id": dlt.secrets["client_id"],
"client_secret": dlt.secrets["client_secret"],
"access_token_url": "https://auth.example.com/oauth/token"
}
```
## Resource Configuration
### Simple Resource (String)
```python
"resources": [
"users", # GET /users
"products", # GET /products
]
```
### Detailed Resource (Dictionary)
```python
"resources": [
    {
        "name": "users",
        "endpoint": {
            "path": "users",
            "params": {
                "status": "active"
            }
        },
        "write_disposition": "merge",
        "primary_key": "id"
    }
]
```
### dlt resource parameters in the resource dict
The REST API source accepts the same **dlt resource parameters** in each resource definition as documented in the [official Resource configuration](https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/basic#resource-configuration). You can set both dlt resource parameters and rest_api-specific parameters (`endpoint`, `include_from_parent`, `processing_steps`, `auth`) in the same resource dict.
**dlt resource parameters** (in the resource dict):
- **`name`** — Resource name; also used as table name unless overridden by `table_name`.
- **`write_disposition`** — How to write data (`append`, `replace`, `merge`).
- **`primary_key`** — Primary key field(s) for merge operations.
- **`table_name`** — Override the destination table name for this resource.
- **`max_table_nesting`** — Sets the maximum depth of nested tables; beyond that, nodes are loaded as structs or JSON. Use `0` for a single table (no child tables).
- **`selected`** — Whether the resource is selected for loading (e.g. `false` for seed-only resources).
**Example: load a resource as a single table (config only)**
To load a resource as one table with no child tables, set `"max_table_nesting": 0` in the resource config:
```python
{
    "name": "pokemon_details",
    "max_table_nesting": 0,
    "endpoint": {
        "path": "pokemon/{name}",
        "data_selector": "$",
        "paginator": "single_page",
    },
    "include_from_parent": ["name"],
    "primary_key": "id",
    "write_disposition": "merge",
}
```
Setting this in the resource config avoids calling `apply_hints` after the source is created.
## Endpoint Configuration
### Basic Endpoint
```python
"endpoint": {
"path": "issues"
}
```
### With Query Parameters
```python
"endpoint": {
"path": "issues",
"params": {
"sort": "updated",
"state": "open",
"per_page": 100
}
}
```
### With Data Selector (JSONPath)
```python
"endpoint": {
"path": "results",
"data_selector": "data.items" # Extract data.items from response
}
```
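`data_selector` is a JSONPath into the response body. For a simple dotted path, it behaves like this illustrative walk (the response shape here is invented):

```python
def select_data(response: dict, selector: str):
    """Walk a simple dotted path like "data.items" through a JSON response."""
    node = response
    for key in selector.split("."):
        node = node[key]
    return node

response = {"data": {"items": [{"id": 1}, {"id": 2}], "total": 2}}
print(select_data(response, "data.items"))  # [{'id': 1}, {'id': 2}]
print(select_data(response, "data.total"))  # 2
```

Real JSONPath also supports wildcards and filters; dlt evaluates the full syntax, not just dotted keys.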
### POST Request with JSON Body
```python
"endpoint": {
"path": "search",
"method": "POST",
"json": {
"query": "dlt",
"filters": ["python", "data"]
}
}
```
## Pagination Patterns
### Auto-Detection
dlt automatically detects common pagination patterns. No configuration needed in most cases.
### JSON Link Pagination
```python
"paginator": {
"type": "json_link",
"next_url_path": "pagination.next" # JSONPath to next page URL
}
```
### Header Link Pagination (GitHub-style)
```python
"paginator": {
    "type": "header_link",
    "links_next_key": "next"  # rel key of the next-page URL in the Link header
}
```
### Offset Pagination
```python
"paginator": {
"type": "offset",
"limit": 100,
"offset_param": "offset",
"limit_param": "limit",
"total_path": "total" # Optional: JSONPath to total count
}
```
### Page Number Pagination
```python
"paginator": {
"type": "page_number",
"page_param": "page",
"page_size": 100,
"total_path": "meta.total_pages"
}
```
### Cursor Pagination
```python
"paginator": {
"type": "cursor",
"cursor_path": "next_cursor",
"cursor_param": "cursor"
}
```
### Single Page (No Pagination)
```python
"paginator": "single_page"
```
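To make the offset parameters concrete, here is a toy client loop showing how a `limit`/`offset` paginator advances; `fetch_page` stands in for a real HTTP call and the dataset is invented:

```python
def fetch_page(offset, limit, data=tuple(range(250))):
    """Stand-in for an HTTP call: return one slice of a fixed dataset."""
    return list(data[offset:offset + limit])

def paginate_offset(limit=100):
    offset, items = 0, []
    while True:
        page = fetch_page(offset, limit)
        items.extend(page)
        if len(page) < limit:  # short page means we hit the last page
            break
        offset += limit
    return items

print(len(paginate_offset()))  # 250 items fetched across 3 requests
```

When the API reports a `total_path`, the paginator can stop exactly at the total instead of probing for a short page.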
## Resource Relationships (Parent-Child)
Define child resources that depend on parent resources:
```python
"resources": [
    {
        "name": "issues",
        "endpoint": "issues",
        "primary_key": "number"
    },
    {
        "name": "issue_comments",
        "endpoint": {
            "path": "issues/{number}/comments",  # Use parent field
            "params": {
                "issue_id": "{number}"  # Also works in params
            }
        },
        "include_from_parent": ["id", "number"]  # Fields to include from parent
    }
]
```
**Placeholder locations:**
- URL paths: `"path": "issues/{issue_id}/comments"`
- Query params: `"params": {"parent_id": "{issue_id}"}`
- JSON body: `"json": {"issue": "{issue_id}"}`
- Headers: `"headers": {"X-Issue-ID": "{issue_id}"}`
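Placeholder interpolation behaves much like Python's `str.format` applied with the parent record's fields; this is a simplification of what the REST source does internally (the record below is made up):

```python
def fill_placeholders(template: str, parent_record: dict) -> str:
    """Approximate how '{issue_id}'-style placeholders are filled
    from a parent record's fields."""
    return template.format(**parent_record)

parent = {"issue_id": 42, "number": 7}
print(fill_placeholders("issues/{number}/comments", parent))  # issues/7/comments
```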
## Non-REST Endpoint Resources (Seeding)
You can seed REST API calls from database data or other non-API sources. A **seed resource** yields a **list of dicts**; each dict provides values for path/query placeholders in child REST resources. The REST source then issues one API request per seed row.
**Requirements:**
- Define a `@dlt.resource(selected=False)` function that **yields a list of dicts** (not individual items).
- Include the seed resource in the `resources` array before any resources that reference it.
- Reference seed fields in child endpoints using the resolve syntax (see [Query Params vs Path Params Resolve Syntax](#query-params-vs-path-params-resolve-syntax)).
**Example: seeding from a Python list**
```python
import dlt
from typing import Any, Dict, Generator, List
from dlt.sources.rest_api import rest_api_source

@dlt.resource(selected=False)
def seed_data() -> Generator[List[Dict[str, Any]], Any, Any]:
    """Must yield a LIST of dicts for the resolve pattern to work."""
    yield [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]

config = {
    "client": {"base_url": "https://api.example.com/"},
    "resources": [
        seed_data(),  # Include the seed resource
        {
            "name": "details",
            "endpoint": {
                "path": "items/{id}",
                "params": {
                    "name": "{resources.seed_data.name}"  # Curly braces for query params
                }
            }
        }
    ]
}
source = rest_api_source(config)
```
**Example: reading from DuckDB to seed weather API calls**
```python
import duckdb
import dlt
from typing import Any, Dict, Generator, List
from dlt.sources.rest_api import rest_api_source

def get_locations_from_db():
    """Read outside of dlt context to avoid pipeline conflicts."""
    conn = duckdb.connect("locations.duckdb", read_only=True)
    rows = conn.execute("SELECT id, lat, lng FROM locations").fetchall()
    conn.close()
    return [{"id": r[0], "lat": r[1], "lng": r[2]} for r in rows]

@dlt.resource(selected=False)
def locations() -> Generator[List[Dict[str, Any]], Any, Any]:
    """Must yield a LIST so each row seeds one API call."""
    yield get_locations_from_db()

config = {
    "client": {"base_url": "https://api.weather.com/"},
    "resources": [
        locations(),
        {
            "name": "weather",
            "endpoint": {
                "path": "forecast",
                "params": {
                    "lat": "{resources.locations.lat}",
                    "lng": "{resources.locations.lng}"
                },
                "data_selector": "$",
                "paginator": "single_page"
            },
            "include_from_parent": ["id"],
            "primary_key": "_locations_id"
        }
    ]
}
```
**Important:** The seed resource must **yield a single list** (e.g. `yield [row1, row2, ...]`). Yielding one item at a time will not drive one request per row.
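The difference is easiest to see in plain Python (the `@dlt.resource(selected=False)` decorator is omitted here so the generators can be inspected directly):

```python
def seed_correct():
    # One yield containing the whole list: each dict seeds one API call.
    yield [{"id": 1}, {"id": 2}, {"id": 3}]

def seed_incorrect():
    # One dict per yield: this shape will NOT drive one request per row.
    yield {"id": 1}
    yield {"id": 2}
    yield {"id": 3}

# The correct seed produces a single yielded item that is a list of 3 dicts.
batches = list(seed_correct())
assert len(batches) == 1 and len(batches[0]) == 3
```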
## Query Params vs Path Params Resolve Syntax
**Path params** (placeholders in the URL path) use the **resolve dict**:
```python
"endpoint": {
"path": "repos/{owner}/{repo}/issues",
"params": {
"owner": {"type": "resolve", "resource": "repos", "field": "owner"},
"repo": {"type": "resolve", "resource": "repos", "field": "name"}
}
}
```
**Query params** use **curly-brace string** syntax with `{resources.<resource_name>.<field>}`:
```python
"endpoint": {
"path": "forecast",
"params": {
"latitude": "{resources.locations.lat}",
"longitude": "{resources.locations.lng}"
}
}
```
Use the resolve dict for path segments and the `{resources....}` form for query parameters.
## Single-Object API Responses
Some APIs return a **single object** instead of a list (e.g. one forecast per request). Configure the endpoint so the whole response is treated as one item and pagination is disabled:
```python
"endpoint": {
"path": "forecast",
"params": {
"latitude": "{resources.locations.lat}",
"longitude": "{resources.locations.lng}"
},
"data_selector": "$", # Treat entire response as single item
"paginator": "single_page" # No pagination needed
}
```
- `data_selector`: `"$"` means use the root of the response as the one record.
- `paginator`: `"single_page"` stops the client from requesting further pages.
## include_from_parent Field Naming
Fields brought in from a parent (or seed) resource are prefixed with `_<parent_resource_name>_` in the loaded table.
```python
"include_from_parent": ["id", "name"]
# Results in columns: _parent_resource_id, _parent_resource_name
```
If you use such a field as primary key, use the **prefixed** name:
```python
"include_from_parent": ["id"],
"primary_key": "_parent_resource_id"
```
## Incremental Loading
Load only new or changed data:
```python
{
"name": "posts",
"endpoint": {
"path": "posts",
"params": {
"created_since": "{incremental.start_value}"
}
},
"incremental": {
"cursor_path": "created_at",
"initial_value": "2024-01-01T00:00:00Z"
},
"write_disposition": "merge",
"primary_key": "id"
}
```
**Incremental parameters:**
- `cursor_path` - Field to track (e.g., timestamp, ID)
- `initial_value` - Starting point for first run
- `end_value` - Optional end value
- Placeholders: `{incremental.start_value}`, `{incremental.end_value}`, `{incremental.last_value}`
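A bounded backfill combining these parameters might look like this (endpoint and field names are hypothetical):

```python
# Sketch: a bounded backfill window using initial_value and end_value,
# with both ends exposed to the API via incremental placeholders.
backfill_resource = {
    "name": "posts",
    "endpoint": {
        "path": "posts",
        "params": {
            "created_since": "{incremental.start_value}",
            "created_until": "{incremental.end_value}",
        },
    },
    "incremental": {
        "cursor_path": "created_at",
        "initial_value": "2024-01-01T00:00:00Z",
        "end_value": "2024-07-01T00:00:00Z",
    },
    "write_disposition": "merge",
    "primary_key": "id",
}
```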
## Processing Steps
Transform data before loading:
```python
{
"name": "users",
"endpoint": "users",
"processing_steps": [
# Filter: Keep only matching items
{"filter": lambda item: item["age"] >= 18},
# Map: Transform each item
{"map": lambda item: {**item, "full_name": f"{item['first_name']} {item['last_name']}"}},
# Yield map: Transform and yield multiple items
{"yield_map": lambda item: [item, item.get("metadata")]}
]
}
```
## Resource Defaults
Set defaults for all resources:
```python
"resource_defaults": {
"write_disposition": "merge",
"primary_key": "id",
"endpoint": {
"params": {
"api_version": "v2"
}
}
}
```
Individual resources can override these defaults.
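For example (hypothetical endpoints), a resource can override a single default while inheriting the rest:

```python
# 'events' overrides the merge default with append; 'users' inherits
# write_disposition and primary_key from resource_defaults unchanged.
config = {
    "client": {"base_url": "https://api.example.com/"},
    "resource_defaults": {
        "write_disposition": "merge",
        "primary_key": "id",
    },
    "resources": [
        {"name": "users", "endpoint": "users"},  # inherits merge + id
        {
            "name": "events",
            "endpoint": "events",
            "write_disposition": "append",       # override: immutable events
        },
    ],
}
```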
## Complete Examples
### Example 1: Pokemon API
```python
from dlt.sources.rest_api import rest_api_source
config = {
"client": {
"base_url": "https://pokeapi.co/api/v2/",
},
"resources": [
{
"name": "pokemon_list",
"endpoint": {
"path": "pokemon",
"params": {
"limit": 100
}
},
"write_disposition": "replace"
},
{
"name": "pokemon_details",
"endpoint": {
"path": "pokemon/{name}",
},
"write_disposition": "merge",
"primary_key": "id",
"include_from_parent": ["name"]
}
]
}
source = rest_api_source(config)
pipeline = dlt.pipeline(
pipeline_name="pokemon",
destination="duckdb",
dataset_name="pokemon_data"
)
pipeline.run(source)
```
### Example 2: GitHub Issues with Auth and Incremental
```python
config = {
"client": {
"base_url": "https://api.github.com/repos/dlt-hub/dlt/",
"auth": {
"token": dlt.secrets["github_token"]
}
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge"
},
"resources": [
{
"name": "issues",
"endpoint": {
"path": "issues",
"params": {
"state": "all",
"since": "{incremental.start_value}"
}
},
"incremental": {
"cursor_path": "updated_at",
"initial_value": "2024-01-01T00:00:00Z"
}
},
{
"name": "issue_comments",
"endpoint": "issues/{number}/comments",
"include_from_parent": ["id", "number"]
}
]
}
```
## Best Practices
1. **Use resource defaults** - Avoid repetition for common settings
2. **Set appropriate write dispositions** - Use merge for stateful data, append for events
3. **Define primary keys** - Essential for merge operations
4. **Use incremental loading** - Reduce API calls and data transfer
5. **Handle pagination explicitly** - When auto-detection doesn't work
6. **Test with small limits** - Use small page sizes during development
7. **Use data selectors** - Extract specific parts of complex responses
8. **Leverage parent-child relationships** - Model API dependencies correctly
9. **Store secrets properly** - Never hardcode credentials
10. **Monitor rate limits** - Be aware of API rate limiting
11. **Single table via config** - For a single table (no nested child tables), set `max_table_nesting: 0` in the resource config instead of using `apply_hints` after the source is created.
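A minimal sketch of that per-resource setting (hypothetical endpoint):

```python
# Keep nested JSON inline as a single column instead of child tables.
flat_resource = {
    "name": "orders",
    "endpoint": "orders",
    "max_table_nesting": 0,  # no child tables generated for this resource
}
```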
## Troubleshooting
### Pagination Not Working
- Check API response structure
- Explicitly configure paginator type
- Verify JSONPath selectors
### Authentication Failures
- Verify credentials in `.dlt/secrets.toml`
- Check auth type matches API requirements
- Test credentials with curl/Postman first
### Missing Data
- Use `data_selector` to extract correct fields
- Check for nested response structures
- Verify endpoint paths are correct
### Child Resources Not Loading
- Ensure parent resource has required fields
- Check placeholder syntax: `{field_name}` or `{resources.<resource>.<field>}` for query params
- Verify `include_from_parent` includes needed fields
- For seed resources: ensure the seed yields a **list** of dicts, not one item per yield
```
### references/custom-sources.md
```markdown
# Creating Custom Sources in dlt
## Table of Contents
- [Overview](#overview)
- [Core Decorators](#core-decorators)
- [Yield Patterns](#yield-patterns)
- [Dynamic Resource Creation](#dynamic-resource-creation)
- [Using Python Packages](#using-python-packages)
- [Configuration and Secrets](#configuration-and-secrets)
- [REST API Helper](#rest-api-helper)
- [Schema Management](#schema-management)
- [Best Practices](#best-practices)
- [Complete Example: Multi-Endpoint API](#complete-example-multi-endpoint-api)
- [Error Handling](#error-handling)
## Overview
Custom sources allow you to extract data from any Python-accessible data source. Use custom sources when:
- No verified source exists for your data source
- You need custom logic or transformations
- Working with proprietary or internal APIs
- Using Python packages to access data
## Core Decorators
### @dlt.source
Defines a source function that returns one or more resources.
```python
@dlt.source
def my_source(api_key=dlt.secrets.value):
return my_resource()
```
**Key principles:**
- Sources should yield or return resources
- Do NOT extract data in the source function
- Leave data extraction to resources
- Source functions execute before `pipeline.run()` or `pipeline.extract()`
### @dlt.resource
Defines a resource that extracts data.
```python
@dlt.resource(write_disposition='append')
def my_resource():
for item in fetch_data():
yield item
```
**Key parameters:**
- `write_disposition` - How to write data ('append', 'replace', 'merge')
- `primary_key` - Primary key field(s) for merge operations
- `name` - Resource name (defaults to function name)
- `max_table_nesting` - Control nested table depth
## Yield Patterns
### Pattern 1: Yield Individual Items
```python
@dlt.resource
def users():
response = requests.get("https://api.example.com/users")
for user in response.json():
yield user
```
### Pattern 2: Yield Pages (Recommended for Performance)
```python
@dlt.resource
def users():
response = requests.get("https://api.example.com/users")
yield response.json() # Yield entire page
```
### Pattern 3: Yield from Generator
```python
def fetch_data():
# Generator function
for page in range(10):
yield fetch_page(page)
@dlt.resource
def my_resource():
yield from fetch_data()
```
## Dynamic Resource Creation
Create resources programmatically for multiple endpoints:
```python
@dlt.source
def my_api(api_key=dlt.secrets.value):
endpoints = ["companies", "deals", "products"]
def create_resource(endpoint):
url = f"https://api.example.com/{endpoint}"
response = requests.get(url, headers={"Authorization": f"Bearer {api_key}"})
yield response.json()
for endpoint in endpoints:
yield dlt.resource(
create_resource(endpoint),
name=endpoint,
write_disposition='merge',
primary_key='id'
)
```
## Using Python Packages
Integrate Python packages for data extraction:
```python
from simple_salesforce import Salesforce
@dlt.source
def salesforce_source(
username=dlt.secrets.value,
password=dlt.secrets.value,
security_token=dlt.secrets.value
):
sf = Salesforce(
username=username,
password=password,
security_token=security_token
)
@dlt.resource(write_disposition='merge', primary_key='Id')
def accounts():
records = sf.query_all("SELECT Id, Name, Industry FROM Account")
yield records['records']
@dlt.resource(write_disposition='merge', primary_key='Id')
def opportunities():
records = sf.query_all("SELECT Id, Name, Amount, StageName FROM Opportunity")
yield records['records']
return accounts, opportunities
```
## Configuration and Secrets
### Accessing Secrets
```python
@dlt.source
def my_source(
api_key=dlt.secrets.value, # From .dlt/secrets.toml
base_url=dlt.config.value, # From .dlt/config.toml
timeout: int = 30 # Default value
):
pass
```
### Secrets File (.dlt/secrets.toml)
```toml
[sources.my_source]
api_key = "your-secret-key"
```
### Config File (.dlt/config.toml)
```toml
[sources.my_source]
base_url = "https://api.example.com"
timeout = 60
```
## REST API Helper
Use built-in REST client for common patterns:
```python
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
@dlt.resource(write_disposition='merge', primary_key='id')
def github_issues(api_token=dlt.secrets.value):
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
for page in paginate(
url,
auth=BearerTokenAuth(api_token),
paginator=HeaderLinkPaginator(),
params={"state": "open"}
):
yield page
```
## Schema Management
### Control Table Nesting
```python
@dlt.source(max_table_nesting=2)
def my_source():
return my_resource()
```
Limits how deeply nested tables are generated from nested data structures. For the declarative REST API source, set `max_table_nesting` per resource in the resource dict; see [rest-api-source.md](rest-api-source.md) Resource configuration.
### Resource Selection
Enable users to select specific resources:
```python
source = my_source()
pipeline.run(source.with_resources("users", "orders"))
```
## Best Practices
1. **Yield pages, not rows** - Better performance, fewer iterations
2. **Use generators** - Memory efficient for large datasets
3. **Keep sources lightweight** - Don't extract data in source functions
4. **Leverage decorators** - Use `@dlt.resource` for clear resource definition
5. **Handle pagination properly** - Use built-in helpers when possible
6. **Implement error handling** - Catch and handle API errors gracefully
7. **Use appropriate write dispositions** - Match disposition to data characteristics
8. **Set primary keys for merge** - Essential for upsert operations
9. **Limit table nesting** - Prevent excessive nested table generation
10. **Document resource parameters** - Make sources reusable and configurable
## Complete Example: Multi-Endpoint API
```python
import requests
from typing import Iterator, Any
import dlt
@dlt.source
def pokemon_api(base_url: str = "https://pokeapi.co/api/v2"):
"""
Source for Pokemon API data.
"""
@dlt.resource(write_disposition='replace', primary_key='name')
def pokemon_list() -> Iterator[dict]:
"""Fetch list of all Pokemon."""
url = f"{base_url}/pokemon"
while url:
response = requests.get(url)
response.raise_for_status()
data = response.json()
yield data['results']
url = data.get('next')
@dlt.resource(write_disposition='merge', primary_key='id')
def pokemon_details(pokemon_names=("pikachu", "charmander")) -> Iterator[dict]:  # default keeps the example runnable
"""Fetch detailed data for specific Pokemon."""
for name in pokemon_names:
url = f"{base_url}/pokemon/{name}"
response = requests.get(url)
response.raise_for_status()
yield response.json()
return pokemon_list, pokemon_details
# Usage
if __name__ == '__main__':
pipeline = dlt.pipeline(
pipeline_name='pokemon_pipeline',
destination='duckdb',
dataset_name='pokemon_data'
)
# Load all Pokemon and their details
source = pokemon_api()
load_info = pipeline.run(source)
print(load_info)
```
## Error Handling
```python
@dlt.resource
def resilient_resource():
for item_id in range(100):
try:
data = fetch_data(item_id)
yield data
except requests.exceptions.RequestException as e:
# Log error and continue
print(f"Failed to fetch {item_id}: {e}")
continue
except Exception as e:
# Critical error - stop
raise
```
```
### references/incremental-loading.md
```markdown
# Incremental Loading in dlt
## Table of Contents
- [Overview](#overview)
- [Write Dispositions](#write-dispositions)
- [Choosing the Right Strategy](#choosing-the-right-strategy)
- [Cursor-Based Incremental Loading](#cursor-based-incremental-loading)
- [Merge Strategies](#merge-strategies)
- [Advanced Techniques](#advanced-techniques)
- [Best Practices](#best-practices)
## Overview
Incremental loading transfers only new or modified data rather than reprocessing existing records. This reduces latency and operational costs while requiring careful state management.
## Write Dispositions
### 1. Replace (Full Load)
```python
@dlt.resource(write_disposition='replace')
def my_resource():
yield data
```
- Completely overwrites destination dataset with current source data
- Use when you need a fresh snapshot every time
- No state tracking required
### 2. Append
```python
@dlt.resource(write_disposition='append')
def my_resource():
yield data
```
- Adds new data without modifying existing records
- Best for stateless data (events, logs)
- Simple and efficient for immutable data
### 3. Merge
```python
@dlt.resource(
write_disposition='merge',
primary_key='id'
)
def my_resource():
yield data
```
- Updates existing records using merge keys or primary keys
- Enables upserts and deduplication
- Use for stateful data that changes over time
## Choosing the Right Strategy
**Ask: Is your data stateful or stateless?**
- **Stateless data** (unchanging events) → Use `append`
- **Stateful data** (user profiles, evolving records) → Use `merge` or SCD2
**For stateful data, ask: Do you need change history?**
- **No history needed** → Use `merge` with incremental extraction
- **History needed** → Use SCD2 (Slowly Changing Dimensions Type-2)
## Cursor-Based Incremental Loading
Track changes via timestamp or ID fields:
```python
@dlt.resource(
write_disposition='merge',
primary_key='id'
)
def incremental_resource(
updated_at=dlt.sources.incremental('updated_at', initial_value='2024-01-01T00:00:00Z')
):
# Fetch data modified after the last cursor value
url = f"https://api.example.com/data?updated_since={updated_at.last_value}"
response = requests.get(url)
yield response.json()
```
Key parameters:
- `cursor_path` - Field to track (e.g., 'updated_at', 'id')
- `initial_value` - Starting point for first run
- `last_value` - Last processed value (maintained by dlt)
## Merge Strategies
### Delete-Insert
Replaces records matching the merge key.
### SCD2 (Slowly Changing Dimensions Type-2)
Preserves historical changes by versioning records.
### Upsert
Updates existing records, inserts new ones.
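In code, a strategy is selected with the expanded `write_disposition` dict form; the sketch below assumes the strategy names used in dlt's merge documentation (`delete-insert` is the default when only `'merge'` is given):

```python
# Expanded write_disposition form selecting a merge strategy.
scd2_disposition = {"disposition": "merge", "strategy": "scd2"}
upsert_disposition = {"disposition": "merge", "strategy": "upsert"}

# Usage sketch: @dlt.resource(write_disposition=scd2_disposition, primary_key="id")
```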
## Advanced Techniques
### Lag/Attribution Windows
Refresh data within specific timeframes to handle late-arriving data:
```python
@dlt.resource(
write_disposition='merge',
primary_key='id'
)
def resource_with_lag(
updated_at=dlt.sources.incremental(
'updated_at',
initial_value='2024-01-01T00:00:00Z',
lag=timedelta(days=2) # Re-fetch last 2 days
)
):
yield data
```
### Full Refresh
Force complete data reload when needed:
```python
# Reset incremental state
pipeline.run(source(), refresh='drop_sources')
```
For merge operations, this deletes destination data and reloads fresh content.
## Best Practices
1. **Choose appropriate cursor fields**: Use monotonically increasing fields (timestamps, IDs)
2. **Handle timezone consistency**: Ensure cursor timestamps are in UTC
3. **Consider data latency**: Use lag windows for systems with delayed updates
4. **Test incremental logic**: Verify that incremental runs don't miss or duplicate data
5. **Monitor state**: Check dlt's state to ensure cursors advance correctly
6. **Handle deletions**: Merge doesn't detect deletions by default - consider soft deletes or periodic full refreshes
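For the soft-delete approach, one option is dlt's `hard_delete` column hint, which removes destination rows whose flag is true during a merge. A sketch (the `deleted` field name is hypothetical):

```python
# Resource hints: on merge, rows where 'deleted' is true are deleted
# from the destination instead of upserted.
resource_hints = {
    "write_disposition": "merge",
    "primary_key": "id",
    "columns": {"deleted": {"hard_delete": True}},
}
```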
```
### references/performance-tuning.md
```markdown
# Performance Tuning in dlt
## Table of Contents
- [Overview](#overview)
- [Worker Configuration](#worker-configuration)
- [Buffer Management](#buffer-management)
- [File Rotation & Compression](#file-rotation--compression)
- [Extract Optimization Techniques](#extract-optimization-techniques)
- [Storage Management](#storage-management)
- [Configuration Scoping](#configuration-scoping)
- [Performance Best Practices](#performance-best-practices)
- [Common Optimization Scenarios](#common-optimization-scenarios)
## Overview
dlt pipelines have three main stages, each with configurable parallelism:
1. **Extract** - Fetch data from sources
2. **Normalize** - Process and prepare data
3. **Load** - Write to destination
## Worker Configuration
### Extract Stage
Uses thread pools for concurrent data extraction.
**Config (.dlt/config.toml):**
```toml
[extract]
workers = 5 # Thread pool workers (default: 5)
max_parallel_items = 20 # Async parallelism (default: 20)
```
**When to adjust:**
- Increase `workers` for I/O-bound operations (API calls)
- Increase `max_parallel_items` for async operations
- Monitor CPU usage to avoid over-threading
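The same settings can also be supplied as environment variables; dlt resolves config from variables named `SECTION__KEY` in uppercase. A sketch:

```python
import os

# Equivalent of [extract] workers / max_parallel_items in config.toml,
# set via environment variables before the pipeline runs.
os.environ["EXTRACT__WORKERS"] = "10"
os.environ["EXTRACT__MAX_PARALLEL_ITEMS"] = "50"
```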
### Normalize Stage
Uses process pools for concurrent file processing.
**Config:**
```toml
[normalize]
workers = 3 # Process pool workers (default: 3)
start_method = "spawn" # Recommended for Linux with threading
```
**When to adjust:**
- Increase `workers` for CPU-intensive transformations
- Use `spawn` on Linux systems with threading issues
- Balance against available CPU cores
### Load Stage
Thread pool-based concurrent loading to destination.
**Config:**
```toml
[load]
workers = 50 # Concurrent load jobs (default: 20)
```
**When to adjust:**
- Increase for destinations that handle high concurrency well
- Decrease if hitting destination rate limits
- Monitor destination performance metrics
## Buffer Management
Control in-memory buffer sizes:
**Config:**
```toml
[data_writer]
buffer_max_items = 100 # Items in buffer (default: 5000)
```
**When to adjust:**
- Decrease for memory-constrained environments
- Increase for high-throughput pipelines with adequate memory
## File Rotation & Compression
Enable parallelization by rotating intermediary files:
**Config:**
```toml
[data_writer]
file_max_items = 100000 # Max items per file
file_max_bytes = 1000000 # Max bytes per file (1MB)
[normalize.data_writer]
disable_compression = true # Disable for faster processing
```
**Important:** Default setting creates a single intermediary file. For millions of records, configure rotation to enable parallel processing.
**When to use:**
- Large datasets (millions+ records)
- Need parallel normalize/load stages
- Balance file count vs. file size
## Extract Optimization Techniques
### 1. Yield Pages Instead of Rows
```python
@dlt.resource
def optimized_resource():
for page in paginate(url):
yield page # Yield entire page, not individual items
```
**Benefits:**
- Reduces pipeline iterations
- Better batching for downstream stages
- Lower overhead
### 2. Use Built-in HTTP Client
```python
from dlt.sources.helpers.rest_client import paginate
for page in paginate(url):
yield page
```
**Benefits:**
- Optimized for dlt workflows
- Built-in retry logic
- Performance tuning included
### 3. Use orjson for JSON Parsing
dlt uses `orjson` by default for fast JSON parsing.
### 4. Switch to FIFO Mode for Debugging
```toml
[extract]
next_item_mode = "fifo" # Sequential, deterministic extraction
```
Use only for debugging - reduces parallelism.
## Storage Management
### External Storage
For constrained environments, use external bucket storage:
```python
import os
os.environ["DLT_DATA_DIR"] = "/path/to/mounted/bucket"
```
### Monitoring
Enable graceful progress monitoring:
```bash
PROGRESS=log python pipeline.py
```
## Configuration Scoping
Apply settings at different levels:
**Global:**
```toml
[extract]
workers = 10
```
**Per-source:**
```toml
[sources.my_source.extract]
workers = 20
```
**Per-resource:**
```toml
[sources.my_source.resources.my_resource.extract]
workers = 5
```
## Performance Best Practices
1. **Start with defaults** - Profile first, optimize second
2. **Monitor resource usage** - Track CPU, memory, network I/O
3. **Optimize bottlenecks** - Focus on the slowest stage
4. **Test incrementally** - Change one setting at a time
5. **Balance parallelism** - More workers isn't always better
6. **Consider destination limits** - Respect rate limits and connection pools
7. **Use appropriate write disposition** - `append` is faster than `merge`
8. **Minimize data transformations** - Do heavy processing in source when possible
9. **Batch API calls** - Fetch data in pages/chunks
10. **Enable file rotation** - For large datasets requiring parallel processing
## Common Optimization Scenarios
### High-Volume API Extraction
```toml
[extract]
workers = 10
max_parallel_items = 50
[data_writer]
file_max_items = 50000
```
### CPU-Intensive Normalization
```toml
[normalize]
workers = 8
start_method = "spawn"
```
### Fast Destination Loading
```toml
[load]
workers = 100
[data_writer]
file_max_items = 100000
disable_compression = true
```
### Memory-Constrained Environment
```toml
[data_writer]
buffer_max_items = 1000
[extract]
workers = 2
[normalize]
workers = 1
```
```
### references/troubleshooting.md
```markdown
# Troubleshooting dlt Pipelines
## Table of Contents
- [Installation and Setup](#installation-and-setup)
- [Configuration and Secrets](#configuration-and-secrets)
- [Pipeline Execution](#pipeline-execution)
- [Incremental Loading](#incremental-loading)
- [REST API Source](#rest-api-source)
- [Performance Issues](#performance-issues)
- [Destination-Specific Issues](#destination-specific-issues)
- [Data Quality Issues](#data-quality-issues)
- [Debugging Techniques](#debugging-techniques)
- [Getting Help](#getting-help)
- [Error Message Reference](#error-message-reference)
## Common Issues and Solutions
### Installation and Setup
#### ModuleNotFoundError: No module named 'dlt'
```bash
# Install dlt
pip install dlt
# Install with specific destination support (quote extras so shells
# like zsh don't expand the brackets)
pip install "dlt[duckdb]"
pip install "dlt[bigquery]"
pip install "dlt[snowflake]"
```
#### dlt init fails
```bash
# Ensure you're in a Python environment
python --version # Should show Python 3.8+
# Update dlt to latest version
pip install --upgrade dlt
# Check available sources
dlt init --list-sources
```
### Configuration and Secrets
#### Credentials not found
**Problem**: `KeyError` or `ConfigFieldMissingException`
**Solution**:
1. Check `.dlt/secrets.toml` exists and has correct structure
2. Verify secret names match function parameters exactly
3. Ensure secrets are under correct section
```toml
# Correct structure
[sources.my_source]
api_key = "your-key"
# NOT
[my_source] # Missing 'sources.' prefix
api_key = "your-key"
```
#### Secrets in wrong location
**Problem**: Secrets not being loaded
**Solution**:
- Secrets belong in `.dlt/secrets.toml` (never commit!)
- Config belongs in `.dlt/config.toml`
- Check file is in project root or `.dlt/` directory
### Pipeline Execution
#### Pipeline runs but loads no data
**Checklist**:
1. Resource function is actually yielding data: Add `print()` statements
2. Data selector is correct (for REST API source)
3. API authentication is working: Check API response
4. Resource is included in pipeline run
```python
# Debug: Print what's being yielded
@dlt.resource
def my_resource():
data = fetch_data()
print(f"Fetched {len(data)} items") # Debug
yield data
# Ensure resource is actually run
pipeline.run(source()) # Not just source
```
#### Schema inference issues
**Problem**: Incorrect data types or table structure
**Solution**:
```python
# Explicitly set hints
@dlt.resource(
columns={"id": {"data_type": "bigint"}},
primary_key="id"
)
def my_resource():
yield data
# Or set several column hints at once
@dlt.resource(
    columns={
        "created_at": {"data_type": "timestamp"},
        "amount": {"data_type": "decimal"}
    },
    primary_key="id"
)
def typed_resource():
    yield data
```
#### Data type conversion errors
**Problem**: `ValueError` or destination-specific type errors
**Solution**:
- Check source data types match destination capabilities
- Convert data before yielding
- Use dlt data type hints
```python
from datetime import datetime
@dlt.resource
def my_resource():
data = fetch_data()
for item in data:
# Convert string to datetime
if isinstance(item['date'], str):
item['date'] = datetime.fromisoformat(item['date'])
yield item
```
### Incremental Loading
#### Incremental cursor not advancing
**Problem**: Pipeline reloads same data every run
**Checklist**:
1. Cursor field exists in data
2. Cursor field is monotonically increasing
3. Write disposition is correct (usually `merge`)
4. Primary key is set for merge operations
```python
# Correct incremental setup
@dlt.resource(
write_disposition='merge', # Required for incremental
primary_key='id' # Required for merge
)
def my_resource(
updated_at=dlt.sources.incremental('updated_at', initial_value='2024-01-01T00:00:00Z')
):
# Use cursor in API call
data = fetch_data(since=updated_at.last_value)
yield data
```
#### Incremental state reset needed
**Problem**: Need to reload all data
**Solution**:
```python
# Reset source state and destination data in code, then reload
pipeline.run(source(), refresh='drop_sources')
```
You can also drop selected tables and their state from the CLI with `dlt pipeline <pipeline_name> drop`.
### REST API Source
#### Pipeline context conflicts with dlt.attach()
**Problem:** Using `dlt.attach()` inside a resource that runs within another pipeline causes `ContainerInjectableContextMangled` (or similar pipeline context) errors. The same can happen when opening a dlt-managed or pipeline-associated connection from inside a resource that is part of a different pipeline.
**Solution:** Read data **before** creating or running the pipeline, using direct database connections (no dlt context):
```python
import duckdb
def get_data_from_other_pipeline():
"""Read outside of dlt context to avoid conflicts."""
conn = duckdb.connect("other_pipeline.duckdb", read_only=True)
result = conn.execute("SELECT * FROM my_table").fetchall()
conn.close()
return result
# Then use the data in your pipeline
data = get_data_from_other_pipeline()
pipeline = dlt.pipeline(...)
pipeline.run(my_source(data))
```
Use this pattern when a REST API source is seeded from another pipeline’s database: load the seed data in a plain function with a direct DB connection, then pass that data into your seed resource so no `dlt.attach()` or pipeline context is used during extraction.
#### Authentication failing
**Checklist**:
1. Correct auth type for the API
2. Credentials in `.dlt/secrets.toml`
3. Token/key is valid and not expired
4. Auth is in correct location (header vs query)
```python
# Test credentials separately
import requests
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(url, headers=headers)
print(response.status_code) # Should be 200
```
#### Pagination not working
**Problem**: Only getting first page of results
**Solutions**:
```python
# 1. Let dlt auto-detect (usually works)
"endpoint": "users"
# 2. Explicitly configure paginator
"endpoint": {
"path": "users",
"paginator": {
"type": "page_number",
"page_param": "page",
"page_size": 100
}
}
# 3. Check API response structure
# Add data_selector if data is nested
"endpoint": {
"path": "users",
"data_selector": "data.results" # Adjust to your API
}
```
#### Parent-child resources not linking
**Problem**: Child resources not loading data
**Solution**:
```python
# Ensure parent has required fields
{
"name": "parent",
"endpoint": "parents",
"primary_key": "id" # Make sure this exists in data
},
{
"name": "child",
"endpoint": "parents/{id}/children", # Use field from parent
"include_from_parent": ["id"] # Must match placeholder
}
```
### Performance Issues
#### Pipeline is slow
**Diagnosis**:
```bash
# Run with progress logging
PROGRESS=log python pipeline.py
```
**Solutions**:
1. **Increase parallelism**:
```toml
[extract]
workers = 10
[load]
workers = 50
```
2. **Enable file rotation** for large datasets:
```toml
[data_writer]
file_max_items = 100000
```
3. **Yield pages, not rows**:
```python
# Good
yield response.json() # Entire page
# Bad
for item in response.json():
yield item # Individual items
```
#### Memory issues
**Problem**: `MemoryError` or system running out of RAM
**Solutions**:
```toml
# Reduce buffer size
[data_writer]
buffer_max_items = 1000
# Use external storage
[runtime]
data_dir = "/path/to/external/storage"
```
```python
# In code
import os
os.environ["DLT_DATA_DIR"] = "/mnt/external/storage"
```
### Destination-Specific Issues
#### BigQuery: 403 Forbidden
- Check credentials file path is correct
- Verify service account has necessary permissions
- Ensure project ID is correct
#### DuckDB: Database locked
- Close other connections to the database
- Open concurrent readers with `read_only=True` so they don't take the write lock
- Ensure no other processes are accessing the file
#### Snowflake: Connection timeout
- Check network connectivity
- Verify account identifier is correct
- Ensure credentials are valid
### Data Quality Issues
#### Duplicate records
**Causes**:
- Missing primary key
- Wrong write disposition
- Incremental cursor issues
**Solutions**:
```python
# Ensure primary key is set
@dlt.resource(
write_disposition='merge',
primary_key='id' # Or composite: ['id', 'timestamp']
)
# Or deduplicate manually
@dlt.resource
def deduped_resource():
data = fetch_data()
seen = set()
for item in data:
if item['id'] not in seen:
seen.add(item['id'])
yield item
```
#### Missing fields
**Problem**: Expected fields not in destination
**Diagnosis**:
- Check source data actually contains the fields
- Verify data selector for REST API sources
- Check schema inference
**Solution**:
```python
# Validate data before yielding
@dlt.resource
def validated_resource():
data = fetch_data()
for item in data:
# Ensure required fields exist
assert 'id' in item, f"Missing id in {item}"
assert 'name' in item, f"Missing name in {item}"
yield item
```
### Debugging Techniques
#### Enable detailed logging
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Run pipeline
pipeline.run(source())
```
#### Inspect pipeline state
```python
# Check pipeline info
print(pipeline.state)
# List tables
print(pipeline.default_schema.tables)
# Check last load info
load_info = pipeline.run(source())
print(load_info)
```
#### Use pipeline command
```bash
# Show pipeline info and open dashboard
dlt pipeline <pipeline_name> show
# View pipeline state
dlt pipeline <pipeline_name> info
# Trace last run
dlt pipeline <pipeline_name> trace
```
#### Test resources independently
```python
# Extract without loading
pipeline.extract(source())
# Normalize without loading
pipeline.normalize()
# Check extracted files
import os
print(os.listdir(pipeline.working_dir))
```
### Getting Help
When stuck:
1. **Check documentation**: https://dlthub.com/docs
2. **Search GitHub issues**: https://github.com/dlt-hub/dlt/issues
3. **Join Slack community**: https://dlthub.com/community
4. **Enable debug logging** and check error messages
5. **Create a minimal reproducible example**
6. **Check dlt version**: `pip show dlt`
### Error Message Reference
| Error | Common Cause | Solution |
|-------|--------------|----------|
| `KeyError` in secrets | Missing or misnamed secret | Check `.dlt/secrets.toml` structure |
| `TableSchemaUpdateError` | Schema change conflict | Review schema evolution settings |
| `PipelineStepFailed` | Error in resource function | Check resource code and logs |
| `DestinationConnectionError` | Invalid destination config | Verify destination credentials |
| `ResourceExtractionError` | Error during data extraction | Add error handling in resource |
| `InvalidStepFunctionArguments` | Wrong function signature | Check decorator parameters |
```
### references/examples.md
```markdown
# Workflow Examples
Full step-by-step walkthroughs. For minimal code patterns see the Pipeline Patterns section in [SKILL.md](../SKILL.md).
## Table of Contents
- [Creating a Pokemon API Pipeline](#creating-a-pokemon-api-pipeline)
- [Setting Up a GitHub Verified Source](#setting-up-a-github-verified-source)
- [Custom Python Pipeline with Database Source](#custom-python-pipeline-with-database-source)
- [Declarative REST API with Authentication](#declarative-rest-api-with-authentication)
## Creating a Pokemon API Pipeline
**User request**: "Create a pipeline ingesting data from https://pokeapi.co/api/v2/pokemon/ and https://pokeapi.co/api/v2/pokemon/{pokemon_name}"
**Step-by-step:**
1. **Analyze**: REST API with standard patterns → Use declarative approach
2. **Destination**: Ask user (assume DuckDB for this example)
3. **Create pipeline** using declarative REST template
4. **Configure**:
```python
config = {
"client": {"base_url": "https://pokeapi.co/api/v2/"},
"resources": [
{
"name": "pokemon_list",
"endpoint": "pokemon",
"write_disposition": "replace"
},
        {
            "name": "pokemon_details",
            "endpoint": {
                "path": "pokemon/{name}",
                "params": {
                    # Resolve {name} from each row of the parent resource
                    "name": {
                        "type": "resolve",
                        "resource": "pokemon_list",
                        "field": "name",
                    },
                },
            },
            "write_disposition": "merge",
            "primary_key": "id",
            "include_from_parent": ["name"]
        }
]
}
```
5. **No secrets needed** (public API)
6. **Create pipeline code** with config
7. **Test**: Run pipeline
8. **Inspect**: Open dashboard to verify data
---
## Setting Up a GitHub Verified Source
**User request**: "Load my GitHub repository issues and pull requests into BigQuery"
**Step-by-step:**
1. **Analyze**: GitHub is a verified source → Use verified source approach
2. **Destination**: BigQuery (requires credentials)
3. **Install the verified source**:
```bash
dlt init github bigquery
```
4. **Configure secrets** in `.dlt/secrets.toml`:
```toml
[sources.github]
access_token = "ghp_your_github_token"
[destination.bigquery]
location = "US"
[destination.bigquery.credentials]
project_id = "your-project-id"
private_key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----"
client_email = "[email protected]"
```
5. **Configure** in `.dlt/config.toml`:
```toml
[sources.github]
owner = "your-org"
name = "your-repo"
```
6. **Create pipeline code**:
```python
import dlt
from github import github_reactions
pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="bigquery",
dataset_name="github_data"
)
# Load issues and pull requests with reactions
load_info = pipeline.run(github_reactions("your-org", "your-repo"))
print(load_info)
```
7. **Test**: Run pipeline and check for errors
8. **Inspect**: Open dashboard to verify tables created
---
## Custom Python Pipeline with Database Source
**User request**: "Extract data from our PostgreSQL database and load it into Snowflake with incremental updates"
**Step-by-step:**
1. **Analyze**: Database extraction with custom logic → Use custom Python approach
2. **Destination**: Snowflake (requires credentials)
3. **Install dependencies**:
```bash
pip install "dlt[snowflake]" psycopg2-binary
```
4. **Configure secrets** in `.dlt/secrets.toml`:
```toml
[sources.postgres]
connection_string = "postgresql://user:password@host:5432/database"
[destination.snowflake.credentials]
database = "YOUR_DATABASE"
warehouse = "YOUR_WAREHOUSE"
role = "YOUR_ROLE"
username = "your_username"
password = "your_password"
# host is the Snowflake account identifier, e.g. "myorg-account123"
host = "your_account_identifier"
```
5. **Create the pipeline**:
```python
import dlt
import psycopg2
@dlt.source
def postgres_source(connection_string: str = dlt.secrets.value):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def customers():
        """Load customers incrementally based on updated_at."""
        conn = psycopg2.connect(connection_string)
        cursor = conn.cursor()
        last_updated = dlt.current.resource_state().get("last_updated", "1970-01-01")
        cursor.execute("""
            SELECT id, name, email, created_at, updated_at
            FROM customers
            WHERE updated_at > %s
            ORDER BY updated_at
        """, (last_updated,))
        for row in cursor:
            yield {
                "id": row[0],
                "name": row[1],
                "email": row[2],
                "created_at": row[3],
                "updated_at": row[4],
            }
            # Track the latest cursor value seen; dlt persists this state
            # so the next run only fetches newer rows
            dlt.current.resource_state()["last_updated"] = str(row[4])
        cursor.close()
        conn.close()
@dlt.resource(write_disposition="replace")
def products():
"""Full refresh of products table."""
conn = psycopg2.connect(connection_string)
cursor = conn.cursor()
cursor.execute("SELECT id, name, price, category FROM products")
for row in cursor:
yield {
"id": row[0],
"name": row[1],
"price": row[2],
"category": row[3]
}
cursor.close()
conn.close()
return [customers, products]
if __name__ == "__main__":
pipeline = dlt.pipeline(
pipeline_name="postgres_to_snowflake",
destination="snowflake",
dataset_name="postgres_replica"
)
load_info = pipeline.run(postgres_source())
print(load_info)
```
6. **Test**: Run pipeline and verify incremental loading works
7. **Inspect**: Check Snowflake tables and dlt dashboard
---
## Declarative REST API with Authentication
**User request**: "Load data from a private API that requires OAuth2 authentication"
**Step-by-step:**
1. **Analyze**: REST API with OAuth2 → Use declarative approach with auth config
2. **Destination**: DuckDB (local testing)
3. **Configure secrets** in `.dlt/secrets.toml`:
```toml
[sources.rest_api]
api_key = "your_api_key"
client_id = "your_client_id"
client_secret = "your_client_secret"
```
4. **Create pipeline with OAuth2**:
```python
import dlt
from dlt.sources.rest_api import rest_api_source
config = {
"client": {
"base_url": "https://api.example.com/v1/",
"auth": {
"type": "oauth2_client_credentials",
"access_token_url": "https://api.example.com/oauth/token",
"client_id": dlt.secrets["sources.rest_api.client_id"],
"client_secret": dlt.secrets["sources.rest_api.client_secret"],
}
},
"resources": [
{
"name": "users",
"endpoint": "users",
"write_disposition": "merge",
"primary_key": "id",
"incremental": {
"cursor_path": "updated_at",
"initial_value": "2024-01-01T00:00:00Z"
}
},
{
"name": "orders",
"endpoint": "orders",
"write_disposition": "append",
"params": {
"status": "completed"
}
}
]
}
if __name__ == "__main__":
pipeline = dlt.pipeline(
pipeline_name="private_api",
destination="duckdb",
dataset_name="api_data"
)
source = rest_api_source(config)
load_info = pipeline.run(source)
print(load_info)
```
5. **Test**: Run and verify authentication works
6. **Inspect**: Open dashboard to check loaded data
```
### references/core-concepts.md
```markdown
# dlt Core Concepts
## Table of Contents
- [What is dlt?](#what-is-dlt)
- [Core Components](#core-components)
- [Key Capabilities](#key-capabilities)
- [Data Normalization and Nested Tables](#data-normalization-and-nested-tables)
- [Basic Workflow](#basic-workflow)
- [Configuration](#configuration)
- [Write Dispositions](#write-dispositions)
- [Data Inspection](#data-inspection)
## What is dlt?
dlt is an open-source Python library that loads data from various data sources into well-structured datasets. It emphasizes being easy to use, flexible, and scalable while providing lightweight interfaces for data extraction, loading, inspection, and transformation.
## Core Components
### Sources
Sources extract data from multiple origins including REST APIs, SQL databases, cloud storage systems, and Python data structures. A source is a function decorated with `@dlt.source` that returns one or more resources.
**Key principles:**
- Sources should NOT extract data directly - leave that to resources
- Sources should yield or return resources
- Avoid heavy operations in source functions
### Resources
Resources are the actual data extraction units. They can be:
- Functions decorated with `@dlt.resource`
- Created dynamically using `dlt.resource()` as a function call
- Yielded from source functions
### Destinations
dlt supports various destinations including:
- **Data Warehouses**: BigQuery, Snowflake, Redshift, Databricks, Azure Synapse
- **Databases**: DuckDB, ClickHouse, Postgres, SQL Server
- **Data Lakes**: Delta, Iceberg, Athena/Glue
- **Vector Databases**: Weaviate, LanceDB, Qdrant
- **Storage**: S3, GCS, Azure Blob, filesystem
- **Custom**: Create custom destinations with `@dlt.destination` decorator
### Pipelines
A pipeline orchestrates the data movement process, connecting sources to destinations:
```python
pipeline = dlt.pipeline(
pipeline_name='my_pipeline',
destination='duckdb',
dataset_name='my_data'
)
load_info = pipeline.run(my_source())
```
## Key Capabilities
### Schema Management
- Automatic schema inference
- Data type detection
- Data normalization
- Nested structure handling
- Schema evolution support
### Data Normalization and Nested Tables
dlt **normalizes** nested data into separate tables:
- **Arrays** become child tables with a `__` separator: e.g. `countries__capitals`.
- Child tables have **`_dlt_parent_id`** referencing the parent row’s **`_dlt_id`**.
- Array elements have **`_dlt_list_idx`** for ordering.
**Example:** API returns `{"name": "France", "capitals": ["Paris"]}`
- **Main table:** `countries` with columns `name`, `_dlt_id`.
- **Child table:** `countries__capitals` with `value`, `_dlt_parent_id`, `_dlt_list_idx`.
To query normalized data, join on the parent key:
```sql
SELECT c.name, cap.value AS capital
FROM countries c
JOIN countries__capitals cap ON c._dlt_id = cap._dlt_parent_id
```
### Pipeline Automation
- Incremental loading
- Schema evolution
- Schema and data contracts
- Reduced maintenance overhead
### Data Access
- Python and SQL queries
- Transformations
- Pipeline inspection
- Visualization support
## Basic Workflow
1. Define data sources (verified, custom, or declarative)
2. Configure extraction parameters
3. Create pipeline instance specifying destination
4. Execute pipeline to load transformed data
5. Inspect and query loaded data
## Configuration
### Directory Structure
When you run `dlt init <source> <destination>`, it creates:
- Pipeline Python file
- `.dlt/` directory for configuration
- `requirements.txt` for dependencies
### Config and Secrets
- `.dlt/config.toml` - Non-sensitive configuration
- `.dlt/secrets.toml` - Sensitive credentials (never commit!)
Example secrets structure:
```toml
[sources]
api_secret_key = '<your-api-key>'
[destination.bigquery]
credentials = '<path-to-service-account.json>'
```
**Note**: DuckDB doesn't require credentials - just specify the file path in `.dlt/config.toml` or directly in the pipeline destination parameter.
Secret names correspond to argument names in source functions, enabling automatic credential injection.
## Write Dispositions
Control how data is written to destinations:
1. **replace** - Full load, overwrites existing data
2. **append** - Adds new data without modifying existing records
3. **merge** - Updates existing records using merge/primary keys (upserts)
## Data Inspection
After loading data:
```bash
dlt pipeline <pipeline_name> show
```
This opens the pipeline visualization/dashboard for debugging and inspection.
```
### assets/templates/custom_python_pipeline.py
```python
"""
Custom Python dlt Pipeline Template
This template provides a skeleton for creating a custom dlt pipeline using Python code.
Use this when you need custom logic or are using Python packages to access data.
"""
import dlt
from typing import Iterator, Any
@dlt.source
def custom_source(
# Add configuration parameters here
# Use dlt.secrets.value for sensitive values from .dlt/secrets.toml
# Use dlt.config.value for non-sensitive values from .dlt/config.toml
api_key: str = dlt.secrets.value,
base_url: str = dlt.config.value,
):
"""
Define your custom source.
Args:
api_key: API authentication key
base_url: Base URL for the API
Returns:
One or more dlt resources
"""
# Return one or more resources
return my_resource()
@dlt.resource(
write_disposition="append", # Options: "append", "replace", "merge"
# primary_key="id", # Uncomment for merge operations
# table_name="custom_table" # Optional: override table name
)
def my_resource() -> Iterator[dict[str, Any]]:
"""
Define your resource that extracts data.
Yields:
Data items as dictionaries
"""
# TODO: Implement data extraction logic
# Example: Fetch data from API, database, file, etc.
# Yield data (pages recommended over individual items for performance)
data = [] # Replace with actual data fetching
yield data
# Example: Resource with incremental loading
@dlt.resource(
write_disposition="merge",
primary_key="id"
)
def incremental_resource(
updated_at=dlt.sources.incremental(
"updated_at",
initial_value="2024-01-01T00:00:00Z"
)
) -> Iterator[dict[str, Any]]:
"""
Resource with incremental loading based on a cursor field.
Args:
updated_at: Incremental cursor tracking the updated_at field
Yields:
Data items updated since the last run
"""
# TODO: Fetch data modified since updated_at.last_value
# Example: data = fetch_data(since=updated_at.last_value)
data = [] # Replace with actual data fetching
yield data
if __name__ == "__main__":
# Configure the pipeline
pipeline = dlt.pipeline(
pipeline_name="custom_pipeline", # Unique pipeline name
destination="duckdb", # Options: duckdb, bigquery, snowflake, etc.
dataset_name="custom_data" # Dataset/schema name in destination
)
# Run the pipeline
load_info = pipeline.run(custom_source())
# Print load information
print(load_info)
# Optional: Show pipeline in dashboard
# pipeline.show()
```
### assets/templates/verified_source_pipeline.py
```python
"""
Verified Source dlt Pipeline Template
This template provides a skeleton for using dlt verified sources.
Verified sources are pre-built, tested connectors for popular platforms.
Setup:
1. Run: dlt init <source_name> <destination_name>
Example: dlt init salesforce bigquery
2. Configure credentials in .dlt/secrets.toml
3. Modify this file to customize resource selection and behavior
"""
import dlt
# TODO: Import the verified source
# This import will be available after running: dlt init <source_name> <destination>
# Example imports:
# from salesforce import salesforce_source
# from github import github_source
# from stripe import stripe_source
def load_all_resources() -> None:
"""
Load all resources from the verified source.
"""
# TODO: Replace with your verified source name
source = verified_source()
pipeline = dlt.pipeline(
pipeline_name="verified_pipeline", # Unique pipeline name
destination="duckdb", # Options: duckdb, bigquery, snowflake, etc.
dataset_name="verified_data" # Dataset/schema name in destination
)
# Load all resources from the source
load_info = pipeline.run(source)
print(load_info)
def load_selected_resources() -> None:
"""
Load only specific resources from the verified source.
"""
# TODO: Replace with your verified source name
source = verified_source()
pipeline = dlt.pipeline(
pipeline_name="verified_pipeline",
destination="duckdb",
dataset_name="verified_data"
)
# Load only specific resources
# TODO: Replace with actual resource names from your verified source
load_info = pipeline.run(
source.with_resources("resource1", "resource2", "resource3")
)
print(load_info)
def load_with_customization() -> None:
"""
Load resources with customized behavior (incremental, write disposition, etc.).
"""
# TODO: Replace with your verified source name
source = verified_source()
# Customize specific resources
# Example: Configure incremental loading
source.resources["resource_name"].apply_hints(
write_disposition="merge",
primary_key="id",
incremental=dlt.sources.incremental("updated_at")
)
# Example: Change write disposition for a resource
source.resources["another_resource"].apply_hints(
write_disposition="replace"
)
pipeline = dlt.pipeline(
pipeline_name="verified_pipeline",
destination="duckdb",
dataset_name="verified_data"
)
load_info = pipeline.run(source)
print(load_info)
if __name__ == "__main__":
# Choose which loading strategy to use:
# Option 1: Load all resources
load_all_resources()
# Option 2: Load only specific resources
# load_selected_resources()
# Option 3: Load with customization
# load_with_customization()
```
### assets/templates/declarative_rest_pipeline.py
```python
"""
Declarative REST API dlt Pipeline Template
This template provides a skeleton for creating a pipeline using the declarative REST API source.
Use this for REST APIs with standard patterns (authentication, pagination, etc.).
"""
import dlt
from dlt.sources.rest_api import rest_api_source
# Define the REST API configuration
config = {
"client": {
"base_url": "https://api.example.com/v1/",
# Optional: Authentication
# "auth": {
# "type": "bearer",
# "token": dlt.secrets["api_token"]
# },
# Optional: Custom headers
# "headers": {
# "User-Agent": "MyApp/1.0"
# },
# Optional: Default paginator for all resources
# "paginator": {
# "type": "page_number",
# "page_param": "page",
# "page_size": 100
# }
},
# Optional: Default settings for all resources
"resource_defaults": {
"write_disposition": "append",
"primary_key": "id",
},
# Define API resources/endpoints
"resources": [
# Simple endpoint (string notation)
# "users",
# Detailed endpoint (dictionary notation)
{
"name": "resource_name",
"endpoint": {
"path": "endpoint_path",
# Optional: Query parameters
# "params": {
# "status": "active",
# "limit": 100
# },
# Optional: Data selector (JSONPath)
# "data_selector": "data.results",
# Optional: Pagination override
# "paginator": {
# "type": "offset",
# "limit": 100,
# "offset_param": "offset",
# "limit_param": "limit"
# }
},
"write_disposition": "merge",
"primary_key": "id",
# Optional: Incremental loading
# "incremental": {
# "cursor_path": "updated_at",
# "initial_value": "2024-01-01T00:00:00Z"
# }
},
# Example: Parent-child relationship
# {
# "name": "child_resource",
# "endpoint": {
# "path": "parents/{parent_id}/children"
# },
# "include_from_parent": ["id", "name"]
# }
]
}
def run_pipeline() -> None:
"""
Execute the REST API pipeline.
"""
# Create the REST API source from config
source = rest_api_source(config)
# Configure the pipeline
pipeline = dlt.pipeline(
pipeline_name="rest_api_pipeline", # Unique pipeline name
destination="duckdb", # Options: duckdb, bigquery, snowflake, etc.
dataset_name="rest_api_data" # Dataset/schema name in destination
)
# Run the pipeline
load_info = pipeline.run(source)
# Print load information
print(load_info)
if __name__ == "__main__":
run_pipeline()
# Authentication Examples
# ----------------------
# Bearer Token:
# "auth": {
# "type": "bearer",
# "token": dlt.secrets["api_token"]
# }
# Basic Auth:
# "auth": {
# "type": "http_basic",
# "username": dlt.secrets["username"],
# "password": dlt.secrets["password"]
# }
# API Key in Header:
# "auth": {
# "type": "api_key",
# "name": "X-API-Key",
# "api_key": dlt.secrets["api_key"],
# "location": "header"
# }
# API Key in Query:
# "auth": {
# "type": "api_key",
# "name": "api_key",
# "api_key": dlt.secrets["api_key"],
# "location": "query"
# }
# Pagination Examples
# -------------------
# JSON Link (next URL in response):
# "paginator": {
# "type": "json_link",
# "next_url_path": "pagination.next"
# }
# Header Link (GitHub-style):
# "paginator": {
# "type": "header_link"
# }
# Offset:
# "paginator": {
# "type": "offset",
# "limit": 100,
# "offset_param": "offset",
# "limit_param": "limit"
# }
# Page Number:
# "paginator": {
# "type": "page_number",
# "page_param": "page",
# "page_size": 100
# }
# Cursor:
# "paginator": {
# "type": "cursor",
# "cursor_path": "next_cursor",
# "cursor_param": "cursor"
# }
# Incremental Loading Example
# ---------------------------
# Add to endpoint configuration:
# "endpoint": {
# "path": "items",
# "params": {
# "updated_since": "{incremental.start_value}"
# }
# },
# "incremental": {
# "cursor_path": "updated_at",
# "initial_value": "2024-01-01T00:00:00Z"
# },
# "write_disposition": "merge",
# "primary_key": "id"
```
### scripts/install_packages.py
```python
#!/usr/bin/env python3
"""
Install dlt packages with automatic dependency manager detection.
This script detects the current dependency manager (uv, pip, poetry, pipenv)
and installs the required dlt packages based on the destination and features.
"""
import subprocess
import sys
from pathlib import Path
from typing import Optional
def detect_dependency_manager() -> Optional[str]:
"""
Detect the dependency manager used in the current project.
Returns:
The dependency manager name ('uv', 'poetry', 'pipenv', 'pip') or None
"""
# Check for uv
if Path("uv.lock").exists() or Path("pyproject.toml").exists():
try:
subprocess.run(["uv", "--version"], capture_output=True, check=True)
return "uv"
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for poetry
if Path("poetry.lock").exists() or (Path("pyproject.toml").exists() and "tool.poetry" in Path("pyproject.toml").read_text()):
try:
subprocess.run(["poetry", "--version"], capture_output=True, check=True)
return "poetry"
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for pipenv
if Path("Pipfile").exists():
try:
subprocess.run(["pipenv", "--version"], capture_output=True, check=True)
return "pipenv"
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Check for pip (always available in Python environments)
try:
subprocess.run([sys.executable, "-m", "pip", "--version"], capture_output=True, check=True)
return "pip"
except subprocess.CalledProcessError:
pass
return None
def ask_user_for_manager() -> str:
"""
Ask user which dependency manager to use.
Returns:
The chosen dependency manager name
"""
print("\nNo dependency manager detected. Which would you like to use?")
print("1. uv (recommended - fast, modern)")
print("2. pip (standard)")
print("3. poetry")
print("4. pipenv")
while True:
choice = input("\nEnter choice (1-4): ").strip()
if choice == "1":
return "uv"
elif choice == "2":
return "pip"
elif choice == "3":
return "poetry"
elif choice == "4":
return "pipenv"
else:
print("Invalid choice. Please enter 1-4.")
def ensure_uv_project_initialized() -> None:
"""
Ensure the project is initialized for uv by checking for pyproject.toml.
If not present, runs 'uv init' to create it.
"""
if not Path("pyproject.toml").exists():
print("No pyproject.toml found. Initializing project with 'uv init'...")
try:
subprocess.run(["uv", "init"], check=True)
print("✅ Project initialized with uv")
except subprocess.CalledProcessError as e:
print(f"❌ Failed to initialize project: {e}", file=sys.stderr)
sys.exit(1)
def install_packages(manager: str, packages: list[str]) -> None:
"""
Install packages using the specified dependency manager.
Args:
manager: The dependency manager to use
packages: List of package specifications to install
"""
print(f"\nInstalling packages using {manager}:")
print(f" {' '.join(packages)}")
if manager == "uv":
ensure_uv_project_initialized()
cmd = ["uv", "add"] + packages
elif manager == "pip":
cmd = [sys.executable, "-m", "pip", "install"] + packages
elif manager == "poetry":
cmd = ["poetry", "add"] + packages
elif manager == "pipenv":
cmd = ["pipenv", "install"] + packages
else:
raise ValueError(f"Unknown dependency manager: {manager}")
try:
subprocess.run(cmd, check=True)
print("✅ Packages installed successfully!")
except subprocess.CalledProcessError as e:
print(f"❌ Failed to install packages: {e}", file=sys.stderr)
sys.exit(1)
def get_required_packages(destination: Optional[str] = None, include_workspace: bool = True) -> list[str]:
"""
Get the list of packages to install based on destination and features.
Args:
destination: The dlt destination (e.g., 'bigquery', 'snowflake', 'duckdb')
include_workspace: Whether to include dlt[workspace] for dashboard support
Returns:
List of package specifications
"""
# Build combined extras list for a single dlt package
extras = []
# Add destination extra (duckdb is included by default, so skip it)
if destination and destination != "duckdb":
extras.append(destination)
# Add workspace support for dashboard/pipeline show command
if include_workspace:
extras.append("workspace")
# Return single package with combined extras, or base dlt if no extras
if extras:
return [f"dlt[{','.join(extras)}]"]
else:
return ["dlt"]
def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Install dlt packages with automatic dependency manager detection"
)
parser.add_argument(
"--destination",
help="The dlt destination (e.g., bigquery, snowflake, duckdb, postgres)",
default=None
)
parser.add_argument(
"--no-workspace",
help="Don't install workspace support (dashboard/show command)",
action="store_true"
)
parser.add_argument(
"--manager",
help="Force a specific dependency manager (uv, pip, poetry, pipenv)",
choices=["uv", "pip", "poetry", "pipenv"],
default=None
)
args = parser.parse_args()
# Detect or ask for dependency manager
if args.manager:
manager = args.manager
print(f"Using specified dependency manager: {manager}")
else:
manager = detect_dependency_manager()
if manager:
print(f"Detected dependency manager: {manager}")
else:
manager = ask_user_for_manager()
# Get required packages
packages = get_required_packages(
destination=args.destination,
include_workspace=not args.no_workspace
)
# Install packages
install_packages(manager, packages)
if __name__ == "__main__":
main()
```
### scripts/open_dashboard.py
```python
#!/usr/bin/env python3
"""
Open dlt Pipeline Dashboard
This script opens the dlt pipeline dashboard for inspection and debugging.
It shows pipeline runs, schemas, data, and allows querying loaded data.
Usage:
python open_dashboard.py <pipeline_name>
python open_dashboard.py # Will prompt for pipeline name
"""
import sys
import subprocess
from pathlib import Path
def find_pipelines() -> list[str]:
"""
Find available pipeline names in the .dlt/pipelines directory.
Returns:
List of pipeline names
"""
pipelines_dir = Path(".dlt/pipelines")
if not pipelines_dir.exists():
return []
return [p.name for p in pipelines_dir.iterdir() if p.is_dir()]
def open_dashboard(pipeline_name: str) -> None:
"""
Open the dlt dashboard for the specified pipeline.
Args:
pipeline_name: Name of the pipeline to inspect
"""
try:
print(f"Opening dashboard for pipeline: {pipeline_name}")
subprocess.run(
["dlt", "pipeline", pipeline_name, "show"],
check=True
)
except subprocess.CalledProcessError as e:
print(f"Error opening dashboard: {e}", file=sys.stderr)
print("\nTroubleshooting:")
print("1. Ensure the pipeline has been run at least once")
print("2. Check that the pipeline name is correct")
print("3. Verify dlt is installed: pip install dlt")
sys.exit(1)
except FileNotFoundError:
print("Error: dlt command not found", file=sys.stderr)
print("Install dlt: pip install dlt")
sys.exit(1)
def main() -> None:
"""Main entry point."""
# Get pipeline name from command line or prompt
if len(sys.argv) > 1:
pipeline_name = sys.argv[1]
else:
# Try to find available pipelines
pipelines = find_pipelines()
if not pipelines:
print("No pipelines found in .dlt/pipelines/")
print("\nUsage: python open_dashboard.py <pipeline_name>")
sys.exit(1)
if len(pipelines) == 1:
pipeline_name = pipelines[0]
print(f"Found pipeline: {pipeline_name}")
else:
print("Available pipelines:")
for i, name in enumerate(pipelines, 1):
print(f" {i}. {name}")
try:
choice = int(input("\nSelect pipeline number: "))
pipeline_name = pipelines[choice - 1]
except (ValueError, IndexError, KeyboardInterrupt):
print("\nInvalid selection")
sys.exit(1)
# Open the dashboard
open_dashboard(pipeline_name)
if __name__ == "__main__":
main()
```