September 18, 2024

Automating Workflows: How Apache Airflow can streamline Data Processes - Step by Step Guide

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. A web interface helps manage the state of your workflows. Airflow is deployable in many ways, varying from a single process on your machine to a distributed setup to support even the biggest workflows.
Airflow is used to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. A rich command-line interface (CLI) makes complex operations on DAGs straightforward. The web user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

Requirements:

Source     | Main Version (Dev)         | Stable Version (2.10.0)
Python     | 3.8, 3.9, 3.10, 3.11, 3.12 | 3.8, 3.9, 3.10, 3.11, 3.12
Platform   | AMD64/ARM64(*)             | AMD64/ARM64(*)
Kubernetes | 1.28, 1.29, 1.30, 1.31     | 1.27, 1.28, 1.29, 1.30
PostgreSQL | 12, 13, 14, 15, 16         | 12, 13, 14, 15, 16
MySQL      | 8.0, 8.4, Innovation       | 8.0, 8.4, Innovation
SQLite     | 3.15.0+                    | 3.15.0+


Workflows as Code:

The main characteristic of Airflow workflows is that all workflows are defined in Python code. Workflows as code serve several purposes.

  1. Dynamic: Airflow pipelines are configured as Python code, allowing for dynamic pipeline generation.
  2. Extensible: The Airflow framework contains operators to connect with numerous technologies. All Airflow components are extensible to easily adjust to your environment.
  3. Flexible: Workflow parameterization is built in, leveraging the Jinja templating engine.
  4. Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
  5. Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers.
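To make the "Dynamic" and "Flexible" points concrete, here is a minimal sketch of a DAG file that generates one task per item in a Python list and uses a Jinja template in each command. The table names, the DAG id, and the daily schedule are hypothetical and chosen only for illustration. The Airflow import is guarded so the file also loads on a machine where Airflow is not installed.

```python
from datetime import datetime
from importlib.util import find_spec

# Hypothetical table names, used only to illustrate the loop.
TABLES = ["orders", "customers", "invoices"]


def export_command(table):
    """Build a shell command containing a Jinja template.

    {{ ds }} is a built-in template variable that Airflow renders to the
    run's logical date (YYYY-MM-DD) when the task executes.
    """
    return f"echo 'exporting {table} for {{{{ ds }}}}'"


def build_dag():
    # Imported lazily so this file also loads where Airflow is not installed.
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="dynamic_export_example",  # hypothetical DAG id
        start_date=datetime(2024, 9, 4),
        schedule="@daily",  # assumption: a daily schedule
        catchup=False,
    ) as dag:
        # One task per table: the pipeline shape is driven by plain Python data.
        for table in TABLES:
            BashOperator(
                task_id=f"export_{table}",
                bash_command=export_command(table),
            )
    return dag


# Airflow discovers DAG objects defined at module level.
if find_spec("airflow") is not None:
    dynamic_export_dag = build_dag()
```

Adding a name to the list adds a task to the DAG the next time Airflow parses the file, with no other code changes.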


Implementation sources:
Apache Airflow can be installed using any of the methods below.
  1. PyPI
  2. Docker Images
  3. Docker Files
  4. Helm Charts
  5. Released sources
  6. Managed Airflow Services.
The recommended method for implementing, running, and installing Apache Airflow is to use a combination of Docker image/Dockerfile and Docker compose file. It provides the capability of running Airflow components in isolation from other software running on the same physical or virtual machines with easy maintenance of dependencies.

Installing Pre-requisites: 

Install Docker Community Edition (CE) on your workstation. Depending on your OS, you may need to configure Docker to use at least 4.00 GB of memory for the Airflow containers to run properly.

Install Docker Compose v2.14.0 or newer on your workstation.

Implementation Steps:
We will create a Docker Compose YAML file that installs and deploys the Apache Airflow containers. This file contains the following service definitions:

 - Airflow Scheduler which monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
 - Airflow Webserver that provides a UI at specified IP:PORT URL.
 - Airflow Worker that executes the tasks given by the scheduler.
 - Airflow Triggerer that runs an event loop for deferrable tasks.
 - Airflow Init as the initialization service.
 - Postgres as the Database.
 - Redis is the broker that forwards the messages from the scheduler to the worker.


For this Docker Compose setup, we mount the following directories so that their contents are synchronized between the host machine and the containers. The directories should be owned by the user that runs the containers, which is generally the Airflow user.

 - DAGs Directory: The DAG files are stored here.
 - Logs Directory: It contains logs from task execution and scheduler.
 - Config Directory: This has the local configuration file airflow_local_settings.py for custom implementation.
 - Plugins Directory: Used to add custom plugins.


Before starting the deployment, create the directories above and set the correct permissions for the Airflow user, because the Docker containers run as that user. The Docker Compose file lists many services, but the important detail is the UID and GID of the Airflow user on the host machine. The commands below create the directories and record the UID in a .env file so that Docker Compose uses the correct value.

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
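For readers who prefer to script this step, the two shell commands above can be sketched in Python. This is an illustrative equivalent, not part of the Airflow tooling: the function name prepare_airflow_dirs is hypothetical, and os.getuid() is only available on POSIX systems such as Linux and macOS.

```python
import os
from pathlib import Path

# The four directories mounted into the Airflow containers.
AIRFLOW_DIRS = ("dags", "logs", "plugins", "config")


def prepare_airflow_dirs(base="."):
    """Create the mounted directories and write AIRFLOW_UID to .env."""
    base = Path(base)
    for name in AIRFLOW_DIRS:
        # Like `mkdir -p`: no error if the directory already exists.
        (base / name).mkdir(parents=True, exist_ok=True)
    env_file = base / ".env"
    # Overwrites any existing .env, like `>` in the shell command.
    env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")
    return env_file


if __name__ == "__main__":
    print(prepare_airflow_dirs().read_text(), end="")
```

Run it from the directory that holds the docker-compose YAML file so the directories and the .env file land next to it.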

Now we run the Docker command to deploy the airflow-init service, which initializes the database and creates the user login. The account created has the login airflow and the password airflow.

docker compose up airflow-init








Once this has been deployed, clean up the environment before running all the services listed in the Docker Compose file. Run the command below in the directory where you downloaded the docker-compose YAML file:

docker compose down --volumes --remove-orphans

Once the containers and volumes have been removed, recreate the Docker Compose file following the guide and store it in the same directory.

Now we can start all services:

docker compose up -d --build


The above implementation can also use a Dockerfile instead of a prebuilt Docker image, which allows more customization through a requirements text file referenced inside the Dockerfile. To do that, create a Dockerfile in the same directory as your docker-compose YAML file and place the requirements text file alongside it. Then run the command below to build and deploy the containers from the Dockerfile instead of the image.

docker compose up -d --build

Once the cluster has started up, you can log in to the web interface and begin experimenting with DAGs.
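Before logging in, it can help to confirm that the services are actually up. The sketch below polls the webserver's /health endpoint, which in Airflow 2 reports the status of the metadata database and the scheduler as JSON. The URL http://localhost:8080 is an assumption; use the IP and port configured in your Docker Compose file. The function names are hypothetical.

```python
import json
from urllib.request import urlopen

# Assumption: the webserver is published on localhost:8080.
HEALTH_URL = "http://localhost:8080/health"


def is_healthy(payload):
    """Return True when both the metadata database and scheduler report healthy."""
    return all(
        payload.get(component, {}).get("status") == "healthy"
        for component in ("metadatabase", "scheduler")
    )


def check_airflow(url=HEALTH_URL, timeout=5):
    """Fetch the health endpoint and evaluate the response."""
    with urlopen(url, timeout=timeout) as response:
        return is_healthy(json.load(response))
```

Calling check_airflow() raises an exception from urllib if the webserver is not reachable yet, so a startup script would typically retry it a few times with a short delay.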

Using Airflow:
Let’s look at the following snippet of code (DAG):
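The original snippet is not reproduced in this text, so what follows is a minimal reconstruction based on the breakdown in the next section, not the author's exact code. The dag_id, retry settings, start date, task names, and the name dictionary come from that breakdown. The daily schedule, the age value, the wording of the greeting, and the helper names format_greeting, build_dag, and hello_world_etl are assumptions. The Airflow import is guarded so the file also loads where Airflow is not installed.

```python
from datetime import datetime, timedelta
from importlib.util import find_spec

# From the breakdown: 5 retries, 5 minutes apart.
default_args = {
    "retries": 5,
    "retry_delay": timedelta(minutes=5),
}


def format_greeting(first_name, last_name, age):
    """Plain helper; the exact greeting wording is an assumption."""
    return f"Hello, my name is {first_name} {last_name} and I am {age} years old."


def build_dag():
    # Imported lazily so this file also loads where Airflow is not installed.
    from airflow.decorators import dag, task

    @dag(
        dag_id="dag_with_taskflow_api_v02",
        default_args=default_args,
        start_date=datetime(2024, 9, 4),
        schedule="@daily",  # assumption: the interval is not stated
    )
    def hello_world_etl():
        # multiple_outputs=True exposes each dictionary key as its own value.
        @task(multiple_outputs=True)
        def get_name():
            return {"first_name": "Smit", "last_name": "Shanischara"}

        @task()
        def get_age():
            return 30  # assumption: any static integer works here

        @task()
        def greet(first_name, last_name, age):
            print(format_greeting(first_name, last_name, age))

        # greet depends on both upstream tasks through its arguments.
        name = get_name()
        greet(
            first_name=name["first_name"],
            last_name=name["last_name"],
            age=get_age(),
        )

    return hello_world_etl()


# Airflow discovers DAG objects defined at module level.
if find_spec("airflow") is not None:
    greet_dag = build_dag()
```

Saving a file like this in the DAGs directory is enough for the scheduler to pick it up.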


DAG Breakdown:

DAG Definition:

dag_id = 'dag_with_taskflow_api_v02'
This is the unique identifier for the DAG.

default_args
Defines default parameters with retries (5 attempts) and delay between retries (5 minutes).

start_date = datetime(2024, 9, 4)
The DAG can start running from this date. If the DAG is triggered after this date, any missed runs will be backfilled unless the catchup behavior is turned off.

Tasks Breakdown:

get_name()
Returns a dictionary containing a first name and last name: 
{'first_name': 'Smit', 'last_name': 'Shanischara'}.

get_age()
Returns a static integer, representing an age.

greet()
Prints a greeting message built from the name and age.

Task Execution Flow:

The DAG starts by executing get_name() and get_age() tasks in parallel since there is no dependency between them.
Once these two tasks are complete, their outputs are passed to the greet() task, which uses the extracted data to print a message.
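The ordering described above can be illustrated with Python's standard library alone. This is not Airflow's scheduler, only a small sketch of how a dependency graph determines which tasks may run together. It assumes Python 3.9 or newer for the graphlib module, and the function name execution_waves is hypothetical.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on.
dependencies = {
    "get_name": set(),
    "get_age": set(),
    "greet": {"get_name", "get_age"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave have no unmet dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # Everything returned here could run in parallel.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        # Mark the wave as finished so downstream tasks become ready.
        sorter.done(*ready)
    return waves


print(execution_waves(dependencies))
# prints [['get_age', 'get_name'], ['greet']]
```

The first wave contains both extraction tasks, and greet appears only after both are marked done, which mirrors the flow described above.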

In summary, this DAG demonstrates a simple workflow where data is "extracted" (name and age), combined in a task, and then "loaded" by printing a greeting message.

Airflow evaluates this script and executes the tasks at the set interval and in the defined order. The status of the DAG is visible in the web interface:





Just like the above DAG, we can have multiple DAGs running on a single Apache Airflow instance. 




Removing Airflow: 


To stop and delete the containers, delete the volumes holding the database data, and remove the downloaded images, run:
docker compose down --volumes --rmi all


Best Practices: 

 - Modular DAGs: Break large workflows into smaller, reusable DAGs. This promotes better maintainability and scalability.
 - Task Dependencies: Define task dependencies explicitly, using the set_upstream() and set_downstream() methods or the equivalent >> and << operators, to ensure the correct execution order.
 - Handle Failures: Use retries, on_failure_callback, and alerts to handle task failures gracefully.
 - Version Control: Keep your DAGs under version control (e.g., Git) for better collaboration and tracking changes.
 - Monitoring: Set up monitoring for failed tasks and performance bottlenecks using tools like Grafana or integrating with cloud monitoring solutions.
 - Security: Secure Airflow with role-based access control (RBAC) and authentication mechanisms.
 - Logging: Ensure logs are properly stored in a centralized system for debugging and compliance.
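As an illustration of the "Handle Failures" practice, the sketch below combines retries with an on_failure_callback. Airflow calls the callback with a context dictionary whose "task_instance" entry describes the failed task. The retry values, the message format, and the function names are assumptions chosen for the example, and a real callback would usually send an alert rather than only writing a log entry.

```python
import logging
from datetime import timedelta
from types import SimpleNamespace

log = logging.getLogger(__name__)


def describe_failure(context):
    """Build a short message from the context Airflow passes to callbacks."""
    ti = context["task_instance"]
    return f"Task {ti.task_id} in DAG {ti.dag_id} failed on try {ti.try_number}"


def notify_failure(context):
    # Replace this with an email, chat, or pager integration as needed.
    log.error(describe_failure(context))


# Pass this dictionary as default_args when defining a DAG.
default_args = {
    "retries": 3,  # assumption: example value
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_failure,
}

if __name__ == "__main__":
    # Stand-in for a real task instance, only for trying the callback locally.
    fake_context = {
        "task_instance": SimpleNamespace(task_id="greet", dag_id="demo", try_number=1)
    }
    print(describe_failure(fake_context))
```

Because the settings live in default_args, every task in the DAG inherits them unless a task overrides them individually.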
