Name: Towards AI Legal Name: Towards AI, Inc. Description: Towards AI is the world's leading artificial intelligence (AI) and technology publication. Read by thought-leaders and decision-makers around the world. Phone Number: +1-650-246-9381 Email: pub@towardsai.net
228 Park Avenue South New York, NY 10003 United States
Website: Publisher: https://towardsai.net/#publisher Diversity Policy: https://towardsai.net/about Ethics Policy: https://towardsai.net/about Masthead: https://towardsai.net/about
Name: Towards AI Legal Name: Towards AI, Inc. Description: Towards AI is the world's leading artificial intelligence (AI) and technology publication. Founders: Roberto Iriondo, , Job Title: Co-founder and Advisor Works for: Towards AI, Inc. Follow Roberto: X, LinkedIn, GitHub, Google Scholar, Towards AI Profile, Medium, ML@CMU, FreeCodeCamp, Crunchbase, Bloomberg, Roberto Iriondo, Generative AI Lab, Generative AI Lab Denis Piffaretti, Job Title: Co-founder Works for: Towards AI, Inc. Louie Peters, Job Title: Co-founder Works for: Towards AI, Inc. Louis-François Bouchard, Job Title: Co-founder Works for: Towards AI, Inc. Cover:
Towards AI Cover
Logo:
Towards AI Logo
Areas Served: Worldwide Alternate Name: Towards AI, Inc. Alternate Name: Towards AI Co. Alternate Name: towards ai Alternate Name: towardsai Alternate Name: towards.ai Alternate Name: tai Alternate Name: toward ai Alternate Name: toward.ai Alternate Name: Towards AI, Inc. Alternate Name: towardsai.net Alternate Name: pub.towardsai.net
5 stars – based on 497 reviews

Frequently Used, Contextual References

TODO: Remember to copy unique IDs whenever it needs used. i.e., URL: 304b2e42315e

Resources

Take our 85+ lesson From Beginner to Advanced LLM Developer Certification: From choosing a project to deploying a working product this is the most comprehensive and practical LLM course out there!

Publication

Airflow Production Tips — Grouped Failures and Retries
Latest   Machine Learning

Airflow Production Tips — Grouped Failures and Retries

Last Updated on July 25, 2023 by Editorial Team

Author(s): Guilherme Banhudo

Originally published on Towards AI.

Photo by Jackson Simmer on Unsplash

Apache Airflow has become the de facto standard for Data Orchestration. However, throughout the years and versions, it accumulated a set of nuances and bugs which can hinder its production usage.

This series of articles aims at walking Apache Airflow users through the process of overcoming these issues, the same issues I have faced.

Note: As usual, all of the code is available in my GitHub repository, here.

Grouped failures and retries

TLDR: In Airflow it is common for multiple tasks to be considered one, a common use case would be the usage of a Run Operator alongside with its Sensor Operator. If the Sensor task fails, then technically, the entire process failed and should be restarted. However, Airflow by default will only retry the Sensor task without retrying to run the Run task. In this post, you will find out how to fix it.

TLDR #2: You can skip directly to the solution, here

Problem Statement

More often than not, we are all faced with the need to create sets of Tasks we want to either Fail (and retry) or Succeed as a whole.

Consider the case of running an AzureDataFactoryRunPipelineOperator, named TaskRun in combination with the corresponding Sensor to retrieve its status, AzureDataFactoryPipelineRunStatusSensor named TaskSensor.

Whilst TaskRun can be successful in triggering the job, it is always possible for the Azure Data Factory run instance to fail, in which case, the TaskSensor will be marked as failed and up for retry. However, TaskRun will still be successful hence, not be retried.

In other words, we want related tasks to either Fail or Succeed as a whole.

Why does this happen?

In Apache Airflow, even though tasks are linked via dependency establishment (either via the <<, >> dependency operators or the set_downstream and set_upstream functions) however, this merely helps define the flow — or Direct Acyclic Graph — of your Tasks, and whilst it does define one-way dependencies (downstream Tasks), it does not convey information regarding upstream dependencies. In other words, if we combine TaskRun with TaskSensor using the appropriate dependency operators:

