How I Built My First ETL Pipeline with Apache Airflow
Did you know that 90% of data‑driven companies report at least one major data‑pipeline failure each quarter? I hit that wall on my very first try, until I discovered Apache Airflow. In this post I'll walk you through the exact steps I took to turn a chaotic collection of scripts into a reliable, repeatable ETL workflow that now runs on autopilot.

Why a Proper ETL Pipeline Matters
The business impact of a broken data pipeline is real: lost revenue, bad decisions, and a reputation that can spiral downward. In my experience, the first time a script goes rogue, the entire data team feels the sting. Ad‑hoc scripts are fine for one‑off reports, but they lack scheduling, retry logic, and visibility. Airflow's Directed Acyclic Graph (DAG) model solves these snags by visualizing dependencies, orchestrating retries, and alerting you the moment something fails. And what I love about Airflow is that it plays nicely with existing tools, so I never had to abandon my favorite libraries.

Planning the Pipeline – From Source to Destination
- Identify data sources: MySQL for transactional data, a third‑party API for external metrics.
- Define transformation needs: clean nulls, enrich with geo‑data, aggregate sales by region.
- Pick complementary tools: dbt for SQL‑level transforms, Spark for heavy aggregation.
- Map out the data flow: extract → load → transform → aggregate → warehouse.
- Decide on execution context: Docker Compose for dev, Kubernetes for prod.

In the past few months I've realized that a clear blueprint saves me from chasing bugs later. That's why I drafted a one‑page diagram before writing a single line of code.

Step‑by‑Step Walkthrough – Building the Airflow DAG
The code below is a minimal yet complete Airflow DAG that extracts data from MySQL, writes it to S3, runs a dbt model, and finally triggers a Spark job for aggregation. I'll walk through each part so you can tweak it for your own stack.

```python
# file: dags/first_etl_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    "owner": "data-eng",
    "depends_on_past": False,
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="first_etl_pipeline",
    default_args=default_args,
    description="My first ETL pipeline using Airflow, dbt and Spark",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # 1️⃣ Extract: pull raw rows from MySQL
    extract_mysql = MySqlOperator(
        task_id="extract_mysql",
        sql="SELECT * FROM raw.sales WHERE sale_date = '{{ ds }}';",
        mysql_conn_id="mysql_source",
        database="sales_db",
    )

    # 2️⃣ Load: write the result to S3 as a CSV
    load_to_s3 = S3CreateObjectOperator(
        task_id="load_to_s3",
        s3_bucket="my-data-lake",
        s3_key="raw/sales_{{ ds }}.csv",
        data="{{ ti.xcom_pull(task_ids='extract_mysql') }}",
        aws_conn_id="aws_default",
    )

    # 3️⃣ Transform with dbt
    dbt_transform = BashOperator(
        task_id="dbt_transform",
        bash_command=(
            "cd /opt/airflow/dbt_project && "
            "dbt run --profiles-dir . --target prod "
            "--vars '{date: \"{{ ds }}\"}'"
        ),
    )

    # 4️⃣ Aggregate with Spark
    spark_agg = SparkSubmitOperator(
        task_id="spark_aggregate",
        application="/opt/airflow/spark/jobs/aggregate_sales.py",
        name="sales_agg_{{ ds }}",
        conf={"spark.yarn.submit.waitAppCompletion": "false"},
        jars="/opt/airflow/jars/spark-sql_2.12-3.2.1.jar",
        application_args=["--date", "{{ ds }}"],
    )

    extract_mysql >> load_to_s3 >> dbt_transform >> spark_agg
```
*Look, this is the core of the pipeline: each operator does one clear piece of work, and the `>>` arrows show the flow.*
XComs pass data between tasks, Jinja templating injects the run date (`{{ ds }}`), and Airflow Connections keep credentials out of the code.
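If you want to poke at those three mechanisms from inside a task of your own, here's a minimal sketch of a throwaway inspection task. It isn't part of the DAG above and the task name is purely illustrative, but the task and connection IDs it references match the pipeline.

```python
# Illustrative only: a quick way to see XComs, templating, and Connections in
# action. "extract_mysql" and "mysql_source" are the IDs used in the DAG above.
from airflow.decorators import task
from airflow.hooks.base import BaseHook

@task
def inspect_extract(**context):
    # XCom: pull whatever the extract task returned for this run
    rows = context["ti"].xcom_pull(task_ids="extract_mysql")
    # Connections: credentials stay in Airflow's metadata store, not in code
    conn = BaseHook.get_connection("mysql_source")
    # Templated values like the run date are also available in the context
    print(f"{len(rows or [])} rows pulled from {conn.host} for {context['ds']}")
```

In the real DAG the same ideas show up declaratively: `xcom_pull` inside a templated field, `{{ ds }}` in the SQL, and a `conn_id` parameter on every operator.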
Testing, Monitoring & Scaling the Pipeline
- Unit‑test tasks with `pytest` and Airflow's `airflow dags test`.
- Add alerting: a Slack webhook plus email on failure.
- Scale out: switch to the CeleryExecutor, and use the KubernetesPodOperator for Spark jobs.
- Keep an eye on the Airflow UI; the Graph view turns a mystery into a clear map.
- Use Airflow's built‑in SLA feature to flag slow tasks.

Sound familiar? That's the scenario I dealt with last month when a nightly job started pulling data at 3 AM and tripped a downstream alert. By adding an S3KeySensor that waited for the raw file to land (sketched below), the downstream tasks no longer ran on stale data.
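Here's a minimal sketch of that sensor, assuming the same bucket and key naming as the `load_to_s3` task above:

```python
# Sketch of a sensor that guards the downstream tasks; bucket/key mirror the
# load_to_s3 task above, and the poke/timeout values are just reasonable defaults.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_raw_file = S3KeySensor(
    task_id="wait_for_raw_file",
    bucket_name="my-data-lake",
    bucket_key="raw/sales_{{ ds }}.csv",
    aws_conn_id="aws_default",
    poke_interval=300,      # check every 5 minutes
    timeout=60 * 60 * 2,    # give up after 2 hours
    mode="reschedule",      # free the worker slot between pokes
)

# Wired in front of the transform step:
# load_to_s3 >> wait_for_raw_file >> dbt_transform
```

Running the sensor in reschedule mode keeps it from tying up a worker slot while it waits.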
Actionable Takeaways & Next Steps
1. **Checklist for “pipeline‑ready” code**
   - Use Airflow Connections for secrets.
   - Separate extraction, transformation, and aggregation into distinct operators.
   - Keep DAG files under version control.
2. **Version‑control DAGs and dbt models**
   - Git repo, feature branches, pull requests.
   - A CI pipeline that runs `airflow dags test` and `dbt test`.
3. **Suggested improvements**
   - Incremental loads with dbt's `materialized: incremental`.
   - Data quality checks via dbt's built‑in tests.
   - Continuous delivery: deploy DAGs to prod via GitHub Actions.

Now that you have the skeleton, it's time to fill in your own data sources, tweak the SQL, and let Airflow do the heavy lifting.

Frequently Asked Questions
What is the difference between ETL and ELT, and does Airflow support both?
ETL extracts data, transforms it, then loads it into the target; ELT loads raw data first and transforms it inside the warehouse. Airflow is agnostic—it can orchestrate either pattern by scheduling the appropriate tasks (e.g., Spark for heavy transforms in ELT).
How do I integrate dbt models into an Airflow DAG?
Use the BashOperator (or a community‑maintained dbt operator such as DbtRunOperator) to call dbt run inside the DAG. Pass the appropriate profile and target so dbt runs against the same warehouse your Airflow tasks use.
Can I run Spark jobs from Airflow without a full Spark cluster?
Yes. For development you can launch Spark locally via spark-submit in a Docker container; for production you can point the SparkSubmitOperator at a managed cluster (EMR, Dataproc, or Kubernetes).
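As a rough illustration, the aggregation task from the DAG above could be repointed at a local Spark for development. The connection ID and settings here are assumptions for a dev setup, not something from my production stack.

```python
# Dev-mode variant of the aggregation step. "spark_local" is an assumed Airflow
# connection whose host is set to local[*], so spark-submit runs in-process
# inside the container instead of against a cluster.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_agg_dev = SparkSubmitOperator(
    task_id="spark_aggregate_dev",
    application="/opt/airflow/spark/jobs/aggregate_sales.py",
    conn_id="spark_local",
    conf={"spark.driver.memory": "2g"},
    application_args=["--date", "{{ ds }}"],
)
```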
What are the best practices for handling secrets (DB passwords, API keys) in Airflow?
Store secrets in a backend like HashiCorp Vault, AWS Secrets Manager, or Airflow’s built‑in Connections UI, and reference them via Jinja templating ({{ conn.my_db.password }}). Never hard‑code credentials in DAG files.
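As a quick sketch of that last point, here's one way a task can hand a stored credential to a script through an environment variable rendered at runtime; the task name, script path, and connection ID are all made up for the example.

```python
# Illustrative only: the credential never appears in the DAG file. Airflow
# renders {{ conn.metrics_api.password }} from the stored connection when the
# task runs; "metrics_api" and the script path are assumptions.
from airflow.operators.bash import BashOperator

call_metrics_api = BashOperator(
    task_id="call_metrics_api",
    bash_command="python /opt/scripts/pull_metrics.py --date {{ ds }}",
    env={"API_KEY": "{{ conn.metrics_api.password }}"},
)
```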
How do I version‑control my Airflow DAGs and ensure they don’t break production?
Keep DAGs in a Git repo, use feature branches for changes, and set up a CI pipeline (GitHub Actions, GitLab CI) that runs airflow dags test and linting (flake8, pylint) before merging. Deploy to production via a CI/CD step that copies the vetted DAGs to the Airflow workers.
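For reference, this is roughly what the test side of that CI step can look like. The file path and test names are my own convention, but the DAG and task IDs match the pipeline above.

```python
# tests/test_dag_integrity.py -- a smoke test the CI pipeline can run before merging.
from airflow.models import DagBag

def test_dags_import_cleanly():
    # Fails if any DAG file in dags/ has an import error or can't be parsed
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"

def test_first_etl_pipeline_structure():
    # Guards against accidentally renaming or dropping a task
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    dag = dag_bag.get_dag("first_etl_pipeline")
    assert dag is not None
    assert {"extract_mysql", "load_to_s3", "dbt_transform", "spark_aggregate"} <= set(dag.task_ids)
```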