
How I Built My First ETL Pipeline with Apache Airflow


Did you know that 90% of data‑driven companies report at least one major data‑pipeline failure each quarter? I hit that wall on my very first try—until I discovered Apache Airflow. In this post I’ll walk you through the exact steps I took to turn a chaotic collection of scripts into a reliable, repeatable ETL workflow that now runs on autopilot.

Why a Proper ETL Pipeline Matters

Broken data pipelines have a real business impact: lost revenue, bad decisions, and a reputation that spirals downward. In my experience, the first time a script goes rogue, the entire data team feels the sting. Ad‑hoc scripts are fine for one‑off reports, but they lack scheduling, retry logic, and visibility. Airflow's Directed Acyclic Graph (DAG) model solves those snags: it makes dependencies visible, orchestrates retries, and alerts you the moment something fails. And what I love about Airflow is that it plays nicely with existing tools, so I never had to abandon my favorite libraries.

Planning the Pipeline – From Source to Destination

- Identify data sources: MySQL for transactional data, a third‑party API for external metrics.
- Define transformation needs: clean nulls, enrich with geo‑data, aggregate sales by region.
- Pick complementary tools: dbt for SQL‑level transforms, Spark for heavy aggregation.
- Map out the data flow: extract → load → transform → aggregate → warehouse.
- Decide on execution context: Docker‑Compose for dev, Kubernetes for prod.

Over the past few months I've learned that a clear blueprint saves me from chasing bugs later. That's why I drafted a one‑page diagram before writing a single line of code.

Step‑by‑Step Walkthrough – Building the Airflow DAG

The code below is a minimal yet complete Airflow DAG that extracts data from MySQL, writes it to S3, runs a dbt model, and finally triggers a Spark job for aggregation. I’ll walk through each part so you can tweak it for your own stack.
# file: dags/first_etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    "owner": "data-eng",
    "depends_on_past": False,
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="first_etl_pipeline",
    default_args=default_args,
    description="My first ETL pipeline using Airflow, dbt and Spark",
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # 1️⃣ Extract: pull raw rows from MySQL
    extract_mysql = MySqlOperator(
        task_id="extract_mysql",
        sql="SELECT * FROM raw.sales WHERE sale_date = '{{ ds }}';",
        mysql_conn_id="mysql_source",
        database="sales_db",
    )

    # 2️⃣ Load: write the result to S3 as a CSV
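    # NOTE: the extracted rows arrive here via XCom, which Airflow keeps in its
    # metadata database, so this pattern is only sensible for small result sets.
    # See the hook-based sketch after the DAG for larger extracts.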
    load_to_s3 = S3CreateObjectOperator(
        task_id="load_to_s3",
        s3_bucket="my-data-lake",
        s3_key="raw/sales_{{ ds }}.csv",
        data="{{ ti.xcom_pull(task_ids='extract_mysql') }}",
        aws_conn_id="aws_default",
    )

    # 3️⃣ Transform with dbt
    dbt_transform = BashOperator(
        task_id="dbt_transform",
        bash_command=(
            "cd /opt/airflow/dbt_project && "
            "dbt run --profiles-dir . --target prod "
            "--vars '{date: \"{{ ds }}\"}'"
        ),
    )

    # 4️⃣ Aggregate with Spark
    spark_agg = SparkSubmitOperator(
        task_id="spark_aggregate",
        application="/opt/airflow/spark/jobs/aggregate_sales.py",
        name="sales_agg_{{ ds }}",
        conf={"spark.yarn.submit.waitAppCompletion": "false"},
        jars="/opt/airflow/jars/spark-sql_2.12-3.2.1.jar",
        application_args=["--date", "{{ ds }}"],
    )

    extract_mysql >> load_to_s3 >> dbt_transform >> spark_agg
Look, this is the core of the pipeline: each operator does one clear piece of work, and the arrows at the end define the flow. XComs pass data between tasks, Jinja templating injects the run date, and Airflow Connections keep credentials out of the code.
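One caveat worth flagging: pulling the full query result through XCom only works for small extracts. A pattern I've since leaned on is collapsing the extract‑and‑load pair into a single PythonOperator that uses Airflow hooks, so the rows never touch XCom at all. Here's a rough sketch using the same connection IDs and bucket as above; the CSV serialization is deliberately simplified, so treat it as a starting point rather than drop‑in code:

# Alternative extract-and-load task, placed inside the same `with DAG(...)`
# block as the tasks above.
import csv
import io

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.mysql.hooks.mysql import MySqlHook


def _extract_and_load(ds: str, **_):
    # Fetch the day's rows directly through the MySQL hook (no XCom involved)
    rows = MySqlHook(mysql_conn_id="mysql_source", schema="sales_db").get_records(
        "SELECT * FROM raw.sales WHERE sale_date = %s", parameters=(ds,)
    )
    # Serialize to CSV in memory and push straight to the data lake
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    S3Hook(aws_conn_id="aws_default").load_string(
        string_data=buffer.getvalue(),
        key=f"raw/sales_{ds}.csv",
        bucket_name="my-data-lake",
        replace=True,
    )


extract_and_load = PythonOperator(
    task_id="extract_and_load",
    python_callable=_extract_and_load,
)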

Testing, Monitoring & Scaling the Pipeline

- Unit‑test tasks with `pytest` and Airflow's `airflow dags test`.
- Add alerting: a Slack webhook, plus email on failure.
- Scale out: switch to the CeleryExecutor, and run Spark jobs via the KubernetesPodOperator.
- Keep an eye on the Airflow UI; the Graph view turns a mystery into a clear map.
- Use Airflow's built‑in SLA feature to flag slow tasks.

Sound familiar? That's the scenario I dealt with last month when a nightly job started pulling data at 3 AM and tripped a downstream alert. By adding an S3KeySensor that waited for the day's file to land in S3 (see the sketch below), the downstream tasks no longer ran on stale data.
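For reference, here's roughly what that sensor looked like. It's a sketch using the stock S3KeySensor with the same bucket and key pattern as the DAG above; tune the poke interval and timeout to your own latency budget:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_raw_file = S3KeySensor(
    task_id="wait_for_raw_file",
    bucket_name="my-data-lake",
    bucket_key="raw/sales_{{ ds }}.csv",
    aws_conn_id="aws_default",
    poke_interval=300,      # re-check every 5 minutes
    timeout=2 * 60 * 60,    # fail the task if the file never shows up within 2 hours
)

# wired in between load and transform:
# load_to_s3 >> wait_for_raw_file >> dbt_transform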

Actionable Takeaways & Next Steps

1. **Checklist for “pipeline‑ready” code**
   - Use Airflow Connections for secrets.
   - Separate extraction, transformation, and aggregation into distinct operators.
   - Keep DAG files under version control.
2. **Version‑control DAGs and dbt models**
   - Git repo, feature branches, pull requests.
   - A CI pipeline that runs `airflow dags test` and `dbt test` (a minimal pytest sketch follows below).
3. **Suggested improvements**
   - Incremental loads with dbt's `materialized: incremental`.
   - Data quality checks via dbt's built‑in tests.
   - Continuous delivery: deploy DAGs to prod via GitHub Actions.

Now that you have the skeleton, it's time to fill in your own data sources, tweak the SQL, and let Airflow do the heavy lifting.
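As a concrete example of that CI step, this is the kind of minimal pytest check I run before anything merges. It assumes the repo keeps its DAGs under a dags/ folder (the test file path is hypothetical) and simply fails the build if any DAG no longer imports:

# file: tests/test_dag_integrity.py (hypothetical path)
from airflow.models import DagBag


def test_dags_import_cleanly():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    # Any broken import (syntax error, missing provider, bad template) shows up here
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"
    # Guard against the pipeline silently disappearing from the DagBag
    assert "first_etl_pipeline" in dag_bag.dags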

Frequently Asked Questions

What is the difference between ETL and ELT, and does Airflow support both?

ETL extracts data, transforms it, then loads it into the target; ELT loads raw data first and transforms it inside the warehouse. Airflow is agnostic—it can orchestrate either pattern by scheduling the appropriate tasks (e.g., Spark for heavy transforms in ELT).

How do I integrate dbt models into an Airflow DAG?

Use the BashOperator (or a community‑maintained dbt operator, e.g. from the airflow‑dbt package or Astronomer's Cosmos) to call `dbt run` inside the DAG. Pass the appropriate profile and target so dbt runs against the same warehouse your Airflow tasks use.

Can I run Spark jobs from Airflow without a full Spark cluster?

Yes. For development you can launch Spark locally via spark-submit in a Docker container; for production you can point the SparkSubmitOperator at a managed cluster (EMR, Dataproc, or Kubernetes).
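As a rough illustration, the same SparkSubmitOperator from the DAG above can target local mode in development just by pointing it at a different connection. The conn_id "spark_local" here is hypothetical; it assumes you've created a Spark connection whose host is set to local[*]:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_agg_dev = SparkSubmitOperator(
    task_id="spark_aggregate_dev",
    conn_id="spark_local",   # hypothetical connection with host set to local[*]
    application="/opt/airflow/spark/jobs/aggregate_sales.py",
    application_args=["--date", "{{ ds }}"],
)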

What are the best practices for handling secrets (DB passwords, API keys) in Airflow?

Store secrets in a backend like HashiCorp Vault, AWS Secrets Manager, or Airflow’s built‑in Connections UI, and reference them via Jinja templating (e.g. `{{ conn.my_db.password }}`). Never hard‑code credentials in DAG files.
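As a small, hypothetical example of that templating, a BashOperator can borrow credentials from a Connection at runtime without the password ever appearing in the repo (the conn_id my_db and the command itself are placeholders):

from airflow.operators.bash import BashOperator

export_dump = BashOperator(
    task_id="export_dump",
    bash_command=(
        "mysqldump --host={{ conn.my_db.host }} "
        "--user={{ conn.my_db.login }} "
        "--password={{ conn.my_db.password }} "
        "sales_db > /tmp/sales_{{ ds }}.sql"
    ),
)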

How do I version‑control my Airflow DAGs and ensure they don’t break production?

Keep DAGs in a Git repo, use feature branches for changes, and set up a CI pipeline (GitHub Actions, GitLab CI) that runs `airflow dags test` and linting (`flake8`, `pylint`) before merging. Deploy to production via a CI/CD step that copies the vetted DAGs to the Airflow workers.



What do you think?

Have experience with this topic? Drop your thoughts in the comments - I read every single one and love hearing different perspectives!