A Task operator followed by the corresponding Sensor operator
TaskRun = AzureDataFactoryRunPipelineOperator(...)
TaskSensor = AzureDataFactoryPipelineRunStatusSensor(...)
TaskRun >> TaskSensor

Apache Airflow knows that if TaskRun fails, TaskSensor is skipped as it checks TaskRun’s downstream tasks (TaskSensor) and skips them. However, the same cannot be inferred on the opposite side, as the upstream Task may indeed have been successful, and we are likely to not want to repeat it. For example, if instead of TaskRun and TaskSensor, we would have a TaskExtract and TaskTransform in said order, and TaskExtract was successful, but TaskTransform failed, we would not want TaskExtract to be retried, and this is Airflow’s default behavior, which is, of course, spot on.

The Solution

Unlike in our previous article, where we explored the usage of the ORM to extract DAGRun information, the solution for our problem revolves around the Tasks directly as they contain knowledge on their upstream Tasks hence the solution to our problem is much simpler.

The solution to our problem is then divided into two simple steps:

  1. Extract the list of all upstream TaskInstances of a specified TaskInstance
  2. Mark the upstream TaskInstances up for retry or as failed, contingent on the necessary behavior
  3. Add the appropriate callback functions to the Task definition

Step 1: Extract the list of all upstream TaskInstances of a specified TaskInstance

The first step is perhaps the most complex since there are two different ways of tackling the problem:

  1. We can mark tasks within a specified upstream depth
  2. Or we can mark tasks based on their Task ID

We will explore and prepare each method and finally combine both of them for extended functionality.

Starting with the first and most simple method is comprised of four steps:

  1. Get the list of Tasks to be marked based on their name
  2. Retrieve the TaskInstance instances whose name is in the provided Task names

The second method is more complex and brittle and only works under a specific assumption: all upstream Tasks to be marked are only linear dependencies meaning no branching is involved within the specified scan depth.

  1. Iterate for each specified scan depth level progressing from a lower to a higher level
  2. Retrieve and validate the number of upstream Tasks’ IDs for the Tasks at the specified scan depth level
  3. Append the retrieved upstream Task ID
  4. Proceed to the next step of the loop for the Tasks at the next scan depth level
  5. Upon loop completion, retrieve the TaskInstances associated with the retrieved Task IDs.

Combining the two methods and wrapping them in a reusable function yields our first and main step:

Step 2:Mark the upstream TaskInstances up for retry or as failed, contingent on the necessary behavior.

The second step is a straightforward implementation that leverages the previously defined _get_upstream_task_instance_list and simply marks the associated TaskInstances with the desired state.

Step 3: Add the appropriate callback functions to the Task definition

Now that we have added the possibility to extract all upstream TaskInstances associated with a given TaskInstance and mark them with the desired state, we have to let Airflow know how to leverage them.

The final step is simply to associate our functions with the on_failure_callback and on_retry_callback Task definition callbacks:

Let me know in the comments if you find these resources useful and as usual, you can find this code in my GitHub repository!

Join thousands of data leaders on the AI newsletter. Join over 80,000 subscribers and keep up to date with the latest developments in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.

Published via Towards AI

Feedback ↓

