Name: Towards AI Legal Name: Towards AI, Inc. Description: Towards AI is the world's leading artificial intelligence (AI) and technology publication. Read by thought-leaders and decision-makers around the world. Phone Number: +1-650-246-9381 Email: pub@towardsai.net
228 Park Avenue South New York, NY 10003 United States
Website: Publisher: https://towardsai.net/#publisher Diversity Policy: https://towardsai.net/about Ethics Policy: https://towardsai.net/about Masthead: https://towardsai.net/about
Name: Towards AI Legal Name: Towards AI, Inc. Description: Towards AI is the world's leading artificial intelligence (AI) and technology publication. Founders: Roberto Iriondo, , Job Title: Co-founder and Advisor Works for: Towards AI, Inc. Follow Roberto: X, LinkedIn, GitHub, Google Scholar, Towards AI Profile, Medium, ML@CMU, FreeCodeCamp, Crunchbase, Bloomberg, Roberto Iriondo, Generative AI Lab, Generative AI Lab Denis Piffaretti, Job Title: Co-founder Works for: Towards AI, Inc. Louie Peters, Job Title: Co-founder Works for: Towards AI, Inc. Louis-François Bouchard, Job Title: Co-founder Works for: Towards AI, Inc. Cover:
Towards AI Cover
Logo:
Towards AI Logo
Areas Served: Worldwide Alternate Name: Towards AI, Inc. Alternate Name: Towards AI Co. Alternate Name: towards ai Alternate Name: towardsai Alternate Name: towards.ai Alternate Name: tai Alternate Name: toward ai Alternate Name: toward.ai Alternate Name: Towards AI, Inc. Alternate Name: towardsai.net Alternate Name: pub.towardsai.net
5 stars – based on 497 reviews

Frequently Used, Contextual References

TODO: Remember to copy unique IDs whenever it needs used. i.e., URL: 304b2e42315e

Resources

Take our 85+ lesson From Beginner to Advanced LLM Developer Certification: From choosing a project to deploying a working product this is the most comprehensive and practical LLM course out there!

Publication

Airflow Production Tips — Proper Task (Not DAG) Catchup
Latest   Machine Learning

Airflow Production Tips — Proper Task (Not DAG) Catchup

Last Updated on July 25, 2023 by Editorial Team

Author(s): Guilherme Banhudo

Originally published on Towards AI.

Photo by Jackson Simmer on Unsplash

Apache Airflow has become the de facto standard for Data Orchestration. However, throughout the years and versions, it accumulated a set of nuances and bugs which can hinder its production usage.

This series of articles aims at walking Apache Airflow users through the process of overcoming these issues, the same issues I have faced.

Note: As usual, all of the code is available in my GitHub repository, here.

Proper Task (Not DAG) catchup

TLDR: Airflow’s ability to get the previous successful TaskInstance’s date does not work as intended, returning the previous successful DAG’s run date instead, which prevents you from accurately and properly picking up missing/failed information. In this post, you will find out how to bypass it. See the original bug report.

TLDR #2: Click here to skip directly to the solution

Problem Statement

One of the most interesting and helpful features of Apache Airflow is its ability to catch up with the past should a task fail. No, I am not referring to the catchup parameter you can define in your DAG, but rather giving Tasks the ability to encompass the previously failed TaskInstances (and their execution date) using either the

{{ prev_execution_date_success }} 

JINJA template, or directly accessing the previous TaskInstance whose status was a success:

This is particularly useful when you want to make sure your data stays up to date in Ingestions or your ETL consumes all information for both the current and failed past TaskInstances, for instance:

Task configuration for the extract_from_db, transform_data and load_target_db tasks

On a more practical example, considering the image below, you would expect that if the first run of Task transform_data failed whilst using the above configuration, the failed/missing data would be captured on the next run of transform_data, in the next hour, correct?

Simple ETL-like DAG consisting of three linearly dependent tasks

