Custom hooks and operators
One of the great benefits of Airflow is its vast network of provider packages that supply hooks, operators, and sensors for many common use cases. Another benefit is that Airflow is highly customizable because everything is defined in Python code. If a hook, operator, or sensor you need doesn't exist in open source, you can easily define your own.
In this guide, you'll learn how to define your own custom code and make it available to your DAGs. The primary example uses custom operators, but the same concepts apply to custom hooks and sensors.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow operators. See Operators 101.
- Airflow hooks. See Hooks 101.
- Managing Airflow project structure. See Managing Airflow code.
Define a custom operator
At a high level, creating a custom operator is straightforward. At a minimum, all custom operators must:
- Inherit from the `BaseOperator`.
- Define an `__init__` method (the constructor) and an `execute` method.
The code appears similar to the following:
```python
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

from hooks.my_hook import MyHook


class MyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, my_field, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_field = my_field

    def execute(self, context):
        # Instantiate the custom hook with an Airflow connection ID and call it.
        hook = MyHook('my_conn')
        hook.my_method()
```
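The `MyHook` imported above is a hypothetical custom hook. Defining one follows the same pattern: inherit from `BaseHook` and add whatever methods your operators need. A minimal sketch, assuming Airflow 2.x imports and a `hooks/` sub-folder alongside your operators:

```python
from airflow.hooks.base import BaseHook


class MyHook(BaseHook):
    """Hypothetical hook that wraps calls to an external service."""

    def __init__(self, my_conn_id):
        super().__init__()
        self.my_conn_id = my_conn_id

    def my_method(self):
        # Look up the Airflow connection by ID and use its details to call
        # the external service. The connection name is purely illustrative.
        conn = self.get_connection(self.my_conn_id)
        self.log.info("Connecting to %s", conn.host)
```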
If your custom operator modifies the functionality of an existing operator, your class can inherit from the operator you are building on instead of from `BaseOperator`. For more detailed instructions on defining custom operators, see the Apache Airflow How-to Guide.
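For example, the following sketch extends Airflow's `BashOperator` rather than `BaseOperator`; the class name and the extra logging are purely illustrative:

```python
from airflow.operators.bash import BashOperator


class MyBashOperator(BashOperator):
    """Hypothetical operator that adds behavior on top of BashOperator."""

    def execute(self, context):
        # Add custom logic before (or after) delegating to the parent operator.
        self.log.info("About to run: %s", self.bash_command)
        return super().execute(context)
```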
Import custom operators
After you've defined your custom operator, you need to make it available to your DAGs. Some legacy Airflow documentation or forums may reference registering your custom operator as an Airflow plugin, but this is not necessary. To import a custom operator into your DAGs, the file containing it needs to be in a directory that is present in your `PYTHONPATH`.
By default, Airflow adds the `dags/` and `plugins/` directories in a project to the `PYTHONPATH`, so those are the most natural choices for storing custom operator files (check out the Apache Airflow Module Management docs for more info). Your project structure may vary depending on your team and your use case. Astronomer uses the following structure, where custom operator files live in the `plugins/` directory with sub-folders for readability.
```
.
├── dags/
│   └── example-dag.py
├── Dockerfile
├── include/
│   └── sql/
│       └── transforms.sql
├── packages.txt
├── plugins/
│   ├── operators/
│   │   └── my_operator.py
│   └── sensors/
│       └── my_sensor.py
└── requirements.txt
```
For more details on why Astronomer recommends this project structure, see the Managing Airflow Code guide.
If you use an IDE and don't want to see import errors, add the `plugins/` directory as a source root.
After you've added your custom operators to the project, you can import them into your DAG like you would any other Python package:
```python
from airflow import DAG
from datetime import datetime, timedelta

from operators.my_operator import MyOperator
from sensors.my_sensor import MySensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('example_dag',
         max_active_runs=3,
         schedule_interval='@once',
         default_args=default_args) as dag:

    sens = MySensor(
        task_id='taskA'
    )

    op = MyOperator(
        task_id='taskB',
        my_field='some text'
    )

    sens >> op
```
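The `MySensor` used here is another hypothetical example. Custom sensors follow the same pattern as custom operators, except they inherit from `BaseSensorOperator` and implement a `poke` method that returns `True` once the condition being waited for is met. A minimal sketch of what `plugins/sensors/my_sensor.py` might contain:

```python
from airflow.sensors.base import BaseSensorOperator


class MySensor(BaseSensorOperator):
    """Hypothetical sensor that waits for an external condition."""

    def poke(self, context):
        # Replace this with a real check, for example a call to a custom hook.
        # Airflow keeps rechecking until poke returns True.
        self.log.info("Checking whether the condition has been met...")
        return True
```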
And that's it! Your DAG will use `MyOperator` and `MySensor` when it runs, giving you full flexibility over what happens in your pipelines.