ETL Pipelines with Airflow: the Good, the Bad and the Ugly

Ari Bajo Rouvinen

Airflow is a popular open-source workflow management platform. Many data teams also use Airflow for their ETL pipelines. For example, I’ve previously used Airflow transfer operators to replicate data between databases, data lakes and data warehouses. I’ve also used Airflow transformation operators to preprocess data for machine learning algorithms. But is using Airflow for your ETL pipelines a good practice today?

In this article, we review how to use Airflow ETL operators to transfer data from Postgres to BigQuery with the ETL and ELT paradigms. Then, we share some challenges you may encounter when attempting to load data incrementally with Airflow DAGs. Finally, we argue why Airflow ETL operators won’t be able to cover the long tail of integrations for your business data.

The suggested alternative is to keep using Airflow to schedule and monitor ELT pipelines, but use other open-source projects that are better suited for the extract, load and transform steps. Notably, you can use Airbyte for the extract and load steps and dbt for the transformation step. The good news is that you can easily integrate Airflow with Airbyte and dbt.

🙂 The Good

Airflow Operators for ETL pipelines

Imagine you have your application data stored in a Postgres database. The marketing, sales and product teams have their data stored on third-party applications such as Google Ads, Salesforce and Segment. From this point, you may want to centralize all your business data in a single data warehouse, such as Google BigQuery.

Your company is already using Airflow, so you start searching the built-in operators and provider packages for Airflow ETL operators to extract, transform and load data. When you find no operator to interact with your data, you may search for Airflow hooks to connect to an external system. For example, there is no operator to retrieve data from Segment, but you can use the SegmentHook to interact with Segment’s API.

At first, you may be tempted to build an ETL pipeline where you extract your data from Postgres to file storage, transform the data locally with the PythonOperator and then load the transformed data to BigQuery. You end up with the following Airflow ETL DAG.

PostgresToGCSOperator (E) -> GCSToLocalFilesystemOperator (E) -> PythonOperator (T) -> LocalFilesystemToGCSOperator (L) -> GCSToBigQueryOperator (L)

But is this a good practice today?

This follows the traditional ETL pipeline architecture where the transformation logic happens between the extract and load steps. With Airflow you can use operators to transform data locally (PythonOperator, BashOperator...), remotely (SparkSubmitOperator, KubernetesPodOperator…) or in a data store (PostgresOperator, BigQueryInsertJobOperator...).

Over the last few years, many data teams have migrated their ETL pipelines to follow the ELT paradigm. With the ELT paradigm, raw data is first loaded into the data warehouse and then transformed with SQL in the data warehouse. One of the main issues of ETL pipelines is that they transform data in transit, so they break more easily.

On the other hand, today’s data warehouses autoscale to handle large computation and volume needs. Modern data transformation tools like dbt have given SQL superpowers. With dbt you can use macros, for loops and more in your SQL files. On top of that, dbt handles dependencies between tables based on references in the SQL code. When you transform data with Airflow, you need to duplicate the dependencies between tables both in your SQL files and in your DAG.
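To illustrate how dependencies can be derived from references in SQL, here is a minimal Python sketch. It is not dbt's implementation: the model names, the SQL and the regular expression are assumptions made for this example, and only the `{{ ref('...') }}` syntax comes from dbt.

```python
import re
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical dbt-style models; names and SQL are illustrative only.
MODELS = {
    "stg_orders": "SELECT * FROM raw.orders",
    "stg_customers": "SELECT * FROM raw.customers",
    "customer_orders": (
        "SELECT * FROM {{ ref('stg_orders') }} o "
        "JOIN {{ ref('stg_customers') }} c ON o.customer_id = c.id"
    ),
}

# Simplified pattern that matches {{ ref('model_name') }} calls.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def build_order(models):
    """Return model names so that every model comes after the models it references."""
    graph = {name: set(REF_PATTERN.findall(sql)) for name, sql in models.items()}
    return list(TopologicalSorter(graph).static_order())


order = build_order(MODELS)
print(order)
```

Because the run order is derived from the SQL itself, there is no second copy of the dependency list to keep in sync, which is the duplication you face when the same order also has to be declared by hand in a DAG.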

SQL is taking over Python to transform and analyze data in the modern data stack

Airflow Operators for ELT Pipelines

You can use Airflow transfer operators together with database operators to build ELT pipelines. There is no Airflow operator to transfer data directly from Postgres to BigQuery so you need to use a staging storage in Google Cloud. For example, you can use the PostgresToGCSOperator followed by the GCSToBigQueryOperator. Then, you can apply transformations in the destination with BigQuery operators.

PostgresToGCSOperator (E) -> GCSToBigQueryOperator (L) -> BigQueryInsertJobOperator (T)

Airflow provides a vast number of choices to move data from one system to another. This can be ok if your data engineering team is proficient with Airflow and knows the best practices around data integration. For a less experienced data engineering team or a data analyst team, Airflow can be overwhelming as there are many choices to make when building a data pipeline.

Airflow is not an opinionated ETL tool.

😕 The Bad

As we have seen, doing a full data replication with Airflow is easy. But what happens when the schema of the data source changes and you try to rerun the full replication DAG? It will fail. To be able to rerun the DAG, you need to add an extra task that deletes the data in the destination before replicating the data again. This also makes your DAGs idempotent, so you can rerun them without side effects when they fail. Next, we will see how it gets more complicated when you want to create an Airflow DAG to load data incrementally.
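The delete-then-load pattern can be sketched outside Airflow as follows. This is a minimal illustration that uses an in-memory SQLite database as a stand-in for the destination; the `users` table and its columns are hypothetical.

```python
import sqlite3


def full_refresh(conn, rows):
    """Replace the contents of the destination table with `rows`."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, email TEXT)"
    )
    # The delete and the inserts share one transaction:
    # it commits on success and rolls back if an insert fails.
    with conn:
        conn.execute("DELETE FROM users")
        conn.executemany("INSERT INTO users (id, email) VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
rows = [(1, "a@example.com"), (2, "b@example.com")]

full_refresh(conn, rows)
full_refresh(conn, rows)  # rerunning leaves the same two rows, not four

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)
```

Without the delete step, the second run would fail on the primary key or duplicate the rows, depending on the destination.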

Incremental Data Load with Airflow

If you want to load data incrementally with Airflow, you first need to decide on the DAG frequency. Then you need to make sure that each DAG run only loads data that was created between the DAG execution date and the next DAG execution date. For that, you have to modify the SQL files used by the PostgresToGCSOperator to include Airflow macros `{{ds}}` for the DAG execution date and `{{next_ds}}` for the next execution date. The values are injected by Airflow during runtime.
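As a rough sketch, the incremental query and its rendered form could look like the following. The `orders` table and `created_at` column are hypothetical, and the `render` helper is only a stand-in for the Jinja templating that Airflow performs at runtime; it handles daily or longer intervals, where both macros resolve to dates.

```python
from datetime import date, timedelta

# Hypothetical incremental query; `orders` and `created_at` are illustrative names.
INCREMENTAL_SQL = """
SELECT *
FROM orders
WHERE created_at >= '{{ ds }}'
  AND created_at < '{{ next_ds }}'
"""


def render(sql: str, execution_date: date, interval: timedelta) -> str:
    """Substitute the two macros the way a daily DAG run would see them."""
    context = {
        "ds": execution_date.isoformat(),
        "next_ds": (execution_date + interval).isoformat(),
    }
    for name, value in context.items():
        sql = sql.replace("{{ " + name + " }}", value)
    return sql


rendered = render(INCREMENTAL_SQL, date(2021, 10, 1), timedelta(days=1))
print(rendered)
```

The half-open window (`>=` the start, `<` the end) means consecutive runs neither overlap nor leave gaps.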

Another issue you may run into is that you may need a different DAG for full data replication and incremental loads if your DAG frequency is high. Imagine that the ideal frequency of your incremental load is hourly and you have business data for the last five years. That means that if you want to reload data from the beginning, Airflow will trigger 43,800 (5 * 365 * 24) DAG runs, which can be slow as Airflow introduces a significant latency between two DAG runs. To solve this problem, many data engineering teams end up creating monthly, daily and hourly DAGs with nearly the same code.

As we have seen, there is no convention for how to set up a full data replication and an incremental load for each transfer operator. On top of that, each transfer operator has a different interface, sync modes have different names and how you map types from source to destination varies from one operator to another. Ideally, with an ETL tool you would expect to only select the source, destination, sync mode and the database column(s) to look for new data.
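The run counts behind that trade-off are easy to check. The sketch below assumes five 365-day years, as in the estimate above, and approximates a month as 30 days; the start date is arbitrary.

```python
from datetime import datetime, timedelta


def count_runs(start: datetime, end: datetime, interval: timedelta) -> int:
    """Number of whole schedule intervals between start and end."""
    return (end - start) // interval


start = datetime(2016, 1, 1)  # arbitrary start date for the illustration
end = start + timedelta(days=5 * 365)

schedules = [
    ("hourly", timedelta(hours=1)),
    ("daily", timedelta(days=1)),
    ("monthly (30 days)", timedelta(days=30)),
]
for label, interval in schedules:
    print(label, count_runs(start, end, interval))
```

A backfill with the hourly DAG needs 43,800 runs, while a daily DAG covers the same period in 1,825 runs, which is why teams keep coarser-grained copies of the same DAG around.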

Other open-source data integration tools like Airbyte have a set of common options for all data transfers. For example, Airbyte supports four modes to sync data: full refresh overwrite, full refresh append, incremental append and incremental deduped history.

😟 The Ugly

Airflow Operator Sources and Destinations are Tightly Coupled

Each cloud provider includes operators to transfer data for the most popular systems in their cloud. In the provider packages documentation you can find a list of operators and hooks that are released independently of the Airflow core. You can navigate all 60 transfer operators in a Google Sheet organized by source and destination. Below you can see a chord diagram with all possible data flows using Airflow transfer operators.


Visualizing all data flows using Airflow transfer operators. Credits to the Datasmith Chord Diagram Generator.


Notice that the graph above doesn’t show the direction of a data transfer. Only in some cases can you move data back and forth, as with the GCSToS3Operator and S3ToGCSOperator. Sometimes you can move data within the same system, as with the GCSToGCSOperator. Now imagine that you want to replicate Google Ads data to Snowflake. You have to use the GoogleAdsToGcsOperator, followed by the GCSToS3Operator and the S3ToSnowflakeOperator.

The main issue with Airflow transfer operators is that if you want to support transfers from M sources to N destinations, the community needs to support N x M Airflow operators. This is why Airflow data transfer operators are focused on a limited number of databases, data warehouses and data lakes. But what happens when your company needs to synchronize data from business applications? There are no Airflow operators to transfer data from business applications other than the Salesforce, Google Ads and Facebook Ads operators.

Other data integration tools like Airbyte have decoupled sources from destinations with connectors, so that the community only needs to build N + M connectors (one per source and one per destination) to support all possible data transfers. After only one year, Airbyte already supports hundreds of connectors and more than 1000 Airbyte data transfers vs 60 supported data transfers with Airflow.
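To make the difference in growth concrete, here is a small sketch that compares the two approaches. It assumes one operator per (source, destination) pair in the coupled case and one connector per source plus one per destination in the decoupled case; counting connectors differently would change the constant but not the linear growth.

```python
def pairwise_operators(sources: int, destinations: int) -> int:
    """One dedicated operator for every (source, destination) pair."""
    return sources * destinations


def decoupled_connectors(sources: int, destinations: int) -> int:
    """One connector per source plus one per destination (counting assumption)."""
    return sources + destinations


for m, n in [(5, 3), (20, 10), (100, 10)]:
    print(
        f"{m} sources, {n} destinations: "
        f"{pairwise_operators(m, n)} pairwise operators vs "
        f"{decoupled_connectors(m, n)} decoupled connectors"
    )
```

With 100 sources and 10 destinations, covering every transfer takes 1000 pairwise operators but only 110 decoupled connectors.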

Airflow transfer operators won’t be able to cover the long tail of integrations

🤩 The Alternative

ELT Pipelines with Airflow, Airbyte and dbt

Airflow, Airbyte and dbt are three open-source projects with a different focus but lots of overlapping features. At its core, Airflow is a workflow management tool, Airbyte is a data integration (EL steps) tool and dbt is a transformation (T step) tool.

As we have seen, you can also use Airflow to build ETL and ELT pipelines. The feature overlap doesn’t stop there; it also works the other way around. Airbyte Open-Source and Airbyte Cloud also provide a scheduler and integrate with dbt for the T step. dbt Cloud also comes with a scheduler.


Airbyte Connection UI to replicate data from Postgres to BigQuery and apply custom dbt transformations.

Where to Schedule ELT Pipelines?

If you are a data company and have different kinds of pipelines, you can use Airflow to schedule all your ELT pipelines. You can then integrate Airflow with Airbyte for the EL steps with the AirbyteTriggerSyncOperator. This way, you can trigger incremental syncs with the Airflow scheduler and run full refresh syncs from the Airbyte UI without the latency you would have when doing an Airflow backfill job.

You can also integrate Airbyte with dbt to run transformations for each connection that loads data into a destination. Keep in mind that you should limit Airbyte usage of dbt to simple data normalization for each loaded table. If you need more complex data transformation logic that combines data from different dbt models, you can integrate Airflow with dbt instead.

Unfortunately, the community has not yet agreed on the best way to integrate Airflow and dbt. Some use the BashOperator, others the airflow-dbt integration package contributed by the GoCardless team and a few others use the dbt-cloud-plugin. The engineering team at Updater has shared their work recreating the dbt graph in Airflow by parsing the dbt manifest.json file.
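The manifest-parsing idea can be sketched in a few lines of Python. This is not Updater's code: the manifest excerpt below is hand-written for illustration and the project and model names are hypothetical. It only relies on the `nodes`, `resource_type` and `depends_on.nodes` fields of manifest.json.

```python
import json

# Hand-written excerpt in the shape of a dbt manifest.json (names are hypothetical).
MANIFEST_JSON = """
{
  "nodes": {
    "model.shop.stg_orders": {
      "resource_type": "model",
      "depends_on": {"nodes": ["source.shop.raw.orders"]}
    },
    "model.shop.customer_orders": {
      "resource_type": "model",
      "depends_on": {"nodes": ["model.shop.stg_orders"]}
    }
  }
}
"""


def model_edges(manifest):
    """Return (upstream, downstream) pairs between dbt models."""
    edges = []
    for node_id, node in manifest["nodes"].items():
        if node["resource_type"] != "model":
            continue
        for upstream in node["depends_on"]["nodes"]:
            # Skip sources and other non-model parents.
            if upstream.startswith("model."):
                edges.append((upstream, node_id))
    return edges


edges = model_edges(json.loads(MANIFEST_JSON))
print(edges)
```

In an Airflow DAG, each model would become one task and each pair would become a dependency between the upstream and the downstream task.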


Airflow DAG parsed from the dbt manifest.json file. Credits to the Updater and Astronomer.io teams.

Conclusion

Airflow shines as a workflow orchestrator. Because Airflow is widely adopted, many data teams also use Airflow transfer and transformation operators to schedule and author their ETL pipelines. Several of those data teams have migrated their ETL pipelines to follow the ELT paradigm. We have seen some of the challenges of building full data replication and incremental load DAGs with Airflow. More troublesome is that sources and destinations are tightly coupled in Airflow transfer operators. Because of this, it will be hard for Airflow to cover the long tail of integrations for your business applications.

One alternative is to keep using Airflow as a scheduler and integrate it with two other open-source projects that are better suited for ELT pipelines: Airbyte for the EL parts and dbt for the T part. Airbyte sources are decoupled from destinations, so you can already sync data from 100+ sources (databases, APIs, ...) to 10+ destinations (databases, data warehouses, data lakes, ...) and remove the boilerplate code needed with Airflow. With dbt you can transform data with SQL in your data warehouse and avoid having to handle dependencies between tables in your Airflow DAGs.

If you would like to get involved, we hope you’ll join Airbyte’s Slack community, the most active community around data integration!