Sign Up for the Course
`; } else { console.error('Element with id="subscribe" not found within the page with class "home".'); } } }); // Remove duplicate text from articles /* Backup: 09/11/24 function removeDuplicateText() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, strong'); // Select the desired elements const seenTexts = new Set(); // A set to keep track of seen texts const tagCounters = {}; // Object to track instances of each tag elements.forEach(el => { const tagName = el.tagName.toLowerCase(); // Get the tag name (e.g., 'h1', 'h2', etc.) // Initialize a counter for each tag if not already done if (!tagCounters[tagName]) { tagCounters[tagName] = 0; } // Only process the first 10 elements of each tag type if (tagCounters[tagName] >= 2) { return; // Skip if the number of elements exceeds 10 } const text = el.textContent.trim(); // Get the text content const words = text.split(/\s+/); // Split the text into words if (words.length >= 4) { // Ensure at least 4 words const significantPart = words.slice(0, 5).join(' '); // Get first 5 words for matching // Check if the text (not the tag) has been seen before if (seenTexts.has(significantPart)) { // console.log('Duplicate found, removing:', el); // Log duplicate el.remove(); // Remove duplicate element } else { seenTexts.add(significantPart); // Add the text to the set } } tagCounters[tagName]++; // Increment the counter for this tag }); } removeDuplicateText(); */ // Remove duplicate text from articles function removeDuplicateText() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, strong'); // Select the desired elements const seenTexts = new Set(); // A set to keep track of seen texts const tagCounters = {}; // Object to track instances of each tag // List of classes to be excluded const excludedClasses = ['medium-author', 'post-widget-title']; elements.forEach(el => { // Skip elements with any of the excluded classes if (excludedClasses.some(cls => el.classList.contains(cls))) { return; // Skip this element if it has any of the excluded classes } const tagName = el.tagName.toLowerCase(); // Get the tag name (e.g., 'h1', 'h2', etc.) // Initialize a counter for each tag if not already done if (!tagCounters[tagName]) { tagCounters[tagName] = 0; } // Only process the first 10 elements of each tag type if (tagCounters[tagName] >= 10) { return; // Skip if the number of elements exceeds 10 } const text = el.textContent.trim(); // Get the text content const words = text.split(/\s+/); // Split the text into words if (words.length >= 4) { // Ensure at least 4 words const significantPart = words.slice(0, 5).join(' '); // Get first 5 words for matching // Check if the text (not the tag) has been seen before if (seenTexts.has(significantPart)) { // console.log('Duplicate found, removing:', el); // Log duplicate el.remove(); // Remove duplicate element } else { seenTexts.add(significantPart); // Add the text to the set } } tagCounters[tagName]++; // Increment the counter for this tag }); } removeDuplicateText(); //Remove unnecessary text in blog excerpts document.querySelectorAll('.blog p').forEach(function(paragraph) { // Replace the unwanted text pattern for each paragraph paragraph.innerHTML = paragraph.innerHTML .replace(/Author\(s\): [\w\s]+ Originally published on Towards AI\.?/g, '') // Removes 'Author(s): XYZ Originally published on Towards AI' .replace(/This member-only story is on us\. Upgrade to access all of Medium\./g, ''); // Removes 'This member-only story...' }); //Load ionic icons and cache them if ('localStorage' in window && window['localStorage'] !== null) { const cssLink = 'https://code.ionicframework.com/ionicons/2.0.1/css/ionicons.min.css'; const storedCss = localStorage.getItem('ionicons'); if (storedCss) { loadCSS(storedCss); } else { fetch(cssLink).then(response => response.text()).then(css => { localStorage.setItem('ionicons', css); loadCSS(css); }); } } function loadCSS(css) { const style = document.createElement('style'); style.innerHTML = css; document.head.appendChild(style); } //Remove elements from imported content automatically function removeStrongFromHeadings() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, h6, span'); elements.forEach(el => { const strongTags = el.querySelectorAll('strong'); strongTags.forEach(strongTag => { while (strongTag.firstChild) { strongTag.parentNode.insertBefore(strongTag.firstChild, strongTag); } strongTag.remove(); }); }); } removeStrongFromHeadings(); "use strict"; window.onload = () => { /* //This is an object for each category of subjects and in that there are kewords and link to the keywods let keywordsAndLinks = { //you can add more categories and define their keywords and add a link ds: { keywords: [ //you can add more keywords here they are detected and replaced with achor tag automatically 'data science', 'Data science', 'Data Science', 'data Science', 'DATA SCIENCE', ], //we will replace the linktext with the keyword later on in the code //you can easily change links for each category here //(include class="ml-link" and linktext) link: 'linktext', }, ml: { keywords: [ //Add more keywords 'machine learning', 'Machine learning', 'Machine Learning', 'machine Learning', 'MACHINE LEARNING', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, ai: { keywords: [ 'artificial intelligence', 'Artificial intelligence', 'Artificial Intelligence', 'artificial Intelligence', 'ARTIFICIAL INTELLIGENCE', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, nl: { keywords: [ 'NLP', 'nlp', 'natural language processing', 'Natural Language Processing', 'NATURAL LANGUAGE PROCESSING', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, des: { keywords: [ 'data engineering services', 'Data Engineering Services', 'DATA ENGINEERING SERVICES', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, td: { keywords: [ 'training data', 'Training Data', 'training Data', 'TRAINING DATA', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, ias: { keywords: [ 'image annotation services', 'Image annotation services', 'image Annotation services', 'image annotation Services', 'Image Annotation Services', 'IMAGE ANNOTATION SERVICES', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, l: { keywords: [ 'labeling', 'labelling', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, pbp: { keywords: [ 'previous blog posts', 'previous blog post', 'latest', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, mlc: { keywords: [ 'machine learning course', 'machine learning class', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, }; //Articles to skip let articleIdsToSkip = ['post-2651', 'post-3414', 'post-3540']; //keyword with its related achortag is recieved here along with article id function searchAndReplace(keyword, anchorTag, articleId) { //selects the h3 h4 and p tags that are inside of the article let content = document.querySelector(`#${articleId} .entry-content`); //replaces the "linktext" in achor tag with the keyword that will be searched and replaced let newLink = anchorTag.replace('linktext', keyword); //regular expression to search keyword var re = new RegExp('(' + keyword + ')', 'g'); //this replaces the keywords in h3 h4 and p tags content with achor tag content.innerHTML = content.innerHTML.replace(re, newLink); } function articleFilter(keyword, anchorTag) { //gets all the articles var articles = document.querySelectorAll('article'); //if its zero or less then there are no articles if (articles.length > 0) { for (let x = 0; x < articles.length; x++) { //articles to skip is an array in which there are ids of articles which should not get effected //if the current article's id is also in that array then do not call search and replace with its data if (!articleIdsToSkip.includes(articles[x].id)) { //search and replace is called on articles which should get effected searchAndReplace(keyword, anchorTag, articles[x].id, key); } else { console.log( `Cannot replace the keywords in article with id ${articles[x].id}` ); } } } else { console.log('No articles found.'); } } let key; //not part of script, added for (key in keywordsAndLinks) { //key is the object in keywords and links object i.e ds, ml, ai for (let i = 0; i < keywordsAndLinks[key].keywords.length; i++) { //keywordsAndLinks[key].keywords is the array of keywords for key (ds, ml, ai) //keywordsAndLinks[key].keywords[i] is the keyword and keywordsAndLinks[key].link is the link //keyword and link is sent to searchreplace where it is then replaced using regular expression and replace function articleFilter( keywordsAndLinks[key].keywords[i], keywordsAndLinks[key].link ); } } function cleanLinks() { // (making smal functions is for DRY) this function gets the links and only keeps the first 2 and from the rest removes the anchor tag and replaces it with its text function removeLinks(links) { if (links.length > 1) { for (let i = 2; i < links.length; i++) { links[i].outerHTML = links[i].textContent; } } } //arrays which will contain all the achor tags found with the class (ds-link, ml-link, ailink) in each article inserted using search and replace let dslinks; let mllinks; let ailinks; let nllinks; let deslinks; let tdlinks; let iaslinks; let llinks; let pbplinks; let mlclinks; const content = document.querySelectorAll('article'); //all articles content.forEach((c) => { //to skip the articles with specific ids if (!articleIdsToSkip.includes(c.id)) { //getting all the anchor tags in each article one by one dslinks = document.querySelectorAll(`#${c.id} .entry-content a.ds-link`); mllinks = document.querySelectorAll(`#${c.id} .entry-content a.ml-link`); ailinks = document.querySelectorAll(`#${c.id} .entry-content a.ai-link`); nllinks = document.querySelectorAll(`#${c.id} .entry-content a.ntrl-link`); deslinks = document.querySelectorAll(`#${c.id} .entry-content a.des-link`); tdlinks = document.querySelectorAll(`#${c.id} .entry-content a.td-link`); iaslinks = document.querySelectorAll(`#${c.id} .entry-content a.ias-link`); mlclinks = document.querySelectorAll(`#${c.id} .entry-content a.mlc-link`); llinks = document.querySelectorAll(`#${c.id} .entry-content a.l-link`); pbplinks = document.querySelectorAll(`#${c.id} .entry-content a.pbp-link`); //sending the anchor tags list of each article one by one to remove extra anchor tags removeLinks(dslinks); removeLinks(mllinks); removeLinks(ailinks); removeLinks(nllinks); removeLinks(deslinks); removeLinks(tdlinks); removeLinks(iaslinks); removeLinks(mlclinks); removeLinks(llinks); removeLinks(pbplinks); } }); } //To remove extra achor tags of each category (ds, ml, ai) and only have 2 of each category per article cleanLinks(); */ //Recommended Articles var ctaLinks = [ /* ' ' + '

