Airflow Production Tips — Grouped Failures and Retries
Last Updated on July 25, 2023 by Editorial Team
Author(s): Guilherme Banhudo
Originally published on Towards AI.
Apache Airflow has become the de facto standard for Data Orchestration. However, over the years and versions, it has accumulated a set of nuances and bugs that can hinder its use in production.
This series of articles aims to walk Apache Airflow users through the process of overcoming these issues, the very issues I have faced myself.
Note: As usual, all of the code is available in my GitHub repository, here.
Grouped failures and retries
TLDR: In Airflow, it is common for multiple tasks to effectively form a single unit of work; a typical example is a Run operator paired with its Sensor operator. If the Sensor task fails, then technically the entire process has failed and should be restarted. However, by default Airflow will only retry the Sensor task, without re-running the Run task. In this post, you will find out how to fix that.
TLDR #2: You can skip directly to the solution, here
Problem Statement
More often than not, we are all faced with the need to create sets of Tasks we want to either Fail (and retry) or Succeed as a whole.
Consider the case of running an AzureDataFactoryRunPipelineOperator, named TaskRun, in combination with the corresponding sensor that retrieves its status, an AzureDataFactoryPipelineRunStatusSensor named TaskSensor.
Whilst TaskRun can be successful in triggering the job, the Azure Data Factory run instance may still fail, in which case TaskSensor will be marked as failed and up for retry. TaskRun, however, will remain successful and hence will not be retried.
In other words, we want related tasks to either Fail or Succeed as a whole.
Why does this happen?
In Apache Airflow, tasks are linked by establishing dependencies (either via the << and >> dependency operators or the set_downstream and set_upstream functions). However, this merely defines the flow (the Directed Acyclic Graph) of your Tasks: whilst it establishes one-way dependencies (downstream Tasks), it does not convey information about upstream dependencies. In other words, suppose we combine TaskRun with TaskSensor using the appropriate dependency operator:
```python
TaskRun = AzureDataFactoryRunPipelineOperator(...)
TaskSensor = AzureDataFactoryPipelineRunStatusSensor(...)

TaskRun >> TaskSensor
```
Apache Airflow knows that if TaskRun fails, TaskSensor should not run: it checks TaskRun's downstream tasks (here, TaskSensor) and marks them accordingly. However, the same cannot be inferred in the opposite direction, as the upstream Task may well have been successful, and in general we do not want to repeat it. For example, if instead of TaskRun and TaskSensor we had a TaskExtract followed by a TaskTransform, and TaskExtract succeeded but TaskTransform failed, we would not want TaskExtract to be retried; this is Airflow's default behavior, which is, of course, spot on.
The Solution
Unlike in our previous article, where we explored using the ORM to extract DagRun information, the solution to this problem revolves around the Tasks directly: since they already contain knowledge of their upstream Tasks, the solution is much simpler.
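As a quick illustration of that knowledge, here is a minimal sketch using the context dictionary Airflow passes to task callbacks; the function name is purely illustrative:

```python
def _print_upstream(context):
    # `context["task"]` is the operator whose callback fired; `upstream_task_ids`
    # holds the IDs of its direct upstream tasks (e.g. {'TaskRun'} for TaskSensor).
    print(context["task"].upstream_task_ids)
```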
The solution to our problem is then divided into three simple steps:
- Extract the list of all upstream TaskInstances of a specified TaskInstance
- Mark the upstream TaskInstances up for retry or as failed, depending on the desired behavior
- Add the appropriate callback functions to the Task definition
Step 1: Extract the list of all upstream TaskInstances of a specified TaskInstance
The first step is perhaps the most complex since there are two different ways of tackling the problem:
- We can mark tasks within a specified upstream depth
- Or we can mark tasks based on their Task ID
We will explore and prepare each method and finally combine both of them for extended functionality.
The simpler of the two, marking tasks based on their Task ID, involves two steps (a short sketch follows the list):
- Get the list of Tasks to be marked based on their name
- Retrieve the TaskInstance instances whose name is in the provided Task names
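A minimal sketch of this first method is shown below; the helper name (_get_task_instances_by_id) and its signature are my own assumptions rather than the repository's exact code, but the logic is just a filter over the DagRun's TaskInstances.

```python
from typing import List

from airflow.models import DagRun, TaskInstance


def _get_task_instances_by_id(dag_run: DagRun, task_ids: List[str]) -> List[TaskInstance]:
    """Return the TaskInstances of `dag_run` whose task_id appears in `task_ids`."""
    return [ti for ti in dag_run.get_task_instances() if ti.task_id in task_ids]
```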
The second method, marking tasks within a specified upstream depth, is more complex and brittle and only works under a specific assumption: all upstream Tasks to be marked must form a linear dependency chain, meaning no branching is involved within the specified scan depth. It proceeds as follows (again, a sketch follows the list):
- Iterate over the scan depth levels, starting from the level closest to the current Task
- At each level, retrieve the upstream Task IDs of the current Task and validate that there is exactly one
- Append the retrieved upstream Task ID to the list
- Move on to the Task at the next scan depth level
- Upon loop completion, retrieve the TaskInstances associated with the collected Task IDs.
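Here is a sketch of the scan-depth walk under the linearity assumption above; the helper name (_get_task_ids_by_scan_depth) is again an assumption of mine:

```python
from typing import List

from airflow.models import BaseOperator


def _get_task_ids_by_scan_depth(task: BaseOperator, scan_depth: int) -> List[str]:
    """Walk `scan_depth` levels up a strictly linear chain and collect the Task IDs."""
    collected_ids: List[str] = []
    current_task = task
    for _ in range(scan_depth):
        upstream_ids = current_task.upstream_task_ids
        # Only linear dependencies are supported: exactly one upstream task per level.
        if len(upstream_ids) != 1:
            raise ValueError(
                f"Task {current_task.task_id} has {len(upstream_ids)} upstream "
                "tasks; the scan-depth method requires exactly one."
            )
        upstream_id = next(iter(upstream_ids))
        collected_ids.append(upstream_id)
        current_task = current_task.dag.get_task(upstream_id)
    return collected_ids
```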
Combining the two methods and wrapping them in a reusable function yields our first and main step:
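Below is a hedged reconstruction of that reusable function; it uses the article's name, _get_upstream_task_instance_list, while the parameter names and the two helpers it composes are the assumptions sketched above:

```python
from typing import List, Optional

from airflow.models import DagRun, TaskInstance


def _get_upstream_task_instance_list(
    task_instance: TaskInstance,
    dag_run: DagRun,
    upstream_task_ids: Optional[List[str]] = None,
    scan_depth: Optional[int] = None,
) -> List[TaskInstance]:
    """Collect the upstream TaskInstances selected by ID and/or by scan depth."""
    # Start from any explicitly provided Task IDs (method 1).
    task_ids_to_mark = set(upstream_task_ids or [])
    # Optionally walk up the linear dependency chain (method 2).
    if scan_depth:
        task_ids_to_mark.update(
            _get_task_ids_by_scan_depth(task_instance.task, scan_depth)
        )
    # Resolve the collected Task IDs into TaskInstances of the current DagRun.
    return _get_task_instances_by_id(dag_run, list(task_ids_to_mark))
```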
Step 2: Mark the upstream TaskInstances up for retry or as failed, depending on the desired behavior
The second step is a straightforward implementation that leverages the previously defined _get_upstream_task_instance_list and simply marks the associated TaskInstances with the desired state.
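A sketch of this step is shown below; the function name _mark_upstream_task_instances and its keyword arguments are assumptions on my part, but the mechanics are simply setting the state on each TaskInstance returned by the helper above:

```python
from airflow.utils.session import provide_session


@provide_session
def _mark_upstream_task_instances(context, state, upstream_task_ids=None,
                                  scan_depth=None, session=None):
    """Set the selected upstream TaskInstances of the current task to `state`."""
    upstream_task_instances = _get_upstream_task_instance_list(
        context["task_instance"],
        context["dag_run"],
        upstream_task_ids=upstream_task_ids,
        scan_depth=scan_depth,
    )
    for upstream_ti in upstream_task_instances:
        # `state` is e.g. airflow.utils.state.State.UP_FOR_RETRY or State.FAILED.
        upstream_ti.state = state
        session.merge(upstream_ti)
    session.commit()
```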
Step 3: Add the appropriate callback functions to the Task definition
Now that we can extract all upstream TaskInstances associated with a given TaskInstance and mark them with the desired state, we need to let Airflow know when to invoke this logic.
The final step is simply to associate our functions with the on_failure_callback and on_retry_callback Task definition callbacks:
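A self-contained sketch of the wiring is shown below; it assumes the helpers above live in the same DAG file, the DAG, pipeline, connection, resource group, and factory names are placeholders, and functools.partial is used to bind the desired state and upstream Task IDs to the two callbacks:

```python
from functools import partial

import pendulum
from airflow import DAG
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)
from airflow.providers.microsoft.azure.sensors.data_factory import (
    AzureDataFactoryPipelineRunStatusSensor,
)
from airflow.utils.state import State

with DAG(
    dag_id="adf_grouped_retries",  # placeholder DAG name
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    TaskRun = AzureDataFactoryRunPipelineOperator(
        task_id="TaskRun",
        pipeline_name="my_adf_pipeline",                   # placeholder
        azure_data_factory_conn_id="azure_data_factory",   # placeholder
        resource_group_name="my_resource_group",           # placeholder
        factory_name="my_data_factory",                    # placeholder
        wait_for_termination=False,                        # the sensor does the waiting
    )

    TaskSensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="TaskSensor",
        # The run operator pushes its run identifier to XCom under the "run_id" key.
        run_id="{{ task_instance.xcom_pull(task_ids='TaskRun', key='run_id') }}",
        azure_data_factory_conn_id="azure_data_factory",   # placeholder
        resource_group_name="my_resource_group",           # placeholder
        factory_name="my_data_factory",                    # placeholder
        retries=3,
        # If the sensor is retried, send TaskRun back for retry as well;
        # if it ultimately fails, mark TaskRun as failed too.
        on_retry_callback=partial(
            _mark_upstream_task_instances,
            state=State.UP_FOR_RETRY,
            upstream_task_ids=["TaskRun"],
        ),
        on_failure_callback=partial(
            _mark_upstream_task_instances,
            state=State.FAILED,
            upstream_task_ids=["TaskRun"],
        ),
    )

    TaskRun >> TaskSensor
```

With wait_for_termination set to False, TaskRun only triggers the pipeline and TaskSensor does the polling; whenever the sensor retries or fails, the callbacks push the same state upstream so the pair behaves as a single unit.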
Let me know in the comments if you find these resources useful and, as usual, you can find this code in my GitHub repository!