Airflow Production Tips — Grouped Failures and Retries

Last Updated on July 25, 2023 by Editorial Team

Author(s): Guilherme Banhudo

Originally published on Towards AI.

Photo by Jackson Simmer on Unsplash

Apache Airflow has become the de facto standard for Data Orchestration. However, throughout the years and versions, it accumulated a set of nuances and bugs which can hinder its production usage.

This series of articles walks Apache Airflow users through overcoming these issues, the same issues I have faced myself.

Note: As usual, all of the code is available in my GitHub repository, here.

Grouped failures and retries

TLDR: In Airflow, it is common for multiple tasks to effectively act as one; a typical example is a Run operator paired with its corresponding Sensor operator. If the Sensor task fails, then technically the entire process has failed and should be restarted. By default, however, Airflow will only retry the Sensor task without re-running the Run task. In this post, you will find out how to fix that.

TLDR #2: You can skip directly to the solution, here

Problem Statement

More often than not, we are all faced with the need to create sets of Tasks we want to either Fail (and retry) or Succeed as a whole.

Consider the case of running an AzureDataFactoryRunPipelineOperator, named TaskRun, in combination with the corresponding sensor that retrieves its status, an AzureDataFactoryPipelineRunStatusSensor named TaskSensor.

Whilst TaskRun may succeed in triggering the job, the Azure Data Factory run instance can still fail, in which case TaskSensor will be marked as failed and up for retry. TaskRun, however, remains successful and hence will not be retried.

In other words, we want related tasks to either Fail or Succeed as a whole.

Why does this happen?

In Apache Airflow, tasks are linked via dependency declarations (either the << and >> dependency operators or the set_downstream and set_upstream functions). However, these merely define the flow, or Directed Acyclic Graph, of your Tasks: whilst they establish one-way (downstream) dependencies, they do not convey information regarding upstream dependencies. In other words, suppose we combine TaskRun with TaskSensor using the appropriate dependency operator:

A Task operator followed by the corresponding Sensor operator
from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor

TaskRun = AzureDataFactoryRunPipelineOperator(...)
TaskSensor = AzureDataFactoryPipelineRunStatusSensor(...)
TaskRun >> TaskSensor

Apache Airflow knows that if TaskRun fails, TaskSensor should not run: it checks TaskRun’s downstream tasks (here, TaskSensor) and does not execute them. However, the same cannot be inferred in the opposite direction, as an upstream Task may well have succeeded, in which case we most likely do not want to repeat it. For example, if instead of TaskRun and TaskSensor we had a TaskExtract followed by a TaskTransform, and TaskExtract succeeded but TaskTransform failed, we would not want TaskExtract to be retried. This is Airflow’s default behavior, which is, of course, spot on.

The Solution

Unlike our previous article, where we explored using the ORM to extract DagRun information, the solution to this problem revolves around the Tasks directly: since Tasks contain knowledge of their upstream Tasks, the solution is much simpler.

The solution to our problem is then divided into three simple steps:

  1. Extract the list of all upstream TaskInstances of a specified TaskInstance
  2. Mark the upstream TaskInstances up for retry or as failed, depending on the desired behavior
  3. Add the appropriate callback functions to the Task definition

Step 1: Extract the list of all upstream TaskInstances of a specified TaskInstance

The first step is perhaps the most complex since there are two different ways of tackling the problem:

  1. We can mark tasks within a specified upstream depth
  2. Or we can mark tasks based on their Task ID

We will explore and prepare each method and finally combine both of them for extended functionality.

The first and simplest method comprises two steps (see the sketch after this list):

  1. Get the list of Tasks to be marked based on their name
  2. Retrieve the TaskInstances whose task IDs are in the provided list of Task names
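As a minimal sketch of this first method (assuming we are inside a task callback, so the current TaskInstance is available and can resolve its DagRun; the helper name _get_task_instances_by_name is illustrative):

from airflow.utils.session import provide_session


@provide_session
def _get_task_instances_by_name(task_instance, task_names, session=None):
    # Resolve the provided Task names to the TaskInstances of the same DagRun
    dag_run = task_instance.get_dagrun(session=session)
    return [
        ti
        for ti in dag_run.get_task_instances(session=session)
        if ti.task_id in set(task_names)
    ]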

The second method is more complex and more brittle, and only works under a specific assumption: the upstream Tasks to be marked form a linear chain of dependencies, meaning no branching is involved within the specified scan depth (see the sketch after this list).

  1. Iterate for each specified scan depth level progressing from a lower to a higher level
  2. Retrieve and validate the number of upstream Tasks’ IDs for the Tasks at the specified scan depth level
  3. Append the retrieved upstream Task ID
  4. Proceed to the next step of the loop for the Tasks at the next scan depth level
  5. Upon loop completion, retrieve the TaskInstances associated with the retrieved Task IDs.
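A minimal sketch of this depth-based walk, assuming a strictly linear chain of upstream Tasks (the helper name is illustrative):

def _get_upstream_task_ids_by_depth(task, depth):
    # Walk `depth` levels upstream, collecting one task ID per level
    task_ids = []
    current = task
    for _ in range(depth):
        upstream = current.upstream_list
        if len(upstream) != 1:
            # Branching upstream dependencies violate the linearity assumption
            raise ValueError("Depth-based marking only supports linear upstream dependencies")
        current = upstream[0]
        task_ids.append(current.task_id)
    return task_ids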

Combining the two methods and wrapping them in a reusable function yields our first and main step:
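The original implementation lives in the author's repository; as a hedged sketch, combining the two helpers above into the _get_upstream_task_instance_list function used in the next step might look like this:

from airflow.utils.session import provide_session


@provide_session
def _get_upstream_task_instance_list(task_instance, task_names=None, depth=0, session=None):
    # Combine both methods: explicitly named Tasks plus Tasks found by walking `depth` levels upstream
    ids_to_mark = set(task_names or [])
    ids_to_mark.update(_get_upstream_task_ids_by_depth(task_instance.task, depth))
    return _get_task_instances_by_name(task_instance, ids_to_mark, session=session)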

Step 2: Mark the upstream TaskInstances up for retry or as failed, depending on the desired behavior.

The second step is a straightforward implementation that leverages the previously defined _get_upstream_task_instance_list and simply marks the associated TaskInstances with the desired state.
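As a sketch (the function name and signature are illustrative; State.UP_FOR_RETRY and State.FAILED are the standard Airflow task states):

from airflow.utils.state import State


def _mark_upstream_task_instances(task_instance, state, task_names=None, depth=0):
    # Mark every collected upstream TaskInstance with the desired state,
    # e.g. State.UP_FOR_RETRY to re-queue it or State.FAILED to fail it
    for ti in _get_upstream_task_instance_list(task_instance, task_names=task_names, depth=depth):
        ti.set_state(state)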

Step 3: Add the appropriate callback functions to the Task definition

Now that we have added the possibility to extract all upstream TaskInstances associated with a given TaskInstance and mark them with the desired state, we have to let Airflow know how to leverage them.

The final step is simply to associate our functions with the on_failure_callback and on_retry_callback Task definition callbacks:
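For example, wiring a callback (the callback name is illustrative) onto the sensor so that a failure or retry of TaskSensor also re-queues TaskRun could look roughly like this:

def _retry_upstream_callback(context):
    # Airflow passes the task context to callbacks; re-queue the immediate upstream task (TaskRun)
    _mark_upstream_task_instances(context["task_instance"], State.UP_FOR_RETRY, depth=1)


TaskSensor = AzureDataFactoryPipelineRunStatusSensor(
    ...,
    on_failure_callback=_retry_upstream_callback,
    on_retry_callback=_retry_upstream_callback,
)

With this in place, whenever TaskSensor fails or is retried, TaskRun is marked up for retry as well, so the pair fails or succeeds as a whole.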

Let me know in the comments if you find these resources useful, and as usual, you can find this code in my GitHub repository!