Subscribe to our AI newsletter!

' + */ '

Take our 85+ lesson From Beginner to Advanced LLM Developer Certification: From choosing a project to deploying a working product this is the most comprehensive and practical LLM course out there!

'+ '

Towards AI has published Building LLMs for Production—our 470+ page guide to mastering LLMs with practical projects and expert insights!

' + '
' + '' + '' + '

Note: Content contains the views of the contributing authors and not Towards AI.
Disclosure: This website may contain sponsored content and affiliate links.

' + 'Discover Your Dream AI Career at Towards AI Jobs' + '

Towards AI has built a jobs board tailored specifically to Machine Learning and Data Science Jobs and Skills. Our software searches for live AI jobs each hour, labels and categorises them and makes them easily searchable. Explore over 10,000 live jobs today with Towards AI Jobs!

' + '
' + '

🔥 Recommended Articles 🔥

' + 'Why Become an LLM Developer? Launching Towards AI’s New One-Stop Conversion Course'+ 'Testing Launchpad.sh: A Container-based GPU Cloud for Inference and Fine-tuning'+ 'The Top 13 AI-Powered CRM Platforms
' + 'Top 11 AI Call Center Software for 2024
' + 'Learn Prompting 101—Prompt Engineering Course
' + 'Explore Leading Cloud Providers for GPU-Powered LLM Training
' + 'Best AI Communities for Artificial Intelligence Enthusiasts
' + 'Best Workstations for Deep Learning
' + 'Best Laptops for Deep Learning
' + 'Best Machine Learning Books
' + 'Machine Learning Algorithms
' + 'Neural Networks Tutorial
' + 'Best Public Datasets for Machine Learning
' + 'Neural Network Types
' + 'NLP Tutorial
' + 'Best Data Science Books
' + 'Monte Carlo Simulation Tutorial
' + 'Recommender System Tutorial
' + 'Linear Algebra for Deep Learning Tutorial
' + 'Google Colab Introduction
' + 'Decision Trees in Machine Learning
' + 'Principal Component Analysis (PCA) Tutorial
' + 'Linear Regression from Zero to Hero
'+ '

