Building a Port Data Lake: Architecture, APIs & ETL Pipelines for TOS/ERP Integration

In 2023, 78 % of maritime logistics firms said a siloed ERP system cost them an average of $1.2 M per year in lost efficiency. By feeding that ERP data into a port‑wide data lake alongside your Terminal Operating System (TOS) feeds, you can cut manual data handling by up to 85 % and unlock real‑time analytics that drive smarter vessel scheduling. Imagine a data engineer who no longer spends hours writing custom scripts for each TOS: instead, a single, reusable ETL pipeline feeds clean, searchable data to every downstream application.

Why a Port‑Centric Data Lake Matters

Faster turnaround times, lower demurrage, and cleaner compliance reports are all on the table when you have one source of truth. In my experience, the biggest win comes from eliminating duplicated tables across TOS, ERP, and AIS (Automatic Identification System) feeds. One European hub cut vessel‑berth allocation time from 45 min to 7 min after adopting a data lake: pretty much a game‑changer for their operations.

Core Architecture Blueprint

  • Ingestion layer: API gateways, Kafka/Redpanda streams, and file‑drop zones for legacy CSVs.
  • Storage layer: Partitioned Parquet on S3/ADLS with a Hive metastore for schema‑on‑read.
  • Processing layer: Spark Structured Streaming and dbt for transformation, with Airflow for orchestration.
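
To make the storage layer concrete: Hive‑style partitioning encodes partition values in the directory path (`key=value`), which is what lets the metastore and query engines skip irrelevant files at read time. The helper below is a minimal sketch that computes such a path for one vessel‑status event; the `s3://lake-data/clean` base path and the year/month scheme are illustrative assumptions. It mirrors the layout Spark writes when partitioning by integer `year` and `month` columns (no zero padding).

```python
from datetime import datetime, timezone


def partition_path(base: str, event_time: datetime) -> str:
    """Return a Hive-style partition directory (year=YYYY/month=M) for an event.

    Timezone-aware datetimes are converted to UTC first; naive ones are
    assumed to already be UTC.
    """
    if event_time.tzinfo is not None:
        event_time = event_time.astimezone(timezone.utc)
    return f"{base.rstrip('/')}/year={event_time.year}/month={event_time.month}"


if __name__ == "__main__":
    ts = datetime(2026, 3, 14, 9, 30, tzinfo=timezone.utc)
    print(partition_path("s3://lake-data/clean", ts))
    # s3://lake-data/clean/year=2026/month=3
```

Normalizing to UTC before deriving the partition keeps events from different terminals' local clocks in the same partition scheme.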

The beauty of this stack is that each component does exactly what it’s good at, letting you scale parts independently. For instance, you can spin up more Kafka brokers during peak season without touching the downstream Spark jobs.

Designing Robust APIs for TOS ↔ ERP Sync

So what's the catch when you expose TOS data to ERP? REST is great for CRUD, but gRPC can cut latency for vessel‑status pushes that happen every 30 seconds. I think gRPC shines when you need a lightweight, bidirectional stream, as long as you keep a REST fallback for legacy components that can't speak it.

Authentication isn’t an afterthought. Network zoning keeps TOS and ERP endpoints off the open internet, and OAuth2 with short‑lived JWTs limits the damage if a token leaks. And remember, maritime regulations often demand mutual TLS and IP whitelisting, so don't overlook those details.
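
Short‑lived tokens only help if every service actually checks the expiry. The sketch below issues and verifies an HS256‑signed JWT with nothing but the Python standard library, to show what "short‑lived" means in practice: the `exp` claim sits a few minutes out and verification rejects anything past it. It is an illustration, not a production validator: in a real OAuth2 setup your identity provider typically signs tokens with an asymmetric key (e.g., RS256) and you would verify them with a maintained library such as PyJWT. The shared secret, the 300‑second lifetime, and the `tos-sync` subject are assumptions.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use unpadded base64url encoding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    # Restore the padding stripped by _b64url before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def issue_token(secret: bytes, subject: str, ttl_seconds: int = 300,
                now: Optional[float] = None) -> str:
    """Create an HS256 JWT that expires ttl_seconds after issuance."""
    issued = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": subject, "iat": issued, "exp": issued + ttl_seconds}
    signing_input = (f"{_b64url(json.dumps(header).encode())}."
                     f"{_b64url(json.dumps(claims).encode())}")
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(secret: bytes, token: str, now: Optional[float] = None) -> dict:
    """Return the claims if the signature is valid and the token has not expired."""
    try:
        header_b64, claims_b64, sig_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    header = json.loads(_b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        # Never let the token pick its own algorithm (e.g. "none").
        raise ValueError("unexpected algorithm")
    expected = hmac.new(secret, f"{header_b64}.{claims_b64}".encode("ascii"),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(claims_b64))
    current = time.time() if now is None else now
    if current >= claims["exp"]:
        raise ValueError("token expired")
    return claims
```

The `now` parameter exists only to make expiry testable; in a service you would let it default to the current time and call `verify_token` on the bearer token before serving any vessel data.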

Versioning strategy: stick to semantic versioning, ship additive changes such as a new optional field as backward‑compatible minor releases, and reserve major versions for breaking changes. That way downstream pipelines won't break when you add a new field to the vessel payload.
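
On the consumer side, one way to honor that rule is a "tolerant reader": accept any payload with the same major version, ignore fields you don't know yet, and default the optional ones. The sketch below assumes a hypothetical payload carrying a `schema_version` string (e.g. `"1.3.0"`) next to `vessel_id`, `status`, and `timestamp`; the optional `berth` field stands in for something added in a later minor release.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional

SUPPORTED_MAJOR = 1  # this consumer understands any 1.x.y payload


@dataclass(frozen=True)
class VesselStatus:
    vessel_id: str
    status: str
    timestamp: str
    berth: Optional[str] = None  # hypothetical field added in a 1.x minor release


def parse_vessel_status(payload: Mapping[str, Any]) -> VesselStatus:
    """Parse a vessel-status payload, tolerating unknown fields within one major version."""
    # Assumption: payloads without schema_version predate versioning and count as 1.0.0.
    major = int(str(payload.get("schema_version", "1.0.0")).split(".")[0])
    if major != SUPPORTED_MAJOR:
        # A major bump signals a breaking change: fail loudly instead of guessing.
        raise ValueError(f"unsupported schema major version {major}")
    # Missing required fields raise KeyError; unknown extra keys are simply ignored.
    return VesselStatus(
        vessel_id=str(payload["vessel_id"]),
        status=str(payload["status"]),
        timestamp=str(payload["timestamp"]),
        berth=payload.get("berth"),
    )
```

Failing fast on an unknown major version is deliberate: silently mapping a breaking payload onto the old shape is how bad data reaches the lake unnoticed.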

Hands‑On Walkthrough: Building an ETL Pipeline with Airflow, dbt & Spark

Below is a minimal but complete example that pulls vessel‑status JSON from a TOS REST endpoint, writes raw data to an S3 landing zone, processes it with Spark Structured Streaming into partitioned Parquet, and finally enriches the data with ERP contract information via dbt.

# airflow_dag.py
# Written against Airflow 2.x (2.4+ for `schedule=`) with the http, amazon and apache-spark providers.
import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

default_args = {
    'owner': 'etl_team',
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=5),
}

with DAG(
    dag_id='tos_to_lake_etl',
    default_args=default_args,
    schedule='@hourly',
    start_date=datetime.datetime(2026, 1, 1),
    catchup=False,
) as dag:

    # Call the TOS REST API; the raw response body is pushed to XCom.
    # XCom suits small payloads only; stream large extracts straight to S3 instead.
    fetch = SimpleHttpOperator(
        task_id='fetch_tos',
        http_conn_id='tos_api',
        endpoint='/vessels/status',
        method='GET',
        response_filter=lambda r: r.text,
        log_response=True,
    )

    # Land the raw JSON unchanged; one object per run so hourly runs don't overwrite each other.
    store = S3CreateObjectOperator(
        task_id='store_raw',
        s3_bucket='lake-landing',
        s3_key='raw/{{ ds }}/vessels_{{ ts_nodash }}.json',
        data="{{ ti.xcom_pull(task_ids='fetch_tos') }}",
        replace=True,
    )

    # Turn newly landed files into partitioned Parquet (spark_process.py below).
    spark = SparkSubmitOperator(
        task_id='spark_etl',
        application='s3://scripts/spark_process.py',
        name='tos-parquet',
        conn_id='spark_cluster',
        conf={'spark.sql.shuffle.partitions': '200'},
    )

    # Run the dbt models; BashOperator avoids depending on a third-party dbt operator.
    dbt = BashOperator(
        task_id='dbt_transform',
        bash_command='dbt run --project-dir /opt/dbt --profile lake',
    )

    fetch >> store >> spark >> dbt
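# spark_process.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("TOS ETL").getOrCreate()

# File-based streaming sources need an explicit schema (no inference by default).
schema = StructType([
    StructField("vessel_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

raw = (
    spark.readStream
    .schema(schema)
    # Each landing file holds one JSON document (an object or an array of objects).
    .option("multiLine", "true")
    .json("s3://lake-landing/raw/*/*.json")
)

# partitionBy() takes column names, not expressions, so derive the partition columns first.
parsed = (
    raw.withColumn("year", F.year("timestamp"))
       .withColumn("month", F.month("timestamp"))
)

query = (
    parsed.writeStream
    .format("parquet")
    .option("path", "s3://lake-data/clean")
    .option("checkpointLocation", "s3://lake-data/checkpoints")
    .partitionBy("year", "month")
    # Process everything that has landed since the last run, then stop (Spark 3.3+),
    # so the hourly Airflow task finishes and dbt can run afterwards.
    .trigger(availableNow=True)
    .start()
)

query.awaitTermination()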
-- models/vessel_enriched.sql
{{ config(materialized='incremental') }}

WITH raw AS (
    SELECT * FROM {{ source('lake', 'clean') }}
),

erp AS (
    SELECT
        contract_id,
        vessel_id,
        customer_name,
        contract_start,
        contract_end
    FROM {{ source('erp', 'contracts') }}
)

SELECT
    r.vessel_id,
    r.status,
    r.timestamp,
    e.contract_id,
    e.customer_name,
    e.contract_start,
    e.contract_end
FROM raw r
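LEFT JOIN erp e
    ON r.vessel_id = e.vessel_id
    -- Join only the contract active at status time (assumes both dates are set),
    -- so a vessel with several contracts doesn't fan out into duplicate rows.
    AND r.timestamp BETWEEN e.contract_start AND e.contract_end
{% if is_incremental() %}
-- Incremental runs only pick up rows newer than those already loaded.
WHERE r.timestamp > (SELECT MAX(timestamp) FROM {{ this }})
{% endif %}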
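
The `is_incremental()` block implements a high‑watermark load: each run keeps only rows strictly newer than the latest `timestamp` already in the target table, and the first run (when the table doesn't exist yet) loads everything. Here is the same idea in plain Python, assuming rows are dicts with a `timestamp` key; it sketches the pattern, not how dbt executes it.

```python
from datetime import datetime
from typing import Iterable, List, Optional


def incremental_batch(source_rows: Iterable[dict], target_max: Optional[datetime]) -> List[dict]:
    """Return the rows an incremental run would insert.

    target_max is MAX(timestamp) of the existing table, or None on the first
    full run (the equivalent of is_incremental() being false).
    """
    if target_max is None:
        return list(source_rows)
    return [row for row in source_rows if row["timestamp"] > target_max]
```

Because the comparison is strict, a status that arrives late with a timestamp at or before the current maximum is skipped. If your TOS can deliver events out of order, subtract a lookback window from the watermark and set a `unique_key` in the model config so dbt merges the overlapping rows instead of duplicating them.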

Validation & Alerting

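SparkSubmitOperator waits for spark-submit to return and fails the task on a non-zero exit code, so the DAG above won't run dbt on top of a broken Spark job; reach for an Airflow sensor when a job is launched outside Airflow's control. dbt tests provide a second safety net: not_null on vessel_id and timestamp, plus a uniqueness check on the (vessel_id, timestamp) pair (for example dbt_utils.unique_combination_of_columns), since each vessel reports many statuses and unique(vessel_id) alone would always fail. Run them with `dbt test` after `dbt run`, or switch the task to `dbt build`, which runs models and their tests together. When a test fails, push a message to Slack via a webhook or trigger a PagerDuty escalation.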
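
For the Slack route, Airflow lets you attach an `on_failure_callback` to a task or to `default_args`; it receives the task context. The sketch below posts to a Slack incoming webhook using only the standard library. The `SLACK_WEBHOOK_URL` environment variable and the message wording are assumptions.

```python
import json
import os
import urllib.request
from typing import Any, Mapping


def build_slack_message(context: Mapping[str, Any]) -> dict:
    """Build a Slack incoming-webhook payload from an Airflow task context."""
    ti = context["task_instance"]
    error = context.get("exception")
    text = f":red_circle: {ti.dag_id}.{ti.task_id} failed"
    if error is not None:
        text += f": {error}"
    text += f"\nLogs: {ti.log_url}"
    return {"text": text}


def notify_slack_on_failure(context: Mapping[str, Any]) -> None:
    """on_failure_callback: POST the message to the webhook in SLACK_WEBHOOK_URL."""
    url = os.environ.get("SLACK_WEBHOOK_URL")
    if not url:
        return  # alerting not configured; don't mask the original failure
    body = json.dumps(build_slack_message(context)).encode("utf-8")
    request = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(request, timeout=10):
        pass
```

Wire it in with `default_args['on_failure_callback'] = notify_slack_on_failure` so every task in the DAG reports its own failures; keeping the message builder separate makes it easy to unit-test without a network call.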

Actionable Takeaways & Next Steps

  • Pre‑launch checklist: data contracts, CI/CD for dbt, monitoring, security review, documentation.
  • Tooling roadmap: add Delta Lake or Iceberg when you need ACID guarantees, or a lakehouse query engine like Trino for ad‑hoc BI.
  • Scaling tips: autoscale Spark on Kubernetes, manage Airflow pools to avoid over‑commitment, and trim S3 storage costs by setting lifecycle rules.
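
For the lifecycle‑rule tip, here is a minimal sketch of an S3 lifecycle configuration, built as the dictionary that boto3's `put_bucket_lifecycle_configuration` expects. The `raw/` prefix and the 30/90/365‑day thresholds are illustrative assumptions: tune them to how long you actually need to replay raw landing files.

```python
def landing_zone_lifecycle(prefix: str = "raw/", ia_days: int = 30,
                           glacier_days: int = 90, expire_days: int = 365) -> dict:
    """Lifecycle rules that tier raw landing files to cheaper storage, then delete them."""
    if not (30 <= ia_days < glacier_days < expire_days):
        # S3 only allows lifecycle transitions to STANDARD_IA for objects at least 30 days old.
        raise ValueError("expected 30 <= ia_days < glacier_days < expire_days")
    return {
        "Rules": [
            {
                "ID": f"tier-and-expire-{prefix.strip('/') or 'all'}",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": ia_days, "StorageClass": "STANDARD_IA"},
                    {"Days": glacier_days, "StorageClass": "GLACIER"},
                ],
                "Expiration": {"Days": expire_days},
            }
        ]
    }
```

Apply it with `boto3.client("s3").put_bucket_lifecycle_configuration(Bucket="lake-landing", LifecycleConfiguration=landing_zone_lifecycle())`. Scoping the rule to the raw prefix leaves the curated Parquet zone untouched.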

Frequently Asked Questions

What is the best way to orchestrate ETL pipelines for a port data lake?

Airflow remains a solid default because it natively supports Python operators, sensors, and per‑task retries, and it can trigger Spark jobs via the SparkSubmitOperator or, on Kubernetes, the KubernetesPodOperator. Pair it with dbt for transformation testing and you get a clear separation of “move data” vs. “shape data.”

How does dbt complement Spark in a maritime data pipeline?

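dbt handles SQL‑based, incremental transformations and provides built‑in testing and documentation; because models are plain SQL files, they also fit naturally into Git‑based version control. Spark does the heavy lifting of reading raw JSON/CSV at scale; dbt then layers business logic on the resulting Parquet tables through an adapter for an engine that can query them, such as dbt-spark or dbt-trino.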

Can I use Airflow with serverless Spark (e.g., Databricks) for TOS integration?

Yes. Airflow’s DatabricksSubmitRunOperator (from the Databricks provider) can launch notebooks or jobs on Databricks compute, letting you keep the orchestration logic on‑prem while scaling compute in the cloud. The generic SparkSubmitOperator fits only when Airflow can run spark-submit against a cluster you manage.

What security considerations are unique to port‑to‑ERP data flows?

Maritime data often crosses regulatory zones, so you need mutual TLS, IP whitelisting, and role‑scoped JWTs for each system. Encrypt data at rest (SSE‑KMS) and in transit (TLS 1.3), and audit every API call with a centralized logging platform (e.g., Elastic or Splunk).
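
The transport‑level controls can be sketched with Python's standard `ssl` and `ipaddress` modules: a server‑side TLS context that requires TLS 1.3 and a client certificate signed by your CA (mutual TLS), plus an allowlist check on the peer address. The certificate paths and CIDR ranges are placeholders, not recommendations.

```python
import ipaddress
import ssl
from typing import Iterable, Optional


def build_mtls_server_context(certfile: Optional[str] = None,
                              keyfile: Optional[str] = None,
                              client_ca: Optional[str] = None) -> ssl.SSLContext:
    """Server-side context: TLS 1.3 only, client certificate required."""
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    ctx.verify_mode = ssl.CERT_REQUIRED  # reject clients without a valid certificate
    if certfile:
        ctx.load_cert_chain(certfile, keyfile)  # this server's own certificate and key
    if client_ca:
        ctx.load_verify_locations(cafile=client_ca)  # CA that signs TOS/ERP client certs
    return ctx


def is_allowed(peer_ip: str, allowlist: Iterable[str]) -> bool:
    """True if peer_ip falls inside any CIDR block in the allowlist."""
    addr = ipaddress.ip_address(peer_ip)
    # Membership is simply False when IPv4/IPv6 versions differ.
    return any(addr in ipaddress.ip_network(cidr) for cidr in allowlist)
```

In a socket server you would wrap the listening socket with `ctx.wrap_socket(sock, server_side=True)` and check `is_allowed(addr[0], allowlist)` before handling a request; doing both means a stolen client certificate alone is not enough from an unexpected network.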

How do I monitor data quality in a continuous ETL pipeline?

Leverage dbt’s built‑in tests (unique, not_null, relationships) and expose the results as Airflow XComs. Combine this with custom Spark metrics (record counts, schema drift) pushed to Prometheus and visualised in Grafana dashboards.
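
dbt writes a `target/run_results.json` file after `dbt test` or `dbt build`, with one entry per executed node that includes its `unique_id` and `status`. A Python task can summarize it and return the summary, which Airflow then stores as the task's XCom. The sketch below assumes the default `target/` location; the summary shape is an illustrative choice.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Union

FAILING = {"fail", "error"}


def summarize_dbt_tests(run_results: Union[str, Path, dict]) -> dict:
    """Count test statuses in dbt's run_results.json and list the failing test ids."""
    if not isinstance(run_results, dict):
        run_results = json.loads(Path(run_results).read_text())
    # Test nodes have unique_ids like "test.<project>.<test_name>...".
    tests = [r for r in run_results.get("results", [])
             if r.get("unique_id", "").startswith("test.")]
    counts = Counter(r.get("status") for r in tests)
    return {
        "total": len(tests),
        "by_status": dict(counts),
        "failed": sorted(r["unique_id"] for r in tests if r.get("status") in FAILING),
    }
```

Called from a PythonOperator or a `@task` function with `"/opt/dbt/target/run_results.json"`, the returned dict becomes the task's XCom, and a downstream task can raise (and so trigger the failure alert) whenever `failed` is non‑empty.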

