Airflow Production Tips — Proper Task (Not DAG) Catchup

Last Updated on July 25, 2023 by Editorial Team

Author(s): Guilherme Banhudo

Originally published on Towards AI.

Photo by Jackson Simmer on Unsplash

Apache Airflow has become the de facto standard for Data Orchestration. However, throughout the years and versions, it accumulated a set of nuances and bugs which can hinder its production usage.

This series of articles aims to walk Apache Airflow users through overcoming these issues, the same ones I have faced myself.

Note: As usual, all of the code is available in my GitHub repository, here.

Proper Task (Not DAG) catchup

TLDR: Airflow’s ability to get the previous successful TaskInstance’s date does not work as intended: it returns the date of the previous successful DAG run instead, which prevents you from accurately picking up missing or failed data. In this post, you will find out how to bypass it. See the original bug report.

Problem Statement

One of the most interesting and helpful features of Apache Airflow is its ability to catch up with the past should a task fail. No, I am not referring to the catchup parameter you can define in your DAG, but rather to giving Tasks the ability to cover previously failed TaskInstances (and their execution dates), using either the {{ prev_execution_date_success }} Jinja template or direct access to the previous TaskInstance whose status was success.

This is particularly useful when you want your ingestions to stay up to date, or when your ETL must consume the data for both the current run and any failed past TaskInstances, for instance:

Task configuration for the extract_from_db, transform_data and load_target_db tasks

As a more practical example, consider the image below. If the first run of Task transform_data failed whilst using the above configuration, you would expect the failed/missing data to be captured by the next run of transform_data, in the next hour, correct?

Simple ETL-like DAG consisting of three linearly dependent tasks

Expected Event Loop:

  • Loop 1
    – Run task extract_from_db, capture one hour of data, status: success
    – Run task transform_data, transform one hour of data, status: fail
    – Run task load_target_db, load one hour of data, status: fail
  • Loop 2
    – Run task extract_from_db, capture one hour of data, status: success
    – Run task transform_data, transform two hours of data (current and previously failed), status: success
    – Run task load_target_db, load two hours of data (current and previously failed), status: success
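The expected loop above amounts to a simple backlog rule: every run owes the current hour plus any hours left over from earlier failures, and a success clears the backlog. A minimal sketch of that rule in plain Python follows; hours_to_process is a hypothetical helper written for illustration and is not part of Airflow.

```python
from typing import List


def hours_to_process(outcomes: List[bool]) -> List[int]:
    """Hours of data each hourly run of one task should handle.

    outcomes[i] is True when run i of the task succeeds.
    This models the *expected* task-level catch-up, not Airflow's behavior.
    """
    backlog = 0
    plan = []
    for succeeded in outcomes:
        backlog += 1            # the current hour is always due
        plan.append(backlog)    # this run should cover the whole backlog
        if succeeded:
            backlog = 0         # a success clears everything owed so far
    return plan


# transform_data: fails in loop 1, succeeds in loop 2
print(hours_to_process([False, True]))  # [1, 2]
# extract_from_db: succeeds in both loops
print(hours_to_process([True, True]))   # [1, 1]
```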

Unfortunately, that is incorrect. Whilst it is the obvious and expected behavior, Airflow’s legacy code prevents this from happening. See the original bug report.

Below is the corresponding rendered template for the first and second runs of Task transform_data, respectively:

The actually rendered template for the first run of the transform_data task — the failed task
The actually rendered template for the second run of the transform_data task — does not pick up on the failed task’s execution datetime

Note that the second run does not compensate for the previous failure. Instead, it just passes its own previous execution date parameter, 20221013T010000, rather than reaching back to the previous successful run for the full catch-up, 20221013T000000.

Why does this happen?

When resolving the previous successful execution date, Apache Airflow’s default behavior is to look at the previous successful DAG run, not the previous successful TaskInstance, which effectively makes your DAGs incapable of automatically catching up unless the entire DAG fails.
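To make the difference concrete, here is a minimal sketch in plain Python that contrasts the two lookups. The Run class, both helper functions, and the dates are hypothetical and only model the behavior described above; none of it is Airflow code. The history assumes a run that is recorded as successful at the DAG level even though transform_data failed in it.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class Run:
    execution_date: datetime
    dag_state: str                # state recorded for the whole DAG run
    task_states: Dict[str, str]   # state recorded for each task in that run


def prev_success_dag_level(history: List[Run], before: datetime) -> Optional[datetime]:
    """Latest earlier run whose DAG-level state is success (the observed behavior)."""
    dates = [r.execution_date for r in history
             if r.execution_date < before and r.dag_state == "success"]
    return max(dates, default=None)


def prev_success_task_level(history: List[Run], task_id: str,
                            before: datetime) -> Optional[datetime]:
    """Latest earlier run in which this particular task succeeded (the expected behavior)."""
    dates = [r.execution_date for r in history
             if r.execution_date < before and r.task_states.get(task_id) == "success"]
    return max(dates, default=None)


history = [
    Run(datetime(2023, 1, 1, 0), "success",
        {"extract_from_db": "success", "transform_data": "success"}),
    # Assumed for illustration: DAG run marked success, transform_data failed.
    Run(datetime(2023, 1, 1, 1), "success",
        {"extract_from_db": "success", "transform_data": "failed"}),
]
current = datetime(2023, 1, 1, 2)

print(prev_success_dag_level(history, current))                     # 2023-01-01 01:00:00
print(prev_success_task_level(history, "transform_data", current))  # 2023-01-01 00:00:00
```

The DAG-level lookup skips straight past the hour in which transform_data failed, whereas the task-level lookup reaches back to the last hour the task actually completed.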

So what did we expect to happen instead? Well, we expected the rendered template in the second run to reach back to 20221013T000000 and cover the failed run as well.

You can see the original bug report on the problem, dating back to 2021, and the issue has been discussed on StackOverflow for even longer.

The Solution

Like many other Python frameworks, Apache Airflow uses an ORM (Object Relational Mapper) to abstract access to its backend database. Specifically, it relies on the well-known SQLAlchemy project. This allows us to query the metadata database directly and use its contents according to our requirements.

The solution to our problem is then divided into four simple steps:

  1. Retrieve the TaskInstance objects based on a specified State
  2. Retrieve the last successful TaskInstance instance for the provided Task
  3. Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task
  4. Arm our DAG with the ability to use this information!

Lastly, a practical example has been added to illustrate the goal!

Step 1: Retrieve the TaskInstance objects based on a specified State

The first step is to query the ORM for the last TaskInstance run corresponding to a specific state.

Note: The function is a generalization used to allow you to pick up any specific state you may require

The function is self-explanatory, querying the TaskInstance ORM object and filtering it via two parameters: the desired lookup state and TaskInstance instance.

Step 2: Retrieve the last successful TaskInstance instance for the provided Task

Leveraging the previously defined generalization, we can now query the ORM to retrieve only the last run whose state was Success.
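The shape of steps 1 and 2 can be sketched without an Airflow installation. The example below is only a stand-in: it uses Python's built-in sqlite3 module and a simplified, assumed task_instance table instead of Airflow's TaskInstance model and ORM session, and it takes dag_id and task_id rather than a TaskInstance object. The function names are hypothetical. The real metadata schema has more columns and varies between Airflow versions.

```python
import sqlite3
from typing import Optional


def last_task_instance_for_state(conn: sqlite3.Connection, dag_id: str,
                                 task_id: str, state: str) -> Optional[sqlite3.Row]:
    """Step 1: most recent row for this task in the requested state, or None."""
    return conn.execute(
        "SELECT dag_id, task_id, run_id, state, start_date "
        "FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? AND state = ? "
        "ORDER BY start_date DESC LIMIT 1",
        (dag_id, task_id, state),
    ).fetchone()


def last_successful_task_instance(conn: sqlite3.Connection, dag_id: str,
                                  task_id: str) -> Optional[sqlite3.Row]:
    """Step 2: the generalization above, specialized to the success state."""
    return last_task_instance_for_state(conn, dag_id, task_id, "success")


# Stand-in metadata database with a simplified (assumed) table layout.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT, start_date TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("etl", "transform_data", "run_1", "success", "2023-01-01T00:05:00"),
        ("etl", "transform_data", "run_2", "failed", "2023-01-01T01:05:00"),
        ("etl", "extract_from_db", "run_2", "success", "2023-01-01T01:00:00"),
    ],
)

row = last_successful_task_instance(conn, "etl", "transform_data")
print(row["run_id"])  # run_1
```

Keeping the state as a parameter in the first function is what lets the same query serve other states later, for example to find the last failed run.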

Step 3: Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task

Having tracked down the last successful TaskInstance instance, we can now retrieve the DAGRun object associated with it. The DAGRun model contains information about the run that the TaskInstance belongs to, alongside a vast array of other useful information.

Armed with this knowledge, we can now retrieve the DAGRun instance associated with our retrieved last successful TaskInstance instance in a similar process as before:
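As a rough illustration of this step, the sketch below joins a task-instance table to a DAG-run table to find the run in which a task last succeeded. It again uses sqlite3 with simplified, assumed tables rather than Airflow's models, and last_successful_dag_run_for_task is a hypothetical name. Note that the second run is assumed to be marked successful at the DAG level although transform_data failed in it.

```python
import sqlite3
from typing import Optional

QUERY = """
SELECT dr.run_id, dr.execution_date
FROM task_instance AS ti
JOIN dag_run AS dr
  ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
WHERE ti.dag_id = ? AND ti.task_id = ? AND ti.state = 'success'
ORDER BY dr.execution_date DESC
LIMIT 1
"""


def last_successful_dag_run_for_task(conn: sqlite3.Connection, dag_id: str,
                                     task_id: str) -> Optional[sqlite3.Row]:
    """DAG run in which the given task last succeeded, or None."""
    return conn.execute(QUERY, (dag_id, task_id)).fetchone()


# Stand-in metadata database with simplified (assumed) table layouts.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT, state TEXT)"
)
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?, ?)",
    [
        ("etl", "run_1", "2023-01-01T00:00:00", "success"),
        ("etl", "run_2", "2023-01-01T01:00:00", "success"),
    ],
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("etl", "transform_data", "run_1", "success"),
        ("etl", "transform_data", "run_2", "failed"),
    ],
)

run = last_successful_dag_run_for_task(conn, "etl", "transform_data")
print(run["run_id"], run["execution_date"])  # run_1 2023-01-01T00:00:00
```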

Step 4: Arm our DAG with the ability to use this information!

Finally, we have to make sure Apache Airflow is aware of our new functions and can inject them into the Jinja engine.

We can do so by importing the function and passing it to the DAG directly in its constructor through the user_defined_macros argument, in this case with the DAG used as a context manager:

You can now freely call the function directly in Jinja and pass its result as an argument to your Extraction, ETL, or any other processes you may have!
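A minimal sketch of the wiring follows, kept free of Airflow imports so that it runs anywhere. The function body, the DAG id, and the command string are hypothetical placeholders. The parts taken from Airflow itself are the user_defined_macros argument of the DAG constructor and the ti variable available in the template context.

```python
from datetime import datetime


def last_successful_task_date(ti):
    """Hypothetical placeholder for the lookup built in steps 1 to 3."""
    raise NotImplementedError("replace with the metadata-database lookup")


# Keyword arguments for the DAG constructor. Every entry in
# user_defined_macros becomes callable by name inside Jinja templates.
dag_kwargs = {
    "dag_id": "etl_example",  # hypothetical DAG id
    "start_date": datetime(2022, 10, 13),
    "user_defined_macros": {
        "last_successful_task_date": last_successful_task_date,
    },
}

# A templated field can then call the macro and hand it the current
# TaskInstance, which Airflow exposes to templates as `ti`.
transform_command = (
    "python transform.py --since '{{ last_successful_task_date(ti) }}'"
)

# In a real DAG file (Airflow required, hence left as comments here):
#   from airflow import DAG
#   with DAG(**dag_kwargs) as dag:
#       ...  # operators whose templated fields use transform_command
```

Registering the function on the DAG rather than importing it inside each task keeps the lookup in one place and makes it available to every templated field in that DAG.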

Let me know in the comments if you find these resources useful and as usual, you can find this code in my GitHub repository!
