Recently, I’ve been trying to coordinate two Airflow DAGs such that one would only run – on its own hourly schedule – if the other DAG (running on a daily basis) has been successful.
In today’s tutorial I will walk you through the use case and demonstrate how to achieve the desired behaviour in three different ways: two using the `ExternalTaskSensor` and another using a customised approach with the `PythonOperator`.
Use Case: Running the hourly DAG only if the daily DAG succeeded
Now let’s get started with our use case that involves two Airflow DAGs.
The first DAG, `my_daily_dag`, runs every day at 5AM UTC.
```python
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
```
The second DAG, `my_hourly_dag`, runs on an hourly basis, between 6AM and 8PM UTC.
```python
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')
```
In our use case, we would like `my_hourly_dag` to run only if `my_daily_dag` has run successfully within the current day. If not, then `my_hourly_dag` should be skipped. It is important to mention here that we don’t want to trigger `my_hourly_dag` as soon as `my_daily_dag` succeeds; that would be achievable with the `TriggerDagRunOperator`. Instead, we want both DAGs to run on their own schedule, but add a condition on `my_hourly_dag`.
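For contrast, the push-based alternative we are ruling out would look roughly like the sketch below: a hypothetical trigger task appended to the end of `my_daily_dag` (the task name is made up for illustration).

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Hypothetical final task of my_daily_dag: it would fire a run of the hourly
# DAG as soon as the daily DAG reaches this point, coupling the two schedules
# instead of letting each DAG run on its own.
trigger_hourly = TriggerDagRunOperator(
    task_id='trigger_my_hourly_dag',
    trigger_dag_id='my_hourly_dag',
)
```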
In the following sections we will discuss and demonstrate how to achieve this using a few different approaches.
Determining the execution_date of both DAGs
Before jumping into implementation details, it is important to first understand how the two DAGs differ in terms of their respective `execution_date`. This is crucial, since we will use this knowledge to determine the implementation of the desired behaviour.
Let’s assume that today is December 13th. The daily DAG `my_daily_dag` has an `execution_date` of `2023-12-12 05:00`, since it covers the data interval between `2023-12-12 05:00` and `2023-12-13 05:00`. Recall that an Airflow DAG run starts at the end of its data interval.

Meanwhile, our hourly `my_hourly_dag` DAG has an `execution_date` of `2023-12-13` for every run within the day, since each run covers the one-hour interval that precedes it. (A hypothetical midnight run would be the exception, carrying an `execution_date` of `2023-12-12`, since its interval would span `2023-12-12 23:00` through `2023-12-13 00:00`.)
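To make this concrete, here is a small standalone sketch, using `pendulum` (which Airflow uses for its date handling); the variable names are made up for illustration:

```python
import pendulum

# Daily run created on December 13th: it covers yesterday's interval,
# so its logical date (execution_date) points to December 12th.
daily_interval_start = pendulum.datetime(2023, 12, 12, 5, tz='UTC')
daily_interval_end = pendulum.datetime(2023, 12, 13, 5, tz='UTC')

# Hourly run created at 7AM on December 13th: it covers the preceding hour,
# so its logical date stays on December 13th.
hourly_interval_start = pendulum.datetime(2023, 12, 13, 6, tz='UTC')
hourly_interval_end = pendulum.datetime(2023, 12, 13, 7, tz='UTC')

# The gap between the two interval starts is what execution_delta
# will need to express for this run.
print((hourly_interval_start - daily_interval_start).in_hours())  # 25
```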
Using ExternalTaskSensor
Our first option is the built-in [`ExternalTaskSensor`](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) operator, which the Airflow documentation describes as follows:

> Waits for a different DAG, task group, or task to complete for a specific logical date.
>
> By default, the `ExternalTaskSensor` will wait for the external task to succeed, at which point it will also succeed. However, by default it will not fail if the external task fails, but will continue to check the status until the sensor times out (thus giving you time to retry the external task without also having to clear the sensor).
We can use this sensor in our `my_hourly_dag` to essentially check whether `my_daily_dag` has been successful in the specified interval.
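Before dealing with the scheduling mismatch, here is a minimal sketch of what using the sensor looks like; the task name is a placeholder, and the `poke_interval`/`timeout` values match those used later in this post.

```python
from airflow.sensors.external_task import ExternalTaskSensor

# Minimal sketch: without execution_delta or execution_date_fn, the sensor
# looks for a run of my_daily_dag at the *same* logical date as the current
# hourly run -- which, as explained below, will never exist.
wait_for_daily = ExternalTaskSensor(
    task_id='wait_for_my_daily_dag',
    external_dag_id='my_daily_dag',
    poke_interval=30,  # seconds between status checks
    timeout=120,       # stop poking after two minutes
)
```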
The `ExternalTaskSensor` accepts one of `execution_delta` or `execution_date_fn`. The former can be used to indicate the time difference from the current run at which to look for the external DAG’s run; by default, the sensor looks at the same logical date as the current task/DAG. The latter receives a callable (i.e. a function) that accepts the current execution’s logical date as the first positional argument and returns the desired logical date(s) to query.
> – **execution_delta** ([`datetime.timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) | `None`) – time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] `datetime.timedelta(days=1)`. Either `execution_delta` or `execution_date_fn` can be passed to `ExternalTaskSensor`, but not both.
>
> – **execution_date_fn** (`Callable` | `None`) – function that receives the current execution’s logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either `execution_delta` or `execution_date_fn` can be passed to `ExternalTaskSensor`, but not both.
Since the two DAGs run on different schedules, the default behaviour of the sensor won’t work for us; as clarified in the previous section, the two DAGs have different execution dates. Therefore, we need to figure out how to use either `execution_delta` or `execution_date_fn` to make the two execution dates align with each other.
Using ExternalTaskSensor with execution_delta
The simplest approach, in my opinion, is to use `execution_delta`. The data interval start date of our daily DAG is "yesterday at 5AM UTC". Since we know that `my_hourly_dag` runs on an hourly basis, we can come up with a formula to compute the delta between the interval start datetime of the hourly DAG and the interval start datetime of the daily DAG.
The following will create a delta that adds up:
- 24, which corresponds to the 24-hour offset between the two DAGs, given that they run on different schedules, as explained earlier
- the difference between the hour of the interval start datetime of the hourly DAG and 5, which is the hour at which the daily DAG runs every day.

```
24 + (hourly_dag_interval_start_hour - 5)
```
As an example, consider the following scenarios when the hourly DAG starts running at 6AM (until 8PM):

At 6AM:
- the hourly data interval starts at 5AM (and ends at 6AM)
- the daily data interval starts at 5AM yesterday
- `execution_delta = 24 + (5 - 5) = 24`
- the sensor will check the success of the daily DAG run whose data interval start date is 24 hours earlier

At 7AM:
- the hourly data interval starts at 6AM (and ends at 7AM)
- the daily data interval starts at 5AM yesterday
- `execution_delta = 24 + (6 - 5) = 25`
- the sensor will check the success of the daily DAG run whose data interval start date is 25 hours earlier

and so on. As a sanity check, the formula can also be expressed as a small helper; see the sketch below.
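This helper is not part of the DAG code; it is a minimal sketch (`hourly_to_daily_delta` is a name made up for illustration) that verifies the arithmetic of the two scenarios above:

```python
from datetime import timedelta


def hourly_to_daily_delta(hourly_interval_start_hour: int) -> timedelta:
    """Delta from the hourly run's interval start back to the daily
    run's interval start (yesterday at 5AM UTC)."""
    return timedelta(hours=24 + (hourly_interval_start_hour - 5))


# The 6AM run has an interval start hour of 5; the 7AM run, of 6.
assert hourly_to_daily_delta(5) == timedelta(hours=24)
assert hourly_to_daily_delta(6) == timedelta(hours=25)
```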
Now how do we implement this? One problem we face is that, at the time this post was written, `execution_delta` is not a templated field, meaning that we cannot use the templated variables that give us access to useful information, including the `data_interval_start`.

Therefore, we have to manually construct the `data_interval_start` of the hourly DAG. Given that the DAG runs every hour, the data interval start hour corresponds to the current hour minus one:
```python
from datetime import datetime, timezone

datetime.now(timezone.utc).hour - 1
```
Therefore, the `execution_delta` that will be provided as an argument to the `ExternalTaskSensor` can now be defined as:

```python
execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5)
```
Here’s the full code of our hourly DAG, which will run every hour between 6AM and 8PM UTC, only if the daily DAG has been successful today:
```python
from datetime import datetime, timedelta, timezone

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5),
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task
```
Using ExternalTaskSensor with execution_date_fn
Now, apart from `execution_delta`, the sensor can instead be configured with `execution_date_fn`, which accepts a callable returning the logical date(s) to be queried.
In other words, we need to create a function that fetches the desired logical date of the daily DAG, to be checked against the conditions of the sensor, which by default will verify that the state of the DagRun at the specified interval was successful.
The function below fetches the DagRuns of the daily DAG and returns the execution date of the latest DagRun, but only if that run happened on the same day as the hourly DAG’s run. If no DagRun is found at all (meaning the daily DAG has never been executed), an `AirflowSkipException` is raised so that the sensor task (and anything downstream) is skipped. Likewise, if no DagRun of the daily DAG happened on the same date as the hourly DAG’s run, `current_logical_dt` is returned; this is essentially the default value checked by the `ExternalTaskSensor` (and is the argument that must be present in the function definition provided via the `execution_date_fn` argument).

Recall that the two DAGs run on different schedules, which means their `execution_date` differs. In order to make a proper comparison and determine whether the daily DAG was executed successfully on the same day the hourly DAG runs, we need to subtract one day from the hourly DAG’s execution date. Note that we are only interested in whether the year, month and day of the two DAGs match (we don’t care about the time information in this context).
```python
import logging

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun


def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'

    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order such that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current execution_date
    # in order to align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # If year/month/day of the daily DAG's execution_date and the hourly
    # DAG's execution_date (minus one day) are the same, the daily DAG was
    # executed today. We therefore return the execution_date of the latest
    # daily DagRun. Its state (i.e. whether it was successful) will be
    # handled by the sensor and the configuration we provide to it.
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Otherwise, return the current execution_date of the hourly DAG.
    # This is the default value the sensor would use anyway, and it
    # effectively means the sensor won't succeed, given that the intervals
    # of the daily DAG and the sensor won't align.
    return current_logical_dt
```
Here’s the full code for our hourly DAG using `execution_date_fn` with `ExternalTaskSensor`:
```python
import logging
from datetime import datetime, timedelta

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor


def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'

    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order such that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current execution_date
    # in order to align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # If year/month/day of the daily DAG's execution_date and the hourly
    # DAG's execution_date (minus one day) are the same, the daily DAG was
    # executed today. We therefore return the execution_date of the latest
    # daily DagRun. Its state (i.e. whether it was successful) will be
    # handled by the sensor and the configuration we provide to it.
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Otherwise, return the current execution_date of the hourly DAG.
    # This is the default value the sensor would use anyway, and it
    # effectively means the sensor won't succeed, given that the intervals
    # of the daily DAG and the sensor won't align.
    return current_logical_dt


with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_date_fn=get_most_recent_dag_run,
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task
```
Using PythonOperator
The last approach involves a more customised solution. More specifically, we can programmatically find the latest successful `DagRun` of our daily DAG and handle the behaviour of the operator accordingly: if the latest successful `DagRun` of the daily DAG does not align with the execution date of our hourly DAG, the task will be skipped (as will its downstream tasks).

Therefore, we can write a function similar to the one in the previous section that was passed to the `execution_date_fn` argument of the `ExternalTaskSensor`.
More specifically, we need to fetch the DagRuns of the daily DAG and determine whether any of them completed successfully today (i.e. on the same day the hourly DAG runs). If none is found, we raise an `AirflowSkipException` so that the execution of the hourly DAG is skipped. In this case, the `PythonOperator` supports templated variables, and we will therefore take advantage of them.

This is what our function looks like:
```python
import logging

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun
from airflow.utils.state import DagRunState


def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'

    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order such that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current data_interval_start
    # in order to align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the intervals and the success of the daily DAG's DagRun.
    # If the conditions are not met, the DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')
```
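As a possible simplification, `DagRun.find` also accepts a `state` filter, so the success check could be pushed into the query itself. A minimal sketch of that variation:

```python
from airflow.models import DagRun
from airflow.utils.state import DagRunState

# Fetch only the successful runs of the daily DAG, making the explicit
# state comparison in the function above unnecessary.
successful_runs = DagRun.find(dag_id='my_daily_dag', state=DagRunState.SUCCESS)
successful_runs.sort(key=lambda x: x.execution_date, reverse=True)
```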
Here’s the complete code for the `my_hourly_dag` DAG, using a `PythonOperator` to check the status of `my_daily_dag`:
```python
import logging
from datetime import datetime, timedelta

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.state import DagRunState


def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'

    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order such that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current data_interval_start
    # in order to align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the intervals and the success of the daily DAG's DagRun.
    # If the conditions are not met, the DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')


with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    check_task = PythonOperator(
        task_id='check_daily_dag',
        python_callable=check_daily_dag_success_today,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    check_task >> dummy_task
```
Final Thoughts
In today’s tutorial we discussed how to handle dependencies between different DAGs when using Airflow. More specifically, we examined how to run a DAG on an hourly schedule only if a different DAG, running on a daily schedule, has executed successfully within the same day.

Three different approaches were demonstrated. Depending on the complexity of your use case, pick the one that makes more sense for you and results in more elegant code.