tracing-upstream-lineage
$
npx mdskill add astronomer/agents/tracing-upstream-lineageTrace data origins by linking tables to their source DAGs.
- Resolves questions about where specific tables or columns originate.
- Integrates with Airflow DAGs, tasks, and Astro UI lineage tools.
- Searches DAG definitions and task logs to map data flows.
- Outputs clear lineage paths connecting datasets to their sources.
SKILL.md
.github/skills/tracing-upstream-lineageView on GitHub ↗
---
name: tracing-upstream-lineage
description: Trace upstream data lineage. Use when the user asks where data comes from, what feeds a table, upstream dependencies, data sources, or needs to understand data origins.
---
# Upstream Lineage: Sources
Trace the origins of data - answer "Where does this data come from?"
## Lineage Investigation
### Step 1: Identify the Target Type
Determine what we're tracing:
- **Table**: Trace what populates this table
- **Column**: Trace where this specific column comes from
- **DAG**: Trace what data sources this DAG reads from
### Step 2: Find the Producing DAG
Tables are typically populated by Airflow DAGs. Find the connection:
1. **Search DAGs by name**: Use `af dags list` and look for DAG names matching the table name
- `load_customers` -> `customers` table
- `etl_daily_orders` -> `orders` table
2. **Explore DAG source code**: Use `af dags source <dag_id>` to read the DAG definition
- Look for INSERT, MERGE, CREATE TABLE statements
- Find the target table in the code
3. **Check DAG tasks**: Use `af tasks list <dag_id>` to see what operations the DAG performs
### On Astro
If you're running on Astro, the **Lineage tab** in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.
### On OSS Airflow
Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).
### Step 3: Trace Data Sources
From the DAG code, identify source tables and systems:
**SQL Sources** (look for FROM clauses):
```python
# In DAG code:
SELECT * FROM source_schema.source_table # <- This is an upstream source
```
**External Sources** (look for connection references):
- `S3Operator` -> S3 bucket source
- `PostgresOperator` -> Postgres database source
- `SalesforceOperator` -> Salesforce API source
- `HttpOperator` -> REST API source
**File Sources**:
- CSV/Parquet files in object storage
- SFTP drops
- Local file paths
### Step 4: Build the Lineage Chain
Recursively trace each source:
```
TARGET: analytics.orders_daily
^
+-- DAG: etl_daily_orders
^
+-- SOURCE: raw.orders (table)
| ^
| +-- DAG: ingest_orders
| ^
| +-- SOURCE: Salesforce API (external)
|
+-- SOURCE: dim.customers (table)
^
+-- DAG: load_customers
^
+-- SOURCE: PostgreSQL (external DB)
```
### Step 5: Check Source Health
For each upstream source:
- **Tables**: Check freshness with the **checking-freshness** skill
- **DAGs**: Check recent run status with `af dags stats`
- **External systems**: Note connection info from DAG code
## Lineage for Columns
When tracing a specific column:
1. Find the column in the target table schema
2. Search DAG source code for references to that column name
3. Trace through transformations:
- Direct mappings: `source.col AS target_col`
- Transformations: `COALESCE(a.col, b.col) AS target_col`
- Aggregations: `SUM(detail.amount) AS total_amount`
## Output: Lineage Report
### Summary
One-line answer: "This table is populated by DAG X from sources Y and Z"
### Lineage Diagram
```
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
| |
DAG: ingest_sfdc DAG: transform_sales
```
### Source Details
| Source | Type | Connection | Freshness | Owner |
|--------|------|------------|-----------|-------|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |
### Transformation Chain
Describe how data flows and transforms:
1. Raw data lands in `raw.orders` via Salesforce API sync
2. DAG `transform_orders` cleans and dedupes into `stg.orders`
3. DAG `build_order_facts` joins with dimensions into `fct.orders`
### Data Quality Implications
- Single points of failure?
- Stale upstream sources?
- Complex transformation chains that could break?
### Related Skills
- Check source freshness: **checking-freshness** skill
- Debug source DAG: **debugging-dags** skill
- Trace downstream impacts: **tracing-downstream-lineage** skill
- Add manual lineage annotations: **annotating-task-lineage** skill
- Build custom lineage extractors: **creating-openlineage-extractors** skill
More from astronomer/agents
- airflowQueries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.
- airflow-adapterAirflow adapter pattern for v2/v3 API compatibility. Use when working with adapters, version detection, or adding new API methods that need to work across Airflow 2.x and 3.x.
- airflow-hitlUse when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
- airflow-pluginsBuild Airflow 3.1+ plugins that embed FastAPI apps, custom UI pages, React components, middleware, macros, and operator links directly into the Airflow UI. Use this skill whenever the user wants to create an Airflow plugin, add a custom UI page or nav entry to Airflow, build FastAPI-backed endpoints inside Airflow, serve static assets from a plugin, embed a React app in the Airflow UI, add middleware to the Airflow API server, create custom operator extra links, or call the Airflow REST API from inside a plugin. Also trigger when the user mentions AirflowPlugin, fastapi_apps, external_views, react_apps, plugin registration, or embedding a web app in Airflow 3.1+. If someone is building anything custom inside Airflow 3.1+ that involves Python and a browser-facing interface, this skill almost certainly applies.
- analyzing-dataQueries data warehouse and answers business questions about data. Handles questions requiring database/warehouse queries including "who uses X", "how many Y", "show me Z", "find customers", "what is the count", data lookups, metrics, trends, or SQL analysis.
- annotating-task-lineageAnnotate Airflow tasks with data lineage using inlets and outlets. Use when the user wants to add lineage metadata to tasks, specify input/output datasets, or enable lineage tracking for operators without built-in OpenLineage extraction.
- authoring-dagsWorkflow and best practices for writing Apache Airflow DAGs. Use when the user wants to create a new DAG, write pipeline code, or asks about DAG patterns and conventions. For testing and debugging DAGs, see the testing-dags skill.
- checking-freshnessQuick data freshness check. Use when the user asks if data is up to date, when a table was last updated, if data is stale, or needs to verify data currency before using it.
- cosmos-dbt-coreUse when turning a dbt Core project into an Airflow DAG/TaskGroup using Astronomer Cosmos. Does not cover dbt Fusion. Before implementing, verify dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability.
- cosmos-dbt-fusionUse when running a dbt Fusion project with Astronomer Cosmos. Covers Cosmos 1.11+ configuration for Fusion on Snowflake/Databricks with ExecutionMode.LOCAL. Before implementing, verify dbt engine is Fusion (not Core), warehouse is supported, and local execution is acceptable. Does not cover dbt Core.