
Running Airflow DAG Only If Another DAG Is Successful

Using Airflow sensors to control the execution of DAGs on a different schedule

Image generated by DALL-E2

Recently, I’ve been trying to coordinate two Airflow DAGs so that one would only run, on its own hourly schedule, if the other DAG (running on a daily basis) has been successful.

In today’s tutorial I will walk you through the use case and demonstrate how to achieve the desired behaviour in three different ways: two using the ExternalTaskSensor and one using a customised approach with the PythonOperator.


Use Case: Running the hourly DAG only if the daily DAG succeeded

Now let’s get started with our use case that involves two Airflow DAGs.

The first DAG, my_daily_dag, runs every day at 5AM UTC.

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',  # At 5AM every day
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')

The second DAG, my_hourly_dag, runs on an hourly basis, between 6AM and 8PM UTC.

from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    DummyOperator(task_id='dummy_task')

In our use case, we would like my_hourly_dag to run only if my_daily_dag has run successfully within the current date. If not, then my_hourly_dag should be skipped. It is important to mention here that we don’t want to trigger my_hourly_dag as soon as my_daily_dag succeeds; that would be achievable with the TriggerDagRunOperator. Instead, we want both DAGs to run on their own schedule, but add a condition to my_hourly_dag.
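For contrast, a push-based setup would look roughly like the sketch below (a hypothetical trigger_hourly task appended at the end of my_daily_dag); this is exactly the coupling we want to avoid:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Hypothetical final task of my_daily_dag: it would start my_hourly_dag
# immediately after the daily DAG succeeds, ignoring the hourly schedule.
trigger_hourly = TriggerDagRunOperator(
    task_id='trigger_hourly',
    trigger_dag_id='my_hourly_dag',
)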


In the following sections we will discuss and demonstrate how to achieve this using a few different approaches.


Determining the execution_date of both DAGs

Before jumping into implementation details, it is important to first understand how the two DAGs differ in terms of their respective execution_date. This is crucial since we will use this knowledge to determine the implementation of the desired behaviour.

Let’s assume that today is December 13th. The daily DAG my_daily_dag has an execution_date of 2023-12-12 05:00, since it covers the data interval between 2023-12-12 05:00 and 2023-12-13 05:00. Recall that Airflow DAG runs start at the end of their data interval.

Meanwhile, our hourly my_hourly_dag DAG has an execution_date that falls on 2023-12-13 (except for the first run of the day, executing at 6AM, which has an execution_date of 2023-12-12 20:00: with the 0 6-20 * * * schedule, its data interval spans from the previous day’s last scheduled run at 8PM through 6AM).
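If you’d like to verify these dates yourself, a minimal sketch along the following lines logs the relevant values from within a run (assuming Airflow 2.2+, where data_interval_start is available in the task context; the task_id is arbitrary):

from airflow.operators.python import PythonOperator

def print_dates(**context):
    # For cron-based schedules, logical_date (a.k.a. execution_date)
    # is the same as data_interval_start
    print(context['logical_date'], context['data_interval_start'], context['data_interval_end'])

# placed inside a `with DAG(...)` block:
inspect_dates = PythonOperator(task_id='print_dates', python_callable=print_dates)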


Using ExternalTaskSensor

Our first option is the built-in [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) operator.

Waits for a different DAG, task group, or task to complete for a specific logical date.

By default, the ExternalTaskSensor will wait for the external task to succeed, at which point it will also succeed. However, by default it will not fail if the external task fails, but will continue to check the status until the sensor times out (thus giving you time to retry the external task without also having to clear the sensor).

We can use this sensor in our my_hourly_dag that will essentially check if my_daily_dag has been successful in the specified interval.

The ExternalTaskSensor accepts one of execution_delta or execution_date_fn. The former can be used to indicate the time difference with the previous execution to look at; by default, this is the same logical date as the current task/DAG. The latter receives a callable (i.e. a function) that accepts the current execution’s logical date as the first positional argument and returns the desired logical date(s) to query.

– execution_delta ([datetime.timedelta](https://docs.python.org/3/library/datetime.html#datetime.timedelta) | None) – time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

– execution_date_fn (Callable | None) – function that receives the current execution’s logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
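To make the distinction concrete, here is a minimal sketch of the two mutually exclusive options (the task_ids and the fixed 24-hour offset are placeholders, not our final solution):

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# inside a `with DAG(...)` block:

# Option 1: a fixed offset relative to the current run's logical date
wait_with_delta = ExternalTaskSensor(
    task_id='wait_with_delta',
    external_dag_id='my_daily_dag',
    execution_delta=timedelta(hours=24),  # look exactly 24 hours back
)

# Option 2: compute the target logical date(s) with a callable
wait_with_fn = ExternalTaskSensor(
    task_id='wait_with_fn',
    external_dag_id='my_daily_dag',
    execution_date_fn=lambda logical_date: logical_date - timedelta(hours=24),
)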

Since the two DAGs run on different schedules, the default behaviour of the sensor won’t work for us. In the previous section, we clarified why the two DAGs will have different execution dates.

Therefore, we need to figure out how to use either the execution_delta or execution_date_fn to make both execution dates align with each other.

Using ExternalTaskSensor with execution_delta

The simplest approach, in my opinion, is to use execution_delta. The data interval start date of our daily DAG is "yesterday at 5AM UTC". Since we know that my_hourly_dag runs on an hourly basis, we can come up with a formula to compute the delta between the interval start datetime of the hourly DAG and the interval start datetime of the daily DAG.

We can construct the delta as the sum of:

  • 24, which accounts for the fact that the daily DAG’s data interval starts one day earlier, as explained in the previous section
  • the difference between the hour of the hourly DAG’s interval start datetime and 5, which is the hour at which the daily DAG runs every day.
24 + (hourly_dag_interval_start_hour - 5)

As an example, consider the following scenarios for the hourly DAG’s runs:

At 7AM:

  • hourly data interval starts at 6AM (and ends at 7AM)
  • daily data interval starts at 5AM yesterday
  • execution_delta=24 + (6-5) = 25
  • The sensor will check the success of the daily DAG with a data interval start date set to 25 hours before

At 8AM:

  • hourly data interval starts at 7AM (and ends at 8AM)
  • daily data interval starts at 5AM yesterday
  • execution_delta=24 + (7-5) = 26
  • The sensor will check the success of the daily DAG with a data interval start date set to 26 hours before

and so on, up to the 8PM run.

Note that the first run of the day (the one executing at 6AM) is an edge case: with the 0 6-20 * * * cron schedule, its data interval starts at 8PM of the previous day rather than at 5AM, so the formula above does not hold for it. In practice, that run’s sensor will not find a matching daily run and, thanks to soft_fail=True in the code below, will simply time out and be skipped.
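To sanity-check the formula, here is a quick sketch in plain Python (no Airflow required; the range of interval start hours is an assumption matching the 7AM-8PM runs):

# Print the execution_delta for each regular hourly run of the day
for hourly_dag_interval_start_hour in range(6, 20):
    delta = 24 + (hourly_dag_interval_start_hour - 5)
    print(f'interval start {hourly_dag_interval_start_hour:02d}:00 -> execution_delta = {delta} hours')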

Now, how do we implement this? One problem we face is that (at the time this post was written) execution_delta is not a templated field, meaning that we cannot use the templated variables that give us access to useful information, including the data_interval_start.

Therefore, we will have to manually construct the data_interval_start of the hourly DAG. Given that the DAG runs every hour, the data interval start hour corresponds to the current hour minus one:

from datetime import datetime, timezone

datetime.now(timezone.utc).hour - 1

Therefore, the execution_delta that will be provided as an argument to the ExternalTaskSensor can now be defined as:

execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5)
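One caveat worth noting: datetime.now() is evaluated when the scheduler parses the DAG file, not when the sensor task actually runs. DAG files are re-parsed frequently, so in practice the value will usually be close to the run time, but be aware that the delta is not computed from the run’s own context.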

Here’s the full code of our hourly DAG, which will run every hour between 6AM and 8PM UTC, only if the daily DAG has been successful today.

from datetime import datetime, timedelta, timezone

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5),
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task
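A few notes on the sensor’s arguments: soft_fail=True marks the sensor as skipped (rather than failed) if it times out, which in turn skips its downstream tasks; check_existence=True makes the sensor cease waiting immediately if the external DAG does not exist at all; and poke_interval and timeout control how often, and for how long, the sensor checks for a matching successful DagRun.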

Using ExternalTaskSensor with execution_date_fn

Now, apart from execution_delta, the sensor can alternatively be configured with execution_date_fn, which accepts a callable returning the logical date(s) to be queried.

In other words, we need to create a function that fetches the desired logical date of the daily DAG, which will then be checked against the conditions of the sensor; by default, the sensor will check whether the state of the DagRun at the specified interval was successful.

The function below will fetch the DagRuns of the daily DAG and return the execution date of the latest DagRun only if it happened on the same day as the hourly DAG’s run. If no DagRun is found (which means that the daily DAG has never been executed), an AirflowSkipException will be raised so that the sensor task (and any downstream task) is skipped. Likewise, if no DagRun of the daily DAG happened on the same date as the hourly DAG’s run, the current_logical_dt will be returned, which is essentially the default value checked by the ExternalTaskSensor (and is the argument that must be present in the definition of the function provided to the execution_date_fn argument).

Recall that the two DAGs run on a different schedule which means their execution_date differs. In order to make a proper comparison and determine whether the daily DAG was executed successfully on the same day that the hourly DAG runs, we need to subtract one day from the hourly DAG’s execution date. Note that we are only interested whether the year, month and day between the two DAGs is the same (we don’t really care about the time information in this context).

import logging 

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun

def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order so that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # If the year/month/day of the daily DAG's execution_date and the hourly DAG's
    # execution_date (minus one day) are the same, the daily DAG was executed today.
    # We therefore return the execution_date of the latest daily DagRun.
    # Its state (i.e. whether it was successful) will be handled by the sensor and
    # the configuration we provide to it.
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Otherwise, return the current execution_date of the hourly DAG.
    # This is the default value the sensor would use anyway, and it effectively
    # means the sensor will not find a matching DagRun, given that the intervals
    # of the daily DAG and the sensor won't align.
    return current_logical_dt

Here’s the full code for our hourly DAG, using execution_date_fn with the ExternalTaskSensor.

import logging
from datetime import datetime, timedelta

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order so that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from the hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # If the year/month/day of the daily DAG's execution_date and the hourly DAG's
    # execution_date (minus one day) are the same, the daily DAG was executed today.
    # We therefore return the execution_date of the latest daily DagRun.
    # Its state (i.e. whether it was successful) will be handled by the sensor and
    # the configuration we provide to it.
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Otherwise, return the current execution_date of the hourly DAG.
    # This is the default value the sensor would use anyway, and it effectively
    # means the sensor will not find a matching DagRun, given that the intervals
    # of the daily DAG and the sensor won't align.
    return current_logical_dt

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_date_fn=get_most_recent_dag_run,
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task

Using PythonOperator

The last approach involves a more customised solution. More specifically, we can programmatically find the latest successful DagRun of our daily DAG and handle the behaviour of the operator accordingly: if the latest successful DagRun of the daily DAG does not align with the execution date of our hourly DAG, the task will be skipped (as will its downstream tasks).

Therefore, we can write a function similar to the one in the previous section that was passed to the execution_date_fn argument of the ExternalTaskSensor.

More specifically, we need to fetch the DagRuns of the daily DAG and determine whether any of them has completed successfully today (i.e. on the same day the hourly DAG runs). If none is found, we raise an AirflowSkipException so that the execution of the hourly DAG is skipped. In this case, the PythonOperator gives us access to the task context, including data_interval_start, and we will take advantage of this.

This is what our function looks like:

import logging

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun
from airflow.utils.state import DagRunState

def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order so that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the interval and the success of the daily DAG's DagRun. If the
    # conditions are not met, the DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')
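Note that in Airflow 2 the task context (which includes data_interval_start) is passed to the python_callable automatically through its keyword arguments, so no extra configuration is required; the provide_context argument from Airflow 1 is no longer needed.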

Here’s the complete code for the my_hourly_dag DAG, using a PythonOperator to check the status of my_daily_dag:

import logging
from datetime import datetime, timedelta

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.state import DagRunState

def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns in descending order so that the first element
    # in the list corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG has never been executed before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the interval and the success of the daily DAG's DagRun. If the
    # conditions are not met, the DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    check_task = PythonOperator(
        task_id='check_daily_dag',
        python_callable=check_daily_dag_success_today,
    )
    dummy_task = DummyOperator(task_id='dummy_task')

    check_task >> dummy_task

Final Thoughts

In today’s tutorial we discussed how to handle dependencies between different DAGs in Airflow. More specifically, we discussed how to run a DAG that is supposed to execute on an hourly basis only if a different DAG, running on a daily schedule, executes successfully within the day.

Three different approaches were demonstrated. Depending on the complexity of your use case, you should pick the one that makes the most sense and results in the most elegant code.
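One practical consideration when choosing: in their default poke mode, the two sensor-based approaches occupy a worker slot for as long as they wait (unless you set mode='reschedule'), whereas the PythonOperator check runs once and finishes immediately.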

