cosmos-dbt-fusion
$ npx mdskill add astronomer/agents/cosmos-dbt-fusion

Configure dbt Fusion projects on Snowflake or Databricks with Cosmos.
- Sets up local execution for Fusion models on supported cloud warehouses.
- Requires Cosmos 1.11+ and verifies dbt engine is Fusion before starting.
- Enforces binary execution constraints by disabling async and virtualenv.
- Validates warehouse compatibility against Snowflake, Databricks, BigQuery, and Redshift.
SKILL.md (`.github/skills/cosmos-dbt-fusion`)
---
name: cosmos-dbt-fusion
description: Use when running a dbt Fusion project with Astronomer Cosmos. Covers Cosmos 1.11+ configuration for Fusion on Snowflake/Databricks with ExecutionMode.LOCAL. Before implementing, verify dbt engine is Fusion (not Core), warehouse is supported, and local execution is acceptable. Does not cover dbt Core.
---
# Cosmos + dbt Fusion: Implementation Checklist
Execute steps in order. This skill covers Fusion-specific constraints only.
> **Version note**: dbt Fusion support was introduced in Cosmos 1.11.0, so this skill requires Cosmos ≥ 1.11.0.
>
> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md)** for ProfileConfig, operator_args, and Airflow 3 compatibility details.
>
> **Before starting**, confirm: (1) the dbt engine is Fusion (for Core, use **cosmos-dbt-core** instead), and (2) the warehouse is one of Snowflake, Databricks, BigQuery, or Redshift.
### Fusion-Specific Constraints
| Constraint | Details |
|------------|---------|
| No async | `ExecutionMode.AIRFLOW_ASYNC` is not supported |
| No virtualenv | `ExecutionMode.VIRTUALENV` is not supported; Fusion is a binary, not a Python package |
| Warehouse support | Snowflake, Databricks, BigQuery, and Redshift are supported [while in preview](https://github.com/dbt-labs/dbt-fusion) |
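
The constraints in the table can be checked before any Cosmos configuration is written. The following is a minimal sketch, not part of Cosmos: `check_fusion_constraints` is a hypothetical helper, and the lowercase mode strings are an assumption chosen to resemble the names of Cosmos execution modes.

```python
# Hypothetical pre-flight check for the constraints listed in the table above.
# The mode and warehouse strings are illustrative; they are not read from Cosmos.
SUPPORTED_WAREHOUSES = {"snowflake", "databricks", "bigquery", "redshift"}
UNSUPPORTED_MODES = {"airflow_async", "virtualenv"}


def check_fusion_constraints(execution_mode: str, warehouse: str) -> list:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    mode = execution_mode.strip().lower()
    if mode in UNSUPPORTED_MODES:
        problems.append(f"execution mode '{mode}' is not supported with Fusion")
    elif mode != "local":
        problems.append(f"execution mode '{mode}' is not covered here; use 'local'")
    if warehouse.strip().lower() not in SUPPORTED_WAREHOUSES:
        problems.append(f"warehouse '{warehouse}' is not in the supported list")
    return problems


if __name__ == "__main__":
    print(check_fusion_constraints("local", "Snowflake"))
    print(check_fusion_constraints("virtualenv", "postgres"))
```

An empty list means the planned setup passes these three checks; it does not confirm that the installed `dbt` is actually Fusion.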
---
## 1. Confirm Cosmos Version
> **CRITICAL**: Cosmos 1.11.0 introduced dbt Fusion compatibility.
```bash
# Check installed version
pip show astronomer-cosmos
# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"
```
**Validate**: `pip show astronomer-cosmos` reports version ≥ 1.11.0
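
The same check can be scripted, for example in an image build or CI step. This is a rough sketch using only the standard library; `parse_version` and `cosmos_supports_fusion` are hypothetical helper names, and pre-release suffixes such as `a1` are simply ignored rather than ordered correctly.

```python
from importlib.metadata import PackageNotFoundError, version

# Minimum Cosmos release with dbt Fusion support, per the version note above.
MINIMUM = (1, 11, 0)


def parse_version(text: str) -> tuple:
    """Turn '1.11.0' or '1.12.0a1' into a tuple of three leading integers."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break  # stop at the first non-digit (e.g. the 'a' in '0a1')
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # pad short versions such as '1.11'
    return tuple(parts)


def cosmos_supports_fusion(installed: str) -> bool:
    """True when the installed version is at least MINIMUM."""
    return parse_version(installed) >= MINIMUM


if __name__ == "__main__":
    try:
        installed = version("astronomer-cosmos")
    except PackageNotFoundError:
        print("astronomer-cosmos is not installed")
    else:
        status = "OK" if cosmos_supports_fusion(installed) else "too old"
        print(installed, status)
```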
---
## 2. Install the dbt Fusion Binary (REQUIRED)
dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.
Determine where to install the Fusion binary (Dockerfile / base image / runtime).
### Example Dockerfile Install
```dockerfile
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
```
### Common Install Paths
| Environment | Typical path |
|-------------|--------------|
| Astro Runtime | `/home/astro/.local/bin/dbt` |
| System-wide | `/usr/local/bin/dbt` |
**Validate**: The `dbt` binary exists at the chosen path and `dbt --version` succeeds.
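
One way to automate this validation is sketched below. The candidate paths come from the table above; `binary_is_usable` and `first_usable` are hypothetical helpers. Note the limitation: a dbt Core `dbt` executable would also pass, so the output of `dbt --version` still needs a human look to confirm the engine is Fusion.

```python
import os
import subprocess

# Candidate paths taken from the table above; adjust for your image.
CANDIDATE_PATHS = ["/home/astro/.local/bin/dbt", "/usr/local/bin/dbt"]


def binary_is_usable(path: str, timeout: int = 30) -> bool:
    """True if `path` is an executable file and `<path> --version` exits with 0."""
    if not (os.path.isfile(path) and os.access(path, os.X_OK)):
        return False
    try:
        result = subprocess.run(
            [path, "--version"], capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


def first_usable(paths):
    """Return the first usable path, or None if none qualifies."""
    for path in paths:
        if binary_is_usable(path):
            return path
    return None


if __name__ == "__main__":
    print(first_usable(CANDIDATE_PATHS) or "no usable dbt binary found")
```

The path this prints is the value to pass as `dbt_executable_path` in `ExecutionConfig`.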
---
## 3. Choose Parsing Strategy (RenderConfig)
Parsing strategy is the same as dbt Core. Pick ONE:
| Load mode | When to use | Required inputs |
|-----------|-------------|-----------------|
| `dbt_manifest` | Large projects; fastest parsing | `ProjectConfig.manifest_path` |
| `dbt_ls` | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler |
| `automatic` | Simple setups; let Cosmos pick | (none) |
```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)
```
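
If you prefer to make the choice from the table explicit rather than relying on `automatic`, a small helper can decide based on whether a manifest file is present. This is only a sketch: `pick_load_mode` is a hypothetical function, and it returns plain strings matching the load-mode names in the table, which you would map to the corresponding `LoadMode` member yourself.

```python
from pathlib import Path
from typing import Optional


def pick_load_mode(manifest_path: Optional[str]) -> str:
    """Prefer the manifest when it exists on disk; otherwise fall back to dbt ls.

    The returned strings mirror the load-mode names in the table above.
    """
    if manifest_path and Path(manifest_path).is_file():
        return "dbt_manifest"
    # dbt_ls requires the Fusion binary to be accessible at parse time.
    return "dbt_ls"


if __name__ == "__main__":
    print(pick_load_mode("/usr/local/airflow/dbt/my_project/target/manifest.json"))
```

The manifest path in the usage line is a placeholder; substitute wherever your CI writes `manifest.json`.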
---
## 4. Configure Warehouse Connection (ProfileConfig)
> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#profileconfig-warehouse-connection)** for full ProfileConfig options and examples.
```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
```
---
## 5. Configure ExecutionConfig (LOCAL Only)
> **CRITICAL**: dbt Fusion with Cosmos requires `ExecutionMode.LOCAL` with `dbt_executable_path` pointing to the Fusion binary.
```python
from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)
```
---
## 6. Configure Project (ProjectConfig)
```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```
---
## 7. Assemble DAG / TaskGroup
### Option A: DbtDag (Standalone)
```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```
### Option B: DbtTaskGroup (Inside Existing DAG)
```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
# ProfileConfig needs a profile_mapping (or profiles_yml_filepath) to resolve credentials
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_default"),
)
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    # Render the dbt project as a task group inside this DAG
    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    # Run order: pre_dbt -> dbt task group -> post_dbt
    chain(pre_dbt(), dbt, post_dbt())


my_dag()
```
---
## 8. Final Validation
Before finalizing, verify:
- [ ] **Cosmos version**: ≥1.11.0
- [ ] **Fusion binary installed**: Path exists and is executable
- [ ] **Warehouse supported**: Snowflake, Databricks, BigQuery, or Redshift only
- [ ] **Secrets handling**: Airflow connections or env vars, NOT plaintext
### Troubleshooting
If the user reports dbt Core regressions after enabling Fusion, set the following environment variable in the Airflow environment:
```bash
AIRFLOW__COSMOS__PRE_DBT_FUSION=1
```
### User Must Test
- [ ] The DAG parses in the Airflow UI (no import/parse-time errors)
- [ ] A manual run succeeds against the target warehouse (at least one model)
---
## Reference
- Cosmos dbt Fusion docs: https://astronomer.github.io/astronomer-cosmos/configuration/dbt-fusion.html
- dbt Fusion install: https://docs.getdbt.com/docs/core/pip-install#dbt-fusion
---
## Related Skills
- **cosmos-dbt-core**: For dbt Core projects (not Fusion)
- **authoring-dags**: General DAG authoring patterns
- **testing-dags**: Testing DAGs after creation