Skip to main content

Airflow data quality checks with SQL Operators

Data quality is key to the success of an organization's data systems. With in-DAG quality checks, you can halt pipelines and alert stakeholders before bad data makes its way to a production lake or warehouse.

Executing SQL queries is one of the most common use cases for data pipelines, and it's a simple and effective way to implement data quality checks. Using Airflow, you can quickly put together a pipeline specifically for checking data quality, or you can add quality checks to existing ETL/ELT pipelines with just a few lines of boilerplate code.

In this tutorial, you'll learn about three SQL Check operators and how to use them to build a robust data quality suite for your DAGs.

All code used in this tutorial is located in the Astronomer Registry.

Assumed knowledge

To get the most out of this tutorial, you should have an understanding of:

SQL Check operators

The SQL Check operators are versions of the SQLOperator that abstract SQL queries to streamline data quality checks. One difference between the SQL Check operators and the standard BaseSQLOperator is that the SQL Check operators respond with a boolean, meaning the task fails when any of the resulting queries fail. This is particularly helpful in stopping a data pipeline before bad data makes it to a given destination. The lines of code and values that fail the check are observable in the Airflow logs.

The following SQL Check operators are recommended for implementing data quality checks:

  • SQLColumnCheckOperator: Runs multiple predefined data quality checks on multiple columns within the same task.
  • SQLTableCheckOperator: Runs multiple user-defined checks on one or more columns of a table.
  • SQLCheckOperator: Takes any SQL query and returns a single row that is evaluated to booleans. This operator is useful for more complicated checks that could span several tables of your database.
  • SQLIntervalCheckOperator: Checks current data against historical data.

Additionally, two older SQL Check operators exist that can run one check at a time against a defined value or threshold:

  • SQLValueCheckOperator: A simpler operator that can be used when a specific, known value is being checked either as an exact value or within a percentage threshold.
  • SQLThresholdCheckOperator: An operator with flexible upper and lower thresholds, where the threshold bounds may also be described as SQL queries that return a numeric value.

Astronomer recommends using the SQLColumnCheckOperator and SQLTableCheckOperator over the SQLValueCheckOperator and SQLThresholdCheckOperator whenever possible to improve code readability.

Requirements

The SQLColumnCheckOperator and the SQLTableCheckOperator are available in the common SQL provider package which can be installed with:

pip install apache-airflow-providers-common-sql

The SQLCheckOperator, SQLIntervalCheckOperator, SQLValueCheckOperator and SQLThresholdCheckOperator are built into core Airflow and do not require a separate package installation.

Database connection

The SQL Check operators work with any database that can be queried using SQL. You have to define your connection in the Airflow UI and then pass the connection id to the operator's conn_id parameter.

Currently the operators cannot support BigQuery job_ids.

The target table can be specified as a string using the table parameter for the SQLColumnCheckOperator and SQLTableCheckOperator. When using the SQLCheckOperator, you can override the database defined in your Airflow connection by passing a different value to the database argument. The target table for the SQLCheckOperator has to be specified in the SQL statement.

Example SQLColumnCheckOperator

The SQLColumnCheckOperator has a column_mapping parameter which stores a dictionary of checks. Using this dictionary, it can run many checks within one task and still provide observability in the Airflow logs over which checks passed and which failed.

This check is useful for:

  • Ensuring all numeric values in a column are above a minimum, below a maximum or within a certain range (with or without a tolerance threshold).
  • Null checks.
  • Checking primary key columns for uniqueness.
  • Checking the number of distinct values of a column.

In the example below, 5 checks are performed on 3 different columns using the SQLColumnCheckOperator:

  • "MY_DATE_COL" is checked to ensure that it contains only unique dates.
  • "MY_TEXT_COL" is checked to ensure it has at least 10 distinct values and no NULL values.
  • "MY_NUM_COL" is checked to ensure it contains a minimum value of less than 10 and a maximum value of 100 with a 10% tolerance (maximum values between 90 and 110 are accepted).
check_columns = SQLColumnCheckOperator(
task_id="check_columns",
conn_id=example_conn,
table=example_table,
column_mapping={
"MY_DATE_COL": {
"unique_check": {"equal_to": 0}
},
"MY_TEXT_COL": {
"distinct_check": {"geq_to": 10},
"null_check": {"equal_to": 0}
},
"MY_NUM_COL": {
"min": {"less_than": 10},
"max": {"equal_to": 100, "tolerance": 0.1}
},
}
)