', /* + '

Join thousands of data leaders on the AI newsletter. It’s free, we don’t spam, and we never share your email address. Keep up to date with the latest work in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.

',*/ ]; var replaceText = { '': '', '': '', '
': '
' + ctaLinks + '
', }; Object.keys(replaceText).forEach((txtorig) => { //txtorig is the key in replacetext object const txtnew = replaceText[txtorig]; //txtnew is the value of the key in replacetext object let entryFooter = document.querySelector('article .entry-footer'); if (document.querySelectorAll('.single-post').length > 0) { //console.log('Article found.'); const text = entryFooter.innerHTML; entryFooter.innerHTML = text.replace(txtorig, txtnew); } else { // console.log('Article not found.'); //removing comment 09/04/24 } }); var css = document.createElement('style'); css.type = 'text/css'; css.innerHTML = '.post-tags { display:none !important } .article-cta a { font-size: 18px; }'; document.body.appendChild(css); //Extra //This function adds some accessibility needs to the site. function addAlly() { // In this function JQuery is replaced with vanilla javascript functions const imgCont = document.querySelector('.uw-imgcont'); imgCont.setAttribute('aria-label', 'AI news, latest developments'); imgCont.title = 'AI news, latest developments'; imgCont.rel = 'noopener'; document.querySelector('.page-mobile-menu-logo a').title = 'Towards AI Home'; document.querySelector('a.social-link').rel = 'noopener'; document.querySelector('a.uw-text').rel = 'noopener'; document.querySelector('a.uw-w-branding').rel = 'noopener'; document.querySelector('.blog h2.heading').innerHTML = 'Publication'; const popupSearch = document.querySelector$('a.btn-open-popup-search'); popupSearch.setAttribute('role', 'button'); popupSearch.title = 'Search'; const searchClose = document.querySelector('a.popup-search-close'); searchClose.setAttribute('role', 'button'); searchClose.title = 'Close search page'; // document // .querySelector('a.btn-open-popup-search') // .setAttribute( // 'href', // 'https://medium.com/towards-artificial-intelligence/search' // ); } // Add external attributes to 302 sticky and editorial links function extLink() { // Sticky 302 links, this fuction opens the link we send to Medium on a new tab and adds a "noopener" rel to them var stickyLinks = document.querySelectorAll('.grid-item.sticky a'); for (var i = 0; i < stickyLinks.length; i++) { /* stickyLinks[i].setAttribute('target', '_blank'); stickyLinks[i].setAttribute('rel', 'noopener'); */ } // Editorial 302 links, same here var editLinks = document.querySelectorAll( '.grid-item.category-editorial a' ); for (var i = 0; i < editLinks.length; i++) { editLinks[i].setAttribute('target', '_blank'); editLinks[i].setAttribute('rel', 'noopener'); } } // Add current year to copyright notices document.getElementById( 'js-current-year' ).textContent = new Date().getFullYear(); // Call functions after page load extLink(); //addAlly(); setTimeout(function() { //addAlly(); //ideally we should only need to run it once ↑ }, 5000); }; function closeCookieDialog (){ document.getElementById("cookie-consent").style.display = "none"; return false; } setTimeout ( function () { closeCookieDialog(); }, 15000); console.log(`%c 🚀🚀🚀 ███ █████ ███████ █████████ ███████████ █████████████ ███████████████ ███████ ███████ ███████ ┌───────────────────────────────────────────────────────────────────┐ │ │ │ Towards AI is looking for contributors! │ │ Join us in creating awesome AI content. │ │ Let's build the future of AI together → │ │ https://towardsai.net/contribute │ │ │ └───────────────────────────────────────────────────────────────────┘ `, `background: ; color: #00adff; font-size: large`); //Remove latest category across site document.querySelectorAll('a[rel="category tag"]').forEach(function(el) { if (el.textContent.trim() === 'Latest') { // Remove the two consecutive spaces (  ) if (el.nextSibling && el.nextSibling.nodeValue.includes('\u00A0\u00A0')) { el.nextSibling.nodeValue = ''; // Remove the spaces } el.style.display = 'none'; // Hide the element } }); // Add cross-domain measurement, anonymize IPs 'use strict'; //var ga = gtag; ga('config', 'G-9D3HKKFV1Q', 'auto', { /*'allowLinker': true,*/ 'anonymize_ip': true/*, 'linker': { 'domains': [ 'medium.com/towards-artificial-intelligence', 'datasets.towardsai.net', 'rss.towardsai.net', 'feed.towardsai.net', 'contribute.towardsai.net', 'members.towardsai.net', 'pub.towardsai.net', 'news.towardsai.net' ] } */ }); ga('send', 'pageview'); -->