What is Apache Airflow?
Overview, tips, and tricks for this open-source workflow management platform
Apache Airflow is an open-source workflow management platform that is particularly useful when constructing complex data pipelines. It’s an elegant solution to many of the limitations of traditional job schedulers like Cron and Oozie.
Airflow workflows are defined entirely in Python, from schedule configuration to the scripts that run each task. Airflow embodies a successful implementation of the “workflow-as-code” paradigm, eliminating the need for JSON or XML configuration files. And since the entire workflow is defined in code, it is also easy to interface with third-party APIs or databases.
What sets Airflow apart from other job schedulers is its powerful web UI. Pipelines defined in code are rendered as precise graphical representations, making it easy to visualize all workflow dependencies, inspect run instances, and quickly identify possible points of failure.
Additionally, Airflow is horizontally scalable: its extensible architecture and message-queuing system can manage any number of worker nodes. According to Airflow’s documentation, it is “ready to scale to infinity.”
Airflow was created at Airbnb in 2014 to better manage its fast-growing, data-based operations, and was open-sourced in 2015. It entered the Apache Incubator in 2016 and is now an Apache Top-Level Project.
Airflow has become an increasingly popular tool for data management teams and data engineers. Today, hundreds of companies use Airflow to manage their daily operations. Within the Airflow community, some of the more prominent companies using and consistently contributing to Airflow include Spotify, Twitter, Reddit, PayPal, Lyft, and of course, Airbnb.
Before starting with Airflow, it’s a good idea to get familiar with the basic concepts that enable you to create and structure your workflows.
Workflows in Airflow are designed as directed acyclic graphs (DAGs), representing a generic outline that defines sets of tasks with directional dependencies and the order in which they should be executed. Nodes in the graph represent the individual tasks, and the edges connecting the nodes represent dependencies between tasks. The workflow graph is forced to be acyclic, so there are no circular dependencies that could lead to infinite loops.
DAGs are configured as code that exists in a single DAG file. A DAG file is used only to organize sets of tasks. It does not contain any data processing logic or computation.
When a DAG is instantiated by the scheduler or triggered from the web UI, it spawns a DAG run, which represents a single instance of a DAG at a point in time with a unique execution date.
While a DAG describes the structure of how tasks will be run, operators determine what gets executed in your workflow.
An operator should be atomic. This means it either fails or succeeds entirely, without sharing resources with any other operator in the DAG. Airflow comes with built-in operators for many common types of task execution. Some prevalent operators include:
- BashOperator – executes a bash command
- PythonOperator – executes a Python callable
- SimpleHttpOperator – sends an HTTP request
- SqliteOperator – executes a SQL command in a SQLite database
- MySqlOperator – executes a SQL command in a MySQL database
- PostgresOperator – executes a SQL command in a PostgreSQL database
In addition to these “standard” operators, Airflow also offers some operators for more specific tasks:
- DockerOperator – executes a command within a Docker container
- HiveOperator – executes HQL code or hive script in a Hive database
- S3FileTransformOperator – downloads a file from S3, applies a transformation script, and uploads the result back to S3
- SlackAPIOperator – posts messages to a Slack channel
Airflow is capable of scheduling tasks of almost any type. However, if the built-in operators do not quite satisfy your needs, you can always create a custom operator by extending the BaseOperator class and implementing its execute() method.
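As a minimal sketch of what a custom operator might look like (the operator name and its message parameter are hypothetical, chosen only to illustrate the pattern):

```python
from airflow.models import BaseOperator


class PrintMessageOperator(BaseOperator):
    """Hypothetical operator that prints a message when its task runs."""

    def __init__(self, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # execute() is called by the worker when the task instance runs;
        # its return value is pushed to XCom by default.
        print(self.message)
        return self.message
```

Once defined, the class is used like any built-in operator: instantiate it with a task_id and its own arguments inside a DAG.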
To execute an operator, a task is instantiated with a particular set of input arguments that becomes a node in the DAG. It can be easy to confuse operators with tasks. To be clear, tasks are the user-defined actions to be run by the operators in a DAG.
Tasks should be idempotent, meaning they should produce the same result, regardless of how many times they are executed.
When a DAG is triggered, all the tasks associated with that specific DAG run are referred to as task instances. A task instance is a task at a specific point in time. Each task instance is associated with an entry in the metadata database, where its state is logged (e.g. “running”, “success”, “failed”, “queued”, “skipped”, “up for retry”). Updating the state of task instances plays a crucial role in Airflow’s scheduling and execution processes.
In short, a DAG is composed of workflow tasks, which are simply instances of operators. When a DAG is executed, a DAG run is created, containing all task instances for that particular run.
Figure 1. Tree view in the web UI to demonstrate Airflow concepts.
Now that we have a better understanding of the basic building blocks of Airflow, we will go through a simple example of writing a DAG file to demonstrate how all these concepts work together as well as tips, tricks, and best practices when using Airflow.
To start, we will create a Python script, my_simple_dag.py, in the dags folder of our Airflow home directory.
We import all the necessary libraries and then define two primary functions. The greet() function opens the greet.txt file and writes the current datetime as a new line, while the respond() function just returns a string.
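The two functions might look like the following (the exact file path and return string are illustrative):

```python
from datetime import datetime


def greet():
    # Append the current datetime to greet.txt as a new line.
    with open("greet.txt", "a") as f:
        f.write(str(datetime.now()) + "\n")


def respond():
    # Simply return a string; a PythonOperator logs its callable's return value.
    return "Greet responded"
```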
Next, we will define a default_args dictionary containing all of our DAG-specific parameters.
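A sketch of that dictionary with illustrative values (the year and the retry_delay are assumptions, not given in the text):

```python
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    # A start_date in the past triggers a backfill of all missed runs.
    "start_date": datetime(2021, 4, 27, 12, 0, 0),
    "concurrency": 1,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),  # illustrative
}
```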
The owner parameter refers to the Airflow user that the DAG belongs to.
The start_date indicates when the DAG will start executing this workflow. It is important to note that this can also be a date in the past. If that is the case, the scheduler will trigger a backfill of all DAG run instances that have not been run previously, starting with the start_date specified.
The concurrency parameter limits how many task instances this DAG may run in parallel. For instance, our DAG’s start_date is the 27th of April, which is in the past, so our DAG will backfill all past run instances. If we set our concurrency to 2, the backfill would process two instances at a time until all previous instances are caught up with the current date. However, our concurrency is set to 1 for simplicity, so our past DAG instances will backfill sequentially.
Then, the retries parameter indicates how many times to retry a task if it does not execute successfully. If the number of retries is exhausted and the task still has not completed successfully, it will be marked as failed.
Now, we will create our DAG instance. All DAGs begin with some basic configuration parameters:
We pass the string my_simple_dag as our dag_id, which serves as a unique identifier for this particular DAG. We also pass in the default_args dictionary we defined above. Finally, we define a schedule_interval of */10 * * * * (every 10 minutes).
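Putting those parameters together, the instantiation might look like this (default_args repeated in abbreviated form so the snippet stands alone; the year is assumed):

```python
from datetime import datetime
from airflow import DAG

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 4, 27, 12, 0, 0),
}

dag = DAG(
    dag_id="my_simple_dag",
    default_args=default_args,
    schedule_interval="*/10 * * * *",  # every 10 minutes
)
```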
We have set our start_date in the default_args to the 27th of April at 12:00:00 UTC, and we have already discussed Airflow’s ability to run past DAGs by backfilling. When we trigger our DAG, it will backfill all DAG runs whose execution dates have already passed. Once those are caught up, the run we intended to trigger will execute at 12:10:00 UTC. In general, a DAG run executes at the end of its schedule_interval, so in our case the DAG runs once 10 minutes have passed.
If you want to avoid backfilling altogether, you can either set your start_date to a date in the future or instantiate your DAG with the catchup parameter set to False.
After our DAG object is defined, we now set all of the tasks in our workflow.
The opr_hello operator executes a simple bash command that echoes the string Hi!! to the log console. The opr_greet operator calls the Python function greet() that we defined above, while opr_sleep executes another bash command to sleep for 5 seconds. Our last operator, opr_respond, calls the respond() Python function.
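A sketch of those four task definitions, assuming the dag object and the greet/respond functions defined earlier are in scope (the import paths shown are the Airflow 1.10 style; in Airflow 2.x they are airflow.operators.bash and airflow.operators.python):

```python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Each instantiated operator becomes a task (a node) in the DAG.
opr_hello = BashOperator(task_id="hello", bash_command='echo "Hi!!"', dag=dag)
opr_greet = PythonOperator(task_id="greet", python_callable=greet, dag=dag)
opr_sleep = BashOperator(task_id="sleep_me", bash_command="sleep 5", dag=dag)
opr_respond = PythonOperator(task_id="respond", python_callable=respond, dag=dag)
```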
Finally, to finish off our DAG file, we set the order in which we want our tasks to be executed.
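Using the set_upstream() and set_downstream() methods, the ordering might be expressed as (task names as defined above):

```python
opr_hello.set_downstream(opr_greet)   # greet runs after hello
opr_greet.set_downstream(opr_sleep)   # sleep runs after greet...
opr_sleep.set_upstream(opr_respond)   # ...and also after respond
```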
Task dependencies are often easier to understand when looking at the Graph View in the Airflow web UI.
Here, we can see that the respond task runs in parallel with the hello and greet tasks, and that sleep executes only after both greet and respond have completed.
Tips, tricks & best practices
Airflow, like other data tools, has some “life hacks” that make it so much easier for you to use. If you’re not new to Airflow, you might already be familiar with some (or all) of these tricks. But if you are just starting, hopefully you can avoid the “I wish I had known this before” feeling with these tips and tricks.
1. Change dag_id when changing start_date
When you change the start_date of a DAG, a new entry is created in Airflow’s database resulting in two DAGs with the same dag_id but different schedules. This will lead to confusion when Airflow tries to schedule these DAGs.
To avoid this confusion, it is best practice to give your DAG a new dag_id when changing the start_date. Changing the dag_id does not delete the entry for the old dag_id. By assigning a new dag_id, a separate entry is created in the database, so Airflow can easily distinguish between the DAGs.
2. Default arguments
We defined default_args parameters when writing our example DAG, but we did not go into much detail about why. Passing a dictionary of parameters that apply to all tasks in the DAG avoids having to repeat arguments. If many of your DAG tasks share parameters, using default_args prevents a lot of copy/paste and keeps the code concise.
3. Instantiate DAGs with Context Manager
In our example, we did not use the Context Manager when constructing our DAG. Instead, we defined our DAG object and tasks separately. Although our DAG runs as expected, instantiating a DAG without the Context Manager leads to a lot of redundant code.
Written with the Context Manager, the exact same code eliminates the need to include dag = dag when defining each task, along with the risk of forgetting to associate a particular task with our DAG.
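A sketch of the Context Manager form, with a single task shown for brevity (values as in our example; the year is assumed):

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {"owner": "airflow", "start_date": datetime(2021, 4, 27, 12, 0, 0)}

with DAG(dag_id="my_simple_dag",
         default_args=default_args,
         schedule_interval="*/10 * * * *") as dag:
    # Tasks created inside the `with` block are attached to the DAG
    # automatically, so `dag=dag` is no longer needed.
    opr_hello = BashOperator(task_id="hello", bash_command='echo "Hi!!"')
```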
4. Task dependencies
There are many ways to set your DAG task dependencies in Airflow. For our example, we made use of .set_upstream() and .set_downstream().
Still, it is better to use one or the other for consistency.
Though this method is more consistent, there is a more concise way to define our task dependencies by using bitshift operators.
The relationship between task dependencies when using bitshift is set in the direction that the bitshift operator points.
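For example, with the task names from our DAG:

```python
opr_hello >> opr_greet    # opr_hello runs before opr_greet
opr_respond >> opr_sleep  # equivalent to opr_sleep << opr_respond
```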
We can even condense these dependencies further into a single line of code:
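With our example tasks, the whole ordering fits on one line:

```python
opr_hello >> opr_greet >> opr_sleep << opr_respond
```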
All of the methods listed above for defining task dependencies produce the same results. Still, it is good coding practice to keep your code as concise as possible for better readability.
We have discussed the benefits and building blocks of Airflow, written our DAG file, and summarized a few tips and tricks that can make using Airflow more efficient. Hopefully, this blog was helpful and provides a good starting point if you need to build your data workflows in the future!