Orchestrate dbt with Airflow
dbt is an open-source library for analytics engineering that helps users build interdependent SQL models for in-warehouse data transformation. As ephemeral compute becomes more readily available in data warehouses thanks to tools like Snowflake, dbt has become a key component of the modern data engineering workflow. Now, data engineers can use dbt to write, organize, and run in-warehouse transformations of raw data.
Organizations can use Airflow to orchestrate and execute dbt models as DAGs. Running dbt with Airflow ensures a reliable, scalable environment for your models, as well as the ability to trigger models only after every prerequisite task is complete. Airflow also gives you fine-grained control over dbt tasks, so that teams have observability into every step of their dbt models.
In this guide, you'll:
- Use the dbt Cloud Provider to orchestrate dbt Cloud with Airflow.
- Review two common use cases for orchestrating dbt Core with Airflow with the `BashOperator`.
- Learn how to extend the model-level use case by automating changes to a dbt model.
Assumed knowledge
To get the most out of this guide, make sure you have an understanding of:
- The basics of dbt. See Getting started with dbt Cloud.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
dbt Cloud
To orchestrate dbt Cloud jobs with Airflow, you can use the dbt Cloud provider, which contains the following useful modules:

- `DbtCloudRunJobOperator`: Executes a dbt Cloud job.
- `DbtCloudGetJobRunArtifactOperator`: Downloads artifacts from a dbt Cloud job run.
- `DbtCloudJobRunSensor`: Waits for a dbt Cloud job run to complete.
- `DbtCloudHook`: Interacts with dbt Cloud using the V2 API.
To use the dbt Cloud provider in your DAGs, you'll need to complete the following steps:

- Add the `apache-airflow-providers-dbt-cloud` package to your Airflow environment. If you are working in an Astro project, you can add the package to your `requirements.txt` file.
- Set up an Airflow connection to your dbt Cloud instance. The connection type should be `dbt Cloud`, and it should include an API token from your dbt Cloud account. If you want your dbt Cloud provider tasks to use a default account ID, you can add that to the connection, but it is not required.
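For example, one way to create this connection is with an environment variable. The following is a minimal sketch assuming the connection ID `dbt` used in the DAG below; the account ID and API token are placeholders, and in the dbt Cloud connection scheme the login carries the optional account ID while the password carries the API token:

```python
import os

# Sketch only: placeholder account ID and API token for a dbt Cloud connection
# with the connection ID "dbt". In the connection URI, the login field holds
# the optional account ID and the password field holds the API token.
os.environ["AIRFLOW_CONN_DBT"] = "dbt-cloud://my_account_id:my_api_token@"
```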
In the DAG below, you'll review a simple implementation of the dbt Cloud provider. This example shows how to run a dbt Cloud job from Airflow, with an operational check that ensures the dbt Cloud job is not already running before triggering it. The `DbtCloudHook` provides a `list_job_runs()` method that can be used to retrieve all runs for a given job. The operational check uses this method to retrieve the latest triggered run for a job and check its status: if that run is not in a terminal state of 10 (Success), 20 (Error), or 30 (Canceled), the job is still running and the pipeline will not try to trigger another run.
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.utils.edgemodifier import Label

DBT_CLOUD_CONN_ID = "dbt"
JOB_ID = "{{ var.value.dbt_cloud_job_id }}"


def _check_job_not_running(job_id):
    """
    Retrieves the last run for a given dbt Cloud job and checks
    if the job is not currently running.
    """
    hook = DbtCloudHook(DBT_CLOUD_CONN_ID)
    runs = hook.list_job_runs(job_definition_id=job_id, order_by="-id")
    latest_run = runs[0].json()["data"][0]
    return DbtCloudJobRunStatus.is_terminal(latest_run["status"])


@dag(
    start_date=datetime(2022, 2, 10),
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
    doc_md=__doc__,
)
def check_before_running_dbt_cloud_job():
    begin, end = [EmptyOperator(task_id=id) for id in ["begin", "end"]]

    check_job = ShortCircuitOperator(
        task_id="check_job_is_not_running",
        python_callable=_check_job_not_running,
        op_kwargs={"job_id": JOB_ID},
    )

    trigger_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        job_id=JOB_ID,
        check_interval=600,
        timeout=3600,
    )

    begin >> check_job >> Label("Job not currently running. Proceeding.") >> trigger_job >> end


dag = check_before_running_dbt_cloud_job()
```
In the `DbtCloudRunJobOperator`, you must provide the dbt Cloud connection ID as well as the `job_id` of the job you are triggering.
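The other provider modules follow the same pattern. As a hypothetical extension of the DAG above (not part of the original example), you could wait on a run that was triggered with `wait_for_termination=False` and then download its artifacts, reusing the run ID that `DbtCloudRunJobOperator` pushes to XCom:

```python
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudGetJobRunArtifactOperator,
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Wait for the triggered run to reach a terminal state. This is only needed
# when the job is triggered with wait_for_termination=False, rather than the
# blocking behavior shown above.
wait_for_run = DbtCloudJobRunSensor(
    task_id="wait_for_run",
    dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
    run_id=trigger_job.output,  # run ID returned by DbtCloudRunJobOperator
)

# Download the run_results.json artifact from the completed run.
get_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results",
    dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
    run_id=trigger_job.output,
    path="run_results.json",
)
```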
The full code for this example, along with other DAGs that implement the dbt Cloud Provider, can be found on the Astronomer Registry.
dbt Core
When orchestrating dbt Core with Airflow, a straightforward DAG design is to run dbt commands directly through the `BashOperator`. In this section, you'll review two use cases that demonstrate how it's done.
Use case 1: dbt Core and Airflow at the project level
For this example you'll use the `BashOperator`, which simply executes a shell command, because it lets you run specific dbt commands. The primary dbt interface is the command line, so the `BashOperator` is one of the best tools for managing dbt. You can execute `dbt run` or `dbt test` directly in Airflow as you would in any other shell.
Note: The code for this example can be found on the Astronomer Registry.
The DAG below uses the `BashOperator` to run a dbt project and the models' associated tests, each in a single task:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='dbt_dag',
    start_date=datetime(2021, 12, 23),
    description='An Airflow DAG to invoke simple dbt commands',
    schedule_interval=timedelta(days=1),
) as dag:

    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='dbt run'
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='dbt test'
    )

    dbt_run >> dbt_test
```
Using the `BashOperator` to run `dbt run` and `dbt test` is a working solution for simple use cases, or for when you would rather have dbt manage dependencies between models. If you need something quick to develop and deploy that has the full power of dbt behind it, then this is the solution for you. However, running dbt at the project level has several issues:
- Low observability into the execution state of the project.
- Failures are absolute and require the whole group of dbt models to be run again, which can be costly.
Use case 2: dbt Core and Airflow at the model level
What if you need more visibility into the steps dbt is running in each task? Instead of running a group of dbt models on a single task, you can write a DAG that runs a task for each model. Using this method, your dbt workflow is more controllable because you can see the successes, failures, and retries of each dbt model in its corresponding Airflow task. If a model near the end of your dbt pipeline fails, you can fix the broken model and retry that individual task without having to rerun the entire workflow. Also, you no longer have to worry about defining sensors to configure interdependency between Airflow DAGs, because you've consolidated your work into a single DAG. Our friends at Updater came up with this solution.
To make this work, you need a file generated by dbt called `manifest.json`. This file is created in the `target` directory of your dbt project and contains a full representation of your dbt project. For more information on this file, see the dbt documentation.
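To get a feel for the structure the DAG below relies on, you can inspect the model nodes and their upstream dependencies with a standalone script. This is a sketch only; the manifest path is an example:

```python
import json

# Example path: point this at your own dbt project's target directory
with open("target/manifest.json") as f:
    manifest = json.load(f)

# Each key in "nodes" is a dbt resource; model nodes are prefixed with "model."
for name, node in manifest["nodes"].items():
    if name.startswith("model."):
        print(name, "->", node["depends_on"]["nodes"])
```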
The DAG reads the `manifest.json` file, parses it, creates the necessary `BashOperator` Airflow tasks, and then sets the dependencies to match those of your dbt project. The end result is that each model in your dbt project maps to two tasks in your Airflow DAG: one task to run the model, and another to run the tests associated with that model. All of the models run in the appropriate order thanks to the task dependencies you've set. You can implement this workflow using the following DAG:
```python
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id='dbt_dag',
    start_date=datetime(2020, 12, 23),
    description='A dbt wrapper for Airflow',
    schedule_interval=timedelta(days=1),
)


def load_manifest():
    """Load the manifest.json file produced by dbt."""
    local_filepath = "/usr/local/airflow/dags/dbt/target/manifest.json"
    with open(local_filepath) as f:
        data = json.load(f)
    return data


def make_dbt_task(node, dbt_verb):
    """Return an Airflow operator that either runs or tests an individual model."""
    DBT_DIR = "/usr/local/airflow/dags/dbt"
    GLOBAL_CLI_FLAGS = "--no-write-json"
    model = node.split(".")[-1]

    if dbt_verb == "run":
        dbt_task = BashOperator(
            task_id=node,
            bash_command=f"""
            cd {DBT_DIR} &&
            dbt {GLOBAL_CLI_FLAGS} {dbt_verb} --target prod --models {model}
            """,
            dag=dag,
        )
    elif dbt_verb == "test":
        node_test = node.replace("model", "test")
        dbt_task = BashOperator(
            task_id=node_test,
            bash_command=f"""
            cd {DBT_DIR} &&
            dbt {GLOBAL_CLI_FLAGS} {dbt_verb} --target prod --models {model}
            """,
            dag=dag,
        )
    return dbt_task


data = load_manifest()

# Create a run task and a test task for every model in the manifest
dbt_tasks = {}
for node in data["nodes"].keys():
    if node.split(".")[0] == "model":
        node_test = node.replace("model", "test")
        dbt_tasks[node] = make_dbt_task(node, "run")
        dbt_tasks[node_test] = make_dbt_task(node, "test")

for node in data["nodes"].keys():
    if node.split(".")[0] == "model":
        # Set dependency to run tests on a model after the model run finishes
        node_test = node.replace("model", "test")
        dbt_tasks[node] >> dbt_tasks[node_test]

        # Set all model -> model dependencies
        for upstream_node in data["nodes"][node]["depends_on"]["nodes"]:
            upstream_node_type = upstream_node.split(".")[0]
            if upstream_node_type == "model":
                dbt_tasks[upstream_node] >> dbt_tasks[node]
```
The code for this example can be found on the Astronomer Registry.
Now you have a solution that orchestrates a dbt project in detail, giving data engineers visibility into each dbt model without affecting each model's dependencies. In the next section, you'll see how to make this DAG resilient to change.
Bonus 1: Productionizing with CI/CD
For the team at Updater, splitting the dbt models into tasks was only the first step. The next questions they tackled were:
- How do you automatically update `manifest.json` so that you don't need to manually copy it into your Airflow project and redeploy your Airflow environment every time your dbt project changes?
- How do you extend your workflows to accommodate multiple DAGs and schedule intervals?
- How do you contextualize this approach in the case of a broader ELT pipeline?
Running all of your dbt models in a single DAG has some limitations. Specifically, this implementation cannot handle running different groups of dbt models on different schedules.
To add this functionality, you can take a group of models defined by some selector, such as `dbt run --models tag:hourly`, and deploy that set of models as its own Airflow DAG with its own defined schedule. You can then use your `manifest.json` file to set dependencies between these groups of models and build out a robust CI process. To do this, you can:
- Use the `selectors.yml` file (introduced in dbt 0.18) to define a set of model selectors for each Airflow DAG schedule you want to create. You can then use dbt's tagging feature to tag every model with a desired schedule interval.
- Use a CI/CD provider to run a Python script that:
  - Runs `dbt compile` to create a fresh copy of `manifest.json`.
  - Reads the model selectors defined in the YAML file (see the sketch of the expected structure after this list).
  - Uses the `dbt ls` command to list all of the models associated with each model selector in the YAML file.
  - Turns the dbt DAG from `manifest.json` into a `Graph` object with the `networkx` library.
  - Uses the methods available on the `Graph` object to figure out the correct set of dependencies for each group of models defined in the YAML file.
  - Writes the dependencies for each group of models (stored as a list of tuples) as a pickle file to local storage.
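For reference, the script that follows expects `selectors.yml` to parse into a top-level `selectors` list in which each entry has a `name` (used as the DAG name) and a `definition` (dbt selection syntax). A hypothetical two-schedule file, once parsed with PyYAML, would produce:

```python
# Hypothetical result of yaml.full_load() on a selectors.yml file with one
# selector per schedule. The names and tags are placeholders.
dag_model_selectors = {
    "selectors": [
        {"name": "dag_hourly", "definition": "tag:hourly"},
        {"name": "dag_daily", "definition": "tag:daily"},
    ]
}
```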
Here is what that Python script looks like in practice:
```python
import json
import os
import pickle

import networkx as nx
import yaml

# README
# This file is a utility script that is run using CircleCI in the deploy
# step. It is not run by Airflow in any way. The point of this script is
# to generate a pickle file that contains all of the dependencies between
# dbt models for each DAG (usually corresponding to a different schedule)
# that you want to run.


def load_manifest():
    """Load manifest.json"""
    local_filepath = f"{DBT_DIR}/target/manifest.json"
    with open(local_filepath) as f:
        data = json.load(f)
    return data


def load_model_selectors():
    """Load the dbt selectors from the YAML file to be used with the dbt ls command"""
    with open(f"{DBT_DIR}/selectors.yml") as f:
        dag_model_selectors = yaml.full_load(f)
    selected_models = {}
    for selector in dag_model_selectors["selectors"]:
        selector_name = selector["name"]
        selector_def = selector["definition"]
        selected_models[selector_name] = selector_def
    return selected_models


def parse_model_selector(selector_def):
    """Run the dbt ls command, which returns all dbt models associated with a
    particular selection syntax"""
    models = os.popen(f"cd {DBT_DIR} && dbt ls --models {selector_def}").read()
    models = models.splitlines()
    return models


def generate_all_model_dependencies(all_models, manifest_data):
    """Generate dependencies for the entire project by creating a list of tuples
    that represent the edges of the DAG"""
    dependency_list = []
    for node in all_models:
        # Clean up the node name to match the node format in manifest.json
        split_node = node.split(".")
        node = split_node[0] + "." + split_node[-1]
        node = "model." + node
        node_test = node.replace("model", "test")

        # Set dependency to run tests on a model after the model run finishes
        dependency_list.append((node, node_test))

        # Set all model -> model dependencies
        for upstream_node in manifest_data["nodes"][node]["depends_on"]["nodes"]:
            upstream_node_type = upstream_node.split(".")[0]
            if upstream_node_type == "model":
                dependency_list.append((upstream_node, node))
    return dependency_list


def clean_selected_task_nodes(selected_models):
    """Clean up the naming of the "selected" nodes so they match the structure of
    what comes out of the generate_all_model_dependencies function. This function
    doesn't create a list of dependencies between selected nodes (that happens in
    generate_dag_dependencies); it just cleans up the naming of the nodes and
    outputs them as a list"""
    selected_nodes = []
    for node in selected_models:
        # Clean up the node name to match the node format in manifest.json
        split_node = node.split(".")
        node = split_node[0] + "." + split_node[-1]

        # Add the run model node
        node = "model." + node
        selected_nodes.append(node)

        # Add the test model node
        node_test = node.replace("model", "test")
        selected_nodes.append(node_test)
    return selected_nodes


def generate_dag_dependencies(selected_nodes, all_model_dependencies):
    """Return dependencies as a list of tuples for a given DAG (set of models)"""
    G = nx.DiGraph()
    G.add_edges_from(all_model_dependencies)
    G_subset = G.copy()
    for node in G:
        if node not in selected_nodes:
            G_subset.remove_node(node)
    selected_dependencies = list(G_subset.edges())
    return selected_dependencies


def run():
    """Get a list of all models in the project and create dependencies.

    You want to load all the models first because the logic to properly set
    dependencies between subsets of models is basically removing nodes from
    the complete DAG. This logic can be found in the generate_dag_dependencies
    function. The networkx graph object is smart enough that if you remove
    nodes with the remove_node method, the dependencies of the remaining nodes
    are what you would expect.
    """
    manifest_data = load_manifest()
    all_models = parse_model_selector("updater_data_model")
    all_model_dependencies = generate_all_model_dependencies(all_models, manifest_data)

    # Load model selectors
    dag_model_selectors = load_model_selectors()
    for dag_name, selector in dag_model_selectors.items():
        selected_models = parse_model_selector(selector)
        selected_nodes = clean_selected_task_nodes(selected_models)
        dag_dependencies = generate_dag_dependencies(selected_nodes, all_model_dependencies)
        with open(f"{DBT_DIR}/dbt_dags/data/{dag_name}.pickle", "wb") as f:
            pickle.dump(dag_dependencies, f)


# RUN IT
DBT_DIR = "./dags/dbt"
run()
```
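To see the subsetting trick from `generate_dag_dependencies` in isolation, here is a toy example. Removing a node from a copy of the full graph drops that node and its incident edges, keeping only the edges among the remaining nodes:

```python
import networkx as nx

# Full "project" graph: a -> b -> c, plus a direct edge a -> c
G = nx.DiGraph()
G.add_edges_from([("a", "b"), ("b", "c"), ("a", "c")])

# Keep only the nodes selected for this DAG
G_subset = G.copy()
G_subset.remove_node("b")

print(list(G_subset.edges()))  # [('a', 'c')]
```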
Finally, create an Airflow DAG file for each group of models. Each DAG reads the associated pickle file, creates the required dbt model run/test tasks, and sets dependencies between them as specified in the pickle file. One of those DAGs might look something like this:
```python
with DAG(
    dag_id="dbt_dag",
    schedule_interval="@daily",
    max_active_runs=1,
    catchup=False,
    start_date=datetime(2021, 1, 1)
) as dag:

    # Load dependencies from the configuration file
    dag_def = load_dag_def_pickle(f"{DAG_NAME}.pickle")

    # Returns a dictionary of bash operators corresponding to dbt models/tests
    dbt_tasks = create_task_dict(dag_def)

    # Set dependencies between tasks according to the config file
    for edge in dag_def:
        dbt_tasks[edge[0]] >> dbt_tasks[edge[1]]
```
The functions in the DAG file above have been split out for simplicity, but the logic can be found in the dbt_advanced.py DAG.
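As a rough sketch only (the real implementations live in that DAG and may differ), the two helpers could look something like this, assuming the pickle files land in the directory the CI script above writes to:

```python
import pickle

from airflow.operators.bash import BashOperator

# Assumed location: matches where the CI script above writes the pickle files
DBT_DAGS_DATA_DIR = "/usr/local/airflow/dags/dbt/dbt_dags/data"


def load_dag_def_pickle(file_name):
    """Load the list of (upstream, downstream) node tuples for this DAG."""
    with open(f"{DBT_DAGS_DATA_DIR}/{file_name}", "rb") as f:
        return pickle.load(f)


def create_task_dict(dag_def):
    """Create one BashOperator per unique node in the edge list."""
    tasks = {}
    for edge in dag_def:
        for node in edge:
            if node not in tasks:
                # Node IDs look like "model.my_project.my_model" or
                # "test.my_project.my_model"
                verb = "run" if node.startswith("model.") else "test"
                model = node.split(".")[-1]
                tasks[node] = BashOperator(
                    task_id=node,
                    bash_command=f"dbt {verb} --target prod --models {model}",
                )
    return tasks
```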
Putting all of this together, you end up with multiple Airflow DAGs, each running on its own defined schedule, with a specified group of interdependent dbt models running as individual tasks. With this system, running a production dbt model in Airflow is simple: all you need to do is tag a model with the appropriate schedule interval, and it will automatically get picked up and executed by the corresponding Airflow DAG.
Ultimately, this gives you a robust, end-to-end solution that captures the ideal scheduling, execution, and observability experience for running dbt models with Airflow.
With that said, this implementation still has some limitations. For a more in-depth consideration of the benefits and downsides of each implementation, see Part 2 of our Airflow/dbt blog series.
Bonus 2: DbtDagParser utility
The sample code in the previous section demonstrates how to loop through your dbt project's `manifest.json` file to parse out individual models and map them to Airflow tasks. To simplify the DAG code when using this pattern, you can use a convenient utility method that takes care of the parsing. The `DbtDagParser` utility, developed and explained by Sam Bail, works as follows:
- The parser takes the dbt project path containing the `dbt_project.yml` file, as well as the path to the `profiles.yml` file, as inputs. Note that this setup assumes that you have a single repo containing both your dbt project and your Airflow code.
- By providing a `dbt_tag` parameter, you can select a subset of models to run. This means you can specify multiple DAGs for different subsets of the dbt models, for example to run them on different schedules, as described in Part 2 of our blog series.
- The utility returns a task group containing all `dbt run` tasks for the models in the specified dbt DAG, and optionally another task group for all test tasks.
When used as shown in the sample code below, the utility provides a shortcut to creating Airflow task groups for dbt models. Note that this code snippet only shows part of the DAG file; you can find the whole file in the demo repo.
```python
with dag:
    start_empty = EmptyOperator(task_id='start')

    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command=(
            f'dbt {DBT_GLOBAL_CLI_FLAGS} seed '
            f'--profiles-dir {DBT_PROJECT_DIR} --project-dir {DBT_PROJECT_DIR}'
        )
    )

    end_empty = EmptyOperator(task_id='end')

    dag_parser = DbtDagParser(
        dag=dag,
        dbt_global_cli_flags=DBT_GLOBAL_CLI_FLAGS,
        dbt_project_dir=DBT_PROJECT_DIR,
        dbt_profiles_dir=DBT_PROJECT_DIR,
        dbt_target=DBT_TARGET,
    )
    dbt_run_group = dag_parser.get_dbt_run_group()
    dbt_test_group = dag_parser.get_dbt_test_group()

    start_empty >> dbt_seed >> dbt_run_group >> dbt_test_group >> end_empty
```
Using the jaffle_shop demo dbt project, the parser creates a DAG that includes two task groups, one for the `dbt_run` tasks and one for the `dbt_test` tasks.
One important fact to note here is that the `DbtDagParser` does not include a `dbt compile` step to update the `manifest.json` file. Since the Airflow scheduler parses the DAG file periodically, having a compile step as part of the DAG creation could incur unnecessary load for the scheduler. Astronomer recommends adding a `dbt compile` step either as part of a CI/CD pipeline, or as part of a pipeline run in production before the Airflow DAG is run.
With regards to the `dbt test` runs:

- The test runs are optional. You can skip the tests by simply not calling `get_dbt_test_group()`, as shown in the snippet after this list.
- The `dbt test` task group depends entirely on the `dbt run` group. In this example, the DAG runs all models first, then all tests.
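For example, to skip the tests in the DAG snippet above, leave the test group out of the dependency chain entirely:

```python
# Same DAG as above, minus the call to get_dbt_test_group(): only the seed
# task and the model runs are scheduled.
start_empty >> dbt_seed >> dbt_run_group >> end_empty
```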
Conclusion
To recap, in this guide you have learned about dbt Cloud and dbt Core, how to create and productionize dbt tasks in Airflow, and how to automatically create dbt Core tasks based on a manifest. For a more detailed discussion on trade-offs, limitations, and adding dbt Core or dbt Cloud to a full ELT pipeline, see our blog posts. To see more examples of how to use dbt and Airflow to build pipelines, check out our dbt DAGs on the Registry.