The SQLColumnCheckOperator offers 5 options for column checks which are abstractions over SQL statements:

  • "min": "MIN(column) AS column_min"
  • "max": "MAX(column) AS column_max"
  • "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check"
  • "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check"
  • "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check"

The resulting values can be compared to an expected value using any of the following qualifiers:

  • greater_than
  • geq_to (greater or equal than)
  • equal_to
  • leq_to (lesser or equal than)
  • less_than

You can add a tolerance to the comparisons in the form of a fraction (0.1 = 10% tolerance).

If the resulting boolean value is True the check passes, otherwise it fails. Airflow generates logs that show the set of returned records for every check that passes and the full query and result for checks that failed.

The following example shows the output of 2 successful checks that ran on the MY_NUM_COL column of a table in Snowflake using the SQLColumnCheckOperator. The checks concerned the minimum value and the maximum value in the column.

The logged line INFO - Record: (5, 101) lists the results of the query: the minimum value was 5 and the maximum value was 101, which satisfied the conditions of the check.

[2022-10-25, 06:12:55 UTC] {cursor.py:714} INFO - query: [SELECT MIN(MY_NUM_COL) AS MY_NUM_COL_min,MAX(MY_NUM_COL) AS MY_NUM_COL_max,SUM(C...]
[2022-10-25, 06:12:56 UTC] {cursor.py:738} INFO - query execution done
[2022-10-25, 06:12:56 UTC] {cursor.py:854} INFO - Number of results in first chunk: 1
[2022-10-25, 06:12:56 UTC] {connection.py:564} INFO - closed
[2022-10-25, 06:12:56 UTC] {connection.py:567} INFO - No async queries seem to be running, deleting session
[2022-10-25, 06:12:56 UTC] {sql.py:253} INFO - Record: (5, 101)

All checks that fail are listed at the end of the task log with their full SQL query and specific check that failed. In the following sample entry, the TASK_DURATION column failed the check. Instead of a minimum that is greater than or equal to 0, it had a minimum of -12.

[2022-07-18, 17:05:19 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 126, in execute
raise AirflowException(
airflow.exceptions.AirflowException: Test failed
Query:
SELECT MIN(TASK_DURATION) AS TASK_DURATION_min,MAX(TASK_DURATION) AS TASK_DURATION_max,SUM(CASE WHEN TASK_DURATION IS NULL THEN 1 ELSE 0 END) AS TASK_DURATION_null_check FROM DB.SCHEMA.TABLE;
Results:
(-12, 1000, 0)
The following tests have failed:
Check: min,
Check Values: {'geq_to': 0, 'result': -12, 'success': False}

Example SQLTableCheckOperator

The SQLTableCheckOperator provides a way to check the validity of user defined SQL statements which can involve one or more columns of a table. There is no limit to the amount of columns these statements can involve or to their complexity. The statements are provided to the operator as a dictionary with the checks parameter.

The SQLTableCheckOperator is useful for:

  • Checks that include aggregate values using the whole table (e.g. comparing the average of one column to the average of another using the SQL AVG() function).
  • Row count checks.
  • Checking if a date is between certain bounds (for example, using MY_DATE_COL BETWEEN '2019-01-01' AND '2019-12-31' to make sure only dates in the year 2019 exist).
  • Comparisons between multiple columns, both aggregated and not aggregated.

In the example below, three checks are defined: my_row_count_check, my_column_sum_comparison_check and my_column_addition_check. The first check runs a SQL statement asserting that the table contains at least 1000 rows, the second check compares the sum of two columns, and the third check confirms that for each row MY_COL_1 + MY_COL_2 = MY_COL_3 is true.

table_checks = SQLTableCheckOperator(
task_id="table_checks",
conn_id=example_conn,
table=example_table,
checks={
"my_row_count_check": {
"check_statement": "COUNT(*) >= 1000"
},
"my_column_sum_comparison_check": {
"check_statement": "SUM(MY_COL_1) < SUM(MY_COL_2)"
},
"my_column_addition_check": {
"check_statement": "MY_COL_1 + MY_COL_2 = MY_COL_3"
}
}
)

The operator performs a CASE WHEN statement on each of the checks, assigning 1 to the checks that pass and 0 to the checks that fail. Afterwards, the operator looks for the minimum of these results and marks the task as failed if the minimum is 0. The SQLTableCheckOperator produces observable logs similar to those produced for SQLColumnCheckOperator.

Example SQLCheckOperator

The SQLCheckOperator returns a single row from a provided SQL query and checks to see if any of the returned values in that row are False. If any values are False, the task fails. This operator allows a great deal of flexibility in checking:

  • A specific, single column value.
  • Part of or an entire row compared to a known set of values.
  • Options for categorical variables and data types.
  • The results of any other function that can be written as a SQL query.

The following code snippet shows you how to use the operator in a DAG:

yellow_tripdata_row_quality_check = SQLCheckOperator(
conn_id=example_conn,
task_id="yellow_tripdata_row_quality_check",
sql="row_quality_yellow_tripdata_check.sql",
params={"pickup_datetime": "2021-01-01"},
)

The sql argument can be either a complete SQL query as a string or, as in this example, a reference to a query in a local file. In Astronomer projects, this is in the include/ directory. The params argument allows you to pass a dictionary of values to the SQL query, which can be accessed through the params keyword in the query. The conn_id argument points towards a previously defined Airflow connection to a database. The full code can be found in the data quality demo repository.

Because the SQLCheckOperator can process a wide variety of queries, it's important to use the right SQL query for the job. The following sample query (which is passed into the sql argument) was crafted for the specific use case of analyzing daily taxicab data. So, the values checked in each equation come from domain knowledge. Even the WHERE clause needs a data steward to know that both the vendor_id and pickup_datetime are needed to return a unique row.

The query used in the sql argument is:

SELECT vendor_id, pickup_datetime,
CASE
WHEN dropoff_datetime > pickup_datetime THEN 1
ELSE 0
END AS date_check,
CASE
WHEN passenger_count >= 0
THEN 1 ELSE 0
END AS passenger_count_check,
CASE
WHEN trip_distance >= 0 AND trip_distance <= 100
THEN 1 ELSE 0
END AS trip_distance_check,
CASE
WHEN ROUND((fare_amount + extra + mta_tax + tip_amount + \
improvement_surcharge + COALESCE(congestion_surcharge, 0)), 1) = \
ROUND(total_amount, 1) THEN 1
WHEN ROUND(fare_amount + extra + mta_tax + tip_amount + \
improvement_surcharge, 1) = ROUND(total_amount, 1) THEN 1
ELSE 0
END AS fare_check
FROM yellow_tripdata
WHERE pickup_datetime IN (SELECT pickup_datetime
FROM yellow_tripdata
ORDER BY RANDOM()
LIMIT 1)

If you want to use a specific date to quality check a row instead of using a random pickup_datetime, you can use the params passed into the operator. For example:

WHERE pickup_datetime = '{{ params.pickup_datetime }}'

By using CASE statements in the SQL query, you can check very specific cases of data quality that should always be true for this use case:

  • Drop-offs always occur after pickups.
  • A trip is only valid if there is at least one passenger.
  • A trip needs to be in a range allowed by the taxi company (in this case, it is assumed there is a maximum allowed trip distance of 100 miles).
  • Each of the components of the total fare should add up to the total fare.

Using a for loop, tasks are generated to run this check on every row or other subset of the data. In the SQL above, a pickup_datetime is chosen randomly, and the corresponding code uses a loop to spot-check ten rows. In the example DAG below, you can see how the loop results in TaskGroups that can be collapsed or expanded in the Airflow UI:

An example DAG showing data quality checks as part of a pipeline.

In the previous example DAG, you learned how your data quality checks fit into a pipeline. By loading the data into Redshift then performing checks as queries, you are offloading compute resources from Airflow to Redshift, which frees up Airflow to act only as an orchestrator.

For a production pipeline, data could first be loaded from S3 to a temporary staging table, then have its quality checks completed. If the quality checks succeed, another SQLOperator can load the data from staging to a production table. If the data quality checks fail, the pipeline can be stopped, and the staging table can be either used to help diagnose the data issue or scrapped to save resources. To see the complete example DAG and run it for yourself, see the data quality demo repository.

Conclusion

After reading this guide, you should feel comfortable using the SQL Check operators, understand how each one works, and get a sense of when each one would be useful. With these operators you have the foundation for a robust data quality suite right in your pipelines. If you are looking for more examples, or want to see how to use backend-specific operators like Redshift, BigQuery, or Snowflake, see the data quality demo repository.