Airflow Production Tips — Proper Task (Not DAG) Catchup
Last Updated on January 7, 2023 by Editorial Team
Author(s): Guilherme Banhudo
Originally published on Towards AI the World’s Leading AI and Technology News and Media Company. If you are building an AI-related product or service, we invite you to consider becoming an AI sponsor. At Towards AI, we help scale AI and technology startups. Let us help you unleash your technology to the masses.
Airflow Production Tips — Proper Task (Not DAG) Catchup
Apache Airflow has become the de facto standard for Data Orchestration. However, throughout the years and versions, it accumulated a set of nuances and bugs which can hinder its production usage.
This series of articles aims at walking Apache Airflow users through the process of overcoming these issues, the same issues I have faced.
Note: As usual, all of the code is available in my GitHub repository, here.
Proper Task (Not DAG) catchup
TLDR: Airflow’s ability to get the previous successful TaskInstance’s date does not work as intended, returning the previous successful DAG’s run date instead, which prevents you from accurately and properly picking up missing/failed information. In this post, you will find out how to bypass it. See the original bug report.
Problem Statement
One of the most interesting and helpful features of Apache Airflow is its ability to catch up with the past should a task fail. No, I am not referring to the catchup parameter you can define in your DAG, but rather giving Tasks the ability to encompass the previously failed TaskInstances (and their execution date) using either the
JINJA template, or directly accessing the previous TaskInstance whose status was a success:
This is particularly useful when you want to make sure your data stays up to date in Ingestions or your ETL consumes all information for both the current and failed past TaskInstances, for instance:
On a more practical example, considering the image below, you would expect that if the first run of Task transform_data failed whilst using the above configuration, the failed/missing data would be captured on the next run of transform_data, in the next hour, correct?
Expected Event Loop:
- Loop 1
– Run task extract_from_db, capture one hour of data, status: success
– Run task tranform_data, transform one hour of data, status: fail
– Run task load_target_db, load one hour of data, status: fail - Loop 2
– Run task extract_from_db, capture one hour of data, status: success
– Run task tranform_data, transform two hours of data (current and previously failed), status: success
– Run task load_target_db, load two hours of data (current and previously failed), status: success
Unfortunately, that is incorrect. Whilst it is the obvious and expected behavior, Airflow’s legacy code prevents such from happening. See the original bug report.
Below is the corresponding rendered template for the first and second runs of Task transform_data, respectively:
Note that the second task does not compensate for the previous failure. Instead, it just passes its own previous execution date param, 20221013T010000 instead of including the previous successful run, the full catch-up 20221013T000000.
Why does this happen?
Apache Airflow’s default behavior, when the previous execution date was successful, is to look at the previous DAG’s successful execution date, not of the TaskInstance’s, which effectively makes your DAGs incapable of automatically catching up unless the entire DAG fails.
So did we expect to happen instead? Well, we expect the rendered template in the second run to have been:
You can see the original bug report on the problem, dating back to 2021 and even longer on StackOverflow.
The Solution
Like many other Python frameworks, Apache Airflow uses an ORM (Object Relational Mapper) to abstract access to its backend database. Specifically, the usual SQL Alchemy project has been leveraged to accelerate Apache Airflow. This allows us to access the metadata database directly and manipulate it according to our requirements.
The solution to our problem is then divided into four simple steps:
- Retrieve the TaskInstance objects based on a specified State
- Retrieve the last successful TaskInstance instance for the provided Task
- Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task
- Arm our DAG with the ability to use this information!
Lastly, a practical section example has been added to illustrate the goal!
Step 1: Retrieve the TaskInstance objects based on a specified State
The first step corresponds to having the ability to query the ORM to retrieve the database to retrieve the last TaskInstance run corresponding to a specific state:
Note: The function is a generalization used to allow you to pick up any specific state you may require
The function is self-explanatory, querying the TaskInstance ORM object and filtering it via two parameters: the desired lookup state and TaskInstance instance.
Step 2: Retrieve the last successful TaskInstance instance for the provided Task
Leveraging the previously defined generalization, we can now query the ORM to retrieve only the last run whose state was Success.
Step 3: Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task.
Having tracked down the last successful TaskInstance instance, we can now retrieve the DAGRun object associated with said TaskInstance instance. The DAGRun model contains information regarding the ran TaskInstance instance alongside a vast array of useful information:
Armed with this knowledge, we can now retrieve the DAGRun instance associated with our retrieved last successful TaskInstance instance in a similar process as before:
Step 4: Arm our DAG with the ability to use this information!
Finally, we have to make sure Apache Airflow is aware of our new functions and can inject them into the JINJA engine.
We can do so by importing the function and passing the user-defined function to the DAG directly on its constructor, in this case, via context-manager:
You can now freely call the function directly in JINJA and pass it as an argument to your Extraction, ETL, or any other processes you may have!
Let me know in the comments if you find these resources useful and as usual, you can find this code in my GitHub repository!
Airflow Production Tips — Proper Task (Not DAG) Catchup was originally published in Towards AI on Medium, where people are continuing the conversation by highlighting and responding to this story.
Join thousands of data leaders on the AI newsletter. It’s free, we don’t spam, and we never share your email address. Keep up to date with the latest work in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.
Published via Towards AI