Expected Event Loop:

  • Loop 1
    – Run task extract_from_db, capture one hour of data, status: success
    – Run task tranform_data, transform one hour of data, status: fail
    – Run task load_target_db, load one hour of data, status: fail
  • Loop 2
    – Run task extract_from_db, capture one hour of data, status: success
    – Run task tranform_data, transform two hours of data (current and previously failed), status: success
    – Run task load_target_db, load two hours of data (current and previously failed), status: success

Unfortunately, that is incorrect. Whilst it is the obvious and expected behavior, Airflow’s legacy code prevents such from happening. See the original bug report.

Below is the corresponding rendered template for the first and second runs of Task transform_data, respectively:

The actually rendered template for the first run of the transform_data task — the failed task
The actually rendered template for the second run of the transform_data task — does not pick up on the failed task’s execution datetime

Note that the second task does not compensate for the previous failure. Instead, it just passes its own previous execution date param, 20221013T010000 instead of including the previous successful run, the full catch-up 20221013T000000.

Why does this happen?

Apache Airflow’s default behavior, when the previous execution date was successful, is to look at the previous DAG’s successful execution date, not of the TaskInstance’s, which effectively makes your DAGs incapable of automatically catching up unless the entire DAG fails.

So did we expect to happen instead? Well, we expect the rendered template in the second run to have been:

You can see the original bug report on the problem, dating back to 2021 and even longer on StackOverflow.

The Solution

Like many other Python frameworks, Apache Airflow uses an ORM (Object Relational Mapper) to abstract access to its backend database. Specifically, the usual SQL Alchemy project has been leveraged to accelerate Apache Airflow. This allows us to access the metadata database directly and manipulate it according to our requirements.

The solution to our problem is then divided into four simple steps:

  1. Retrieve the TaskInstance objects based on a specified State
  2. Retrieve the last successful TaskInstance instance for the provided Task
  3. Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task
  4. Arm our DAG with the ability to use this information!

Lastly, a practical section example has been added to illustrate the goal!

Step 1: Retrieve the TaskInstance objects based on a specified State

The first step corresponds to having the ability to query the ORM to retrieve the database to retrieve the last TaskInstance run corresponding to a specific state:

Note: The function is a generalization used to allow you to pick up any specific state you may require

The function is self-explanatory, querying the TaskInstance ORM object and filtering it via two parameters: the desired lookup state and TaskInstance instance.

Step 2: Retrieve the last successful TaskInstance instance for the provided Task

Leveraging the previously defined generalization, we can now query the ORM to retrieve only the last run whose state was Success.

Step 3: Retrieve the DAGRun associated with the last successful TaskInstance instance for the provided Task.

Having tracked down the last successful TaskInstance instance, we can now retrieve the DAGRun object associated with said TaskInstance instance. The DAGRun model contains information regarding the ran TaskInstance instance alongside a vast array of useful information:

Armed with this knowledge, we can now retrieve the DAGRun instance associated with our retrieved last successful TaskInstance instance in a similar process as before:

Step 4: Arm our DAG with the ability to use this information!

Finally, we have to make sure Apache Airflow is aware of our new functions and can inject them into the JINJA engine.

We can do so by importing the function and passing the user-defined function to the DAG directly on its constructor, in this case, via context-manager:

You can now freely call the function directly in JINJA and pass it as an argument to your Extraction, ETL, or any other processes you may have!

Let me know in the comments if you find these resources useful and as usual, you can find this code in my GitHub repository!

Join thousands of data leaders on the AI newsletter. Join over 80,000 subscribers and keep up to date with the latest developments in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.

Published via Towards AI

Feedback ↓

Sign Up for the Course
`; } else { console.error('Element with id="subscribe" not found within the page with class "home".'); } } }); // Remove duplicate text from articles /* Backup: 09/11/24 function removeDuplicateText() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, strong'); // Select the desired elements const seenTexts = new Set(); // A set to keep track of seen texts const tagCounters = {}; // Object to track instances of each tag elements.forEach(el => { const tagName = el.tagName.toLowerCase(); // Get the tag name (e.g., 'h1', 'h2', etc.) // Initialize a counter for each tag if not already done if (!tagCounters[tagName]) { tagCounters[tagName] = 0; } // Only process the first 10 elements of each tag type if (tagCounters[tagName] >= 2) { return; // Skip if the number of elements exceeds 10 } const text = el.textContent.trim(); // Get the text content const words = text.split(/\s+/); // Split the text into words if (words.length >= 4) { // Ensure at least 4 words const significantPart = words.slice(0, 5).join(' '); // Get first 5 words for matching // Check if the text (not the tag) has been seen before if (seenTexts.has(significantPart)) { // console.log('Duplicate found, removing:', el); // Log duplicate el.remove(); // Remove duplicate element } else { seenTexts.add(significantPart); // Add the text to the set } } tagCounters[tagName]++; // Increment the counter for this tag }); } removeDuplicateText(); */ // Remove duplicate text from articles function removeDuplicateText() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, strong'); // Select the desired elements const seenTexts = new Set(); // A set to keep track of seen texts const tagCounters = {}; // Object to track instances of each tag // List of classes to be excluded const excludedClasses = ['medium-author', 'post-widget-title']; elements.forEach(el => { // Skip elements with any of the excluded classes if (excludedClasses.some(cls => el.classList.contains(cls))) { return; // Skip this element if it has any of the excluded classes } const tagName = el.tagName.toLowerCase(); // Get the tag name (e.g., 'h1', 'h2', etc.) // Initialize a counter for each tag if not already done if (!tagCounters[tagName]) { tagCounters[tagName] = 0; } // Only process the first 10 elements of each tag type if (tagCounters[tagName] >= 10) { return; // Skip if the number of elements exceeds 10 } const text = el.textContent.trim(); // Get the text content const words = text.split(/\s+/); // Split the text into words if (words.length >= 4) { // Ensure at least 4 words const significantPart = words.slice(0, 5).join(' '); // Get first 5 words for matching // Check if the text (not the tag) has been seen before if (seenTexts.has(significantPart)) { // console.log('Duplicate found, removing:', el); // Log duplicate el.remove(); // Remove duplicate element } else { seenTexts.add(significantPart); // Add the text to the set } } tagCounters[tagName]++; // Increment the counter for this tag }); } removeDuplicateText(); //Remove unnecessary text in blog excerpts document.querySelectorAll('.blog p').forEach(function(paragraph) { // Replace the unwanted text pattern for each paragraph paragraph.innerHTML = paragraph.innerHTML .replace(/Author\(s\): [\w\s]+ Originally published on Towards AI\.?/g, '') // Removes 'Author(s): XYZ Originally published on Towards AI' .replace(/This member-only story is on us\. Upgrade to access all of Medium\./g, ''); // Removes 'This member-only story...' }); //Load ionic icons and cache them if ('localStorage' in window && window['localStorage'] !== null) { const cssLink = 'https://code.ionicframework.com/ionicons/2.0.1/css/ionicons.min.css'; const storedCss = localStorage.getItem('ionicons'); if (storedCss) { loadCSS(storedCss); } else { fetch(cssLink).then(response => response.text()).then(css => { localStorage.setItem('ionicons', css); loadCSS(css); }); } } function loadCSS(css) { const style = document.createElement('style'); style.innerHTML = css; document.head.appendChild(style); } //Remove elements from imported content automatically function removeStrongFromHeadings() { const elements = document.querySelectorAll('h1, h2, h3, h4, h5, h6, span'); elements.forEach(el => { const strongTags = el.querySelectorAll('strong'); strongTags.forEach(strongTag => { while (strongTag.firstChild) { strongTag.parentNode.insertBefore(strongTag.firstChild, strongTag); } strongTag.remove(); }); }); } removeStrongFromHeadings(); "use strict"; window.onload = () => { /* //This is an object for each category of subjects and in that there are kewords and link to the keywods let keywordsAndLinks = { //you can add more categories and define their keywords and add a link ds: { keywords: [ //you can add more keywords here they are detected and replaced with achor tag automatically 'data science', 'Data science', 'Data Science', 'data Science', 'DATA SCIENCE', ], //we will replace the linktext with the keyword later on in the code //you can easily change links for each category here //(include class="ml-link" and linktext) link: 'linktext', }, ml: { keywords: [ //Add more keywords 'machine learning', 'Machine learning', 'Machine Learning', 'machine Learning', 'MACHINE LEARNING', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, ai: { keywords: [ 'artificial intelligence', 'Artificial intelligence', 'Artificial Intelligence', 'artificial Intelligence', 'ARTIFICIAL INTELLIGENCE', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, nl: { keywords: [ 'NLP', 'nlp', 'natural language processing', 'Natural Language Processing', 'NATURAL LANGUAGE PROCESSING', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, des: { keywords: [ 'data engineering services', 'Data Engineering Services', 'DATA ENGINEERING SERVICES', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, td: { keywords: [ 'training data', 'Training Data', 'training Data', 'TRAINING DATA', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, ias: { keywords: [ 'image annotation services', 'Image annotation services', 'image Annotation services', 'image annotation Services', 'Image Annotation Services', 'IMAGE ANNOTATION SERVICES', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, l: { keywords: [ 'labeling', 'labelling', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, pbp: { keywords: [ 'previous blog posts', 'previous blog post', 'latest', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, mlc: { keywords: [ 'machine learning course', 'machine learning class', ], //Change your article link (include class="ml-link" and linktext) link: 'linktext', }, }; //Articles to skip let articleIdsToSkip = ['post-2651', 'post-3414', 'post-3540']; //keyword with its related achortag is recieved here along with article id function searchAndReplace(keyword, anchorTag, articleId) { //selects the h3 h4 and p tags that are inside of the article let content = document.querySelector(`#${articleId} .entry-content`); //replaces the "linktext" in achor tag with the keyword that will be searched and replaced let newLink = anchorTag.replace('linktext', keyword); //regular expression to search keyword var re = new RegExp('(' + keyword + ')', 'g'); //this replaces the keywords in h3 h4 and p tags content with achor tag content.innerHTML = content.innerHTML.replace(re, newLink); } function articleFilter(keyword, anchorTag) { //gets all the articles var articles = document.querySelectorAll('article'); //if its zero or less then there are no articles if (articles.length > 0) { for (let x = 0; x < articles.length; x++) { //articles to skip is an array in which there are ids of articles which should not get effected //if the current article's id is also in that array then do not call search and replace with its data if (!articleIdsToSkip.includes(articles[x].id)) { //search and replace is called on articles which should get effected searchAndReplace(keyword, anchorTag, articles[x].id, key); } else { console.log( `Cannot replace the keywords in article with id ${articles[x].id}` ); } } } else { console.log('No articles found.'); } } let key; //not part of script, added for (key in keywordsAndLinks) { //key is the object in keywords and links object i.e ds, ml, ai for (let i = 0; i < keywordsAndLinks[key].keywords.length; i++) { //keywordsAndLinks[key].keywords is the array of keywords for key (ds, ml, ai) //keywordsAndLinks[key].keywords[i] is the keyword and keywordsAndLinks[key].link is the link //keyword and link is sent to searchreplace where it is then replaced using regular expression and replace function articleFilter( keywordsAndLinks[key].keywords[i], keywordsAndLinks[key].link ); } } function cleanLinks() { // (making smal functions is for DRY) this function gets the links and only keeps the first 2 and from the rest removes the anchor tag and replaces it with its text function removeLinks(links) { if (links.length > 1) { for (let i = 2; i < links.length; i++) { links[i].outerHTML = links[i].textContent; } } } //arrays which will contain all the achor tags found with the class (ds-link, ml-link, ailink) in each article inserted using search and replace let dslinks; let mllinks; let ailinks; let nllinks; let deslinks; let tdlinks; let iaslinks; let llinks; let pbplinks; let mlclinks; const content = document.querySelectorAll('article'); //all articles content.forEach((c) => { //to skip the articles with specific ids if (!articleIdsToSkip.includes(c.id)) { //getting all the anchor tags in each article one by one dslinks = document.querySelectorAll(`#${c.id} .entry-content a.ds-link`); mllinks = document.querySelectorAll(`#${c.id} .entry-content a.ml-link`); ailinks = document.querySelectorAll(`#${c.id} .entry-content a.ai-link`); nllinks = document.querySelectorAll(`#${c.id} .entry-content a.ntrl-link`); deslinks = document.querySelectorAll(`#${c.id} .entry-content a.des-link`); tdlinks = document.querySelectorAll(`#${c.id} .entry-content a.td-link`); iaslinks = document.querySelectorAll(`#${c.id} .entry-content a.ias-link`); mlclinks = document.querySelectorAll(`#${c.id} .entry-content a.mlc-link`); llinks = document.querySelectorAll(`#${c.id} .entry-content a.l-link`); pbplinks = document.querySelectorAll(`#${c.id} .entry-content a.pbp-link`); //sending the anchor tags list of each article one by one to remove extra anchor tags removeLinks(dslinks); removeLinks(mllinks); removeLinks(ailinks); removeLinks(nllinks); removeLinks(deslinks); removeLinks(tdlinks); removeLinks(iaslinks); removeLinks(mlclinks); removeLinks(llinks); removeLinks(pbplinks); } }); } //To remove extra achor tags of each category (ds, ml, ai) and only have 2 of each category per article cleanLinks(); */ //Recommended Articles var ctaLinks = [ /* ' ' + '

Subscribe to our AI newsletter!

' + */ '

Take our 85+ lesson From Beginner to Advanced LLM Developer Certification: From choosing a project to deploying a working product this is the most comprehensive and practical LLM course out there!

'+ '

Towards AI has published Building LLMs for Production—our 470+ page guide to mastering LLMs with practical projects and expert insights!

' + '
' + '' + '' + '

Note: Content contains the views of the contributing authors and not Towards AI.
Disclosure: This website may contain sponsored content and affiliate links.

' + 'Discover Your Dream AI Career at Towards AI Jobs' + '

Towards AI has built a jobs board tailored specifically to Machine Learning and Data Science Jobs and Skills. Our software searches for live AI jobs each hour, labels and categorises them and makes them easily searchable. Explore over 10,000 live jobs today with Towards AI Jobs!

' + '
' + '

🔥 Recommended Articles 🔥

' + 'Why Become an LLM Developer? Launching Towards AI’s New One-Stop Conversion Course'+ 'Testing Launchpad.sh: A Container-based GPU Cloud for Inference and Fine-tuning'+ 'The Top 13 AI-Powered CRM Platforms
' + 'Top 11 AI Call Center Software for 2024
' + 'Learn Prompting 101—Prompt Engineering Course
' + 'Explore Leading Cloud Providers for GPU-Powered LLM Training
' + 'Best AI Communities for Artificial Intelligence Enthusiasts
' + 'Best Workstations for Deep Learning
' + 'Best Laptops for Deep Learning
' + 'Best Machine Learning Books
' + 'Machine Learning Algorithms
' + 'Neural Networks Tutorial
' + 'Best Public Datasets for Machine Learning
' + 'Neural Network Types
' + 'NLP Tutorial
' + 'Best Data Science Books
' + 'Monte Carlo Simulation Tutorial
' + 'Recommender System Tutorial
' + 'Linear Algebra for Deep Learning Tutorial
' + 'Google Colab Introduction
' + 'Decision Trees in Machine Learning
' + 'Principal Component Analysis (PCA) Tutorial
' + 'Linear Regression from Zero to Hero
'+ '

', /* + '

Join thousands of data leaders on the AI newsletter. It’s free, we don’t spam, and we never share your email address. Keep up to date with the latest work in AI. From research to projects and ideas. If you are building an AI startup, an AI-related product, or a service, we invite you to consider becoming a sponsor.

',*/ ]; var replaceText = { '': '', '': '', '
': '
' + ctaLinks + '
', }; Object.keys(replaceText).forEach((txtorig) => { //txtorig is the key in replacetext object const txtnew = replaceText[txtorig]; //txtnew is the value of the key in replacetext object let entryFooter = document.querySelector('article .entry-footer'); if (document.querySelectorAll('.single-post').length > 0) { //console.log('Article found.'); const text = entryFooter.innerHTML; entryFooter.innerHTML = text.replace(txtorig, txtnew); } else { // console.log('Article not found.'); //removing comment 09/04/24 } }); var css = document.createElement('style'); css.type = 'text/css'; css.innerHTML = '.post-tags { display:none !important } .article-cta a { font-size: 18px; }'; document.body.appendChild(css); //Extra //This function adds some accessibility needs to the site. function addAlly() { // In this function JQuery is replaced with vanilla javascript functions const imgCont = document.querySelector('.uw-imgcont'); imgCont.setAttribute('aria-label', 'AI news, latest developments'); imgCont.title = 'AI news, latest developments'; imgCont.rel = 'noopener'; document.querySelector('.page-mobile-menu-logo a').title = 'Towards AI Home'; document.querySelector('a.social-link').rel = 'noopener'; document.querySelector('a.uw-text').rel = 'noopener'; document.querySelector('a.uw-w-branding').rel = 'noopener'; document.querySelector('.blog h2.heading').innerHTML = 'Publication'; const popupSearch = document.querySelector$('a.btn-open-popup-search'); popupSearch.setAttribute('role', 'button'); popupSearch.title = 'Search'; const searchClose = document.querySelector('a.popup-search-close'); searchClose.setAttribute('role', 'button'); searchClose.title = 'Close search page'; // document // .querySelector('a.btn-open-popup-search') // .setAttribute( // 'href', // 'https://medium.com/towards-artificial-intelligence/search' // ); } // Add external attributes to 302 sticky and editorial links function extLink() { // Sticky 302 links, this fuction opens the link we send to Medium on a new tab and adds a "noopener" rel to them var stickyLinks = document.querySelectorAll('.grid-item.sticky a'); for (var i = 0; i < stickyLinks.length; i++) { /* stickyLinks[i].setAttribute('target', '_blank'); stickyLinks[i].setAttribute('rel', 'noopener'); */ } // Editorial 302 links, same here var editLinks = document.querySelectorAll( '.grid-item.category-editorial a' ); for (var i = 0; i < editLinks.length; i++) { editLinks[i].setAttribute('target', '_blank'); editLinks[i].setAttribute('rel', 'noopener'); } } // Add current year to copyright notices document.getElementById( 'js-current-year' ).textContent = new Date().getFullYear(); // Call functions after page load extLink(); //addAlly(); setTimeout(function() { //addAlly(); //ideally we should only need to run it once ↑ }, 5000); }; function closeCookieDialog (){ document.getElementById("cookie-consent").style.display = "none"; return false; } setTimeout ( function () { closeCookieDialog(); }, 15000); console.log(`%c 🚀🚀🚀 ███ █████ ███████ █████████ ███████████ █████████████ ███████████████ ███████ ███████ ███████ ┌───────────────────────────────────────────────────────────────────┐ │ │ │ Towards AI is looking for contributors! │ │ Join us in creating awesome AI content. │ │ Let's build the future of AI together → │ │ https://towardsai.net/contribute │ │ │ └───────────────────────────────────────────────────────────────────┘ `, `background: ; color: #00adff; font-size: large`); //Remove latest category across site document.querySelectorAll('a[rel="category tag"]').forEach(function(el) { if (el.textContent.trim() === 'Latest') { // Remove the two consecutive spaces (  ) if (el.nextSibling && el.nextSibling.nodeValue.includes('\u00A0\u00A0')) { el.nextSibling.nodeValue = ''; // Remove the spaces } el.style.display = 'none'; // Hide the element } }); // Add cross-domain measurement, anonymize IPs 'use strict'; //var ga = gtag; ga('config', 'G-9D3HKKFV1Q', 'auto', { /*'allowLinker': true,*/ 'anonymize_ip': true/*, 'linker': { 'domains': [ 'medium.com/towards-artificial-intelligence', 'datasets.towardsai.net', 'rss.towardsai.net', 'feed.towardsai.net', 'contribute.towardsai.net', 'members.towardsai.net', 'pub.towardsai.net', 'news.towardsai.net' ] } */ }); ga('send', 'pageview'); -->