Building a Port Data Lake: Architecture, APIs & ETL Pipelines for TOS/ERP Integration
In 2023, 78% of maritime logistics firms said a siloed ERP system cost them an average of $1.2M per year in lost efficiency. By consolidating ERP and TOS data into a port‑wide data lake, you can cut manual data handling by up to 85% and unlock real‑time analytics that drive smarter vessel scheduling. Imagine a data engineer who no longer spends hours writing custom scripts for each TOS; instead, a single, reusable ETL pipeline feeds clean, searchable data to every downstream application.
Why a Port‑Centric Data Lake Matters
Faster turnaround times, lower demurrage, and cleaner compliance reports are all on the table when you have one source of truth. In my experience, the biggest win comes from ditching duplicated tables across TOS, ERP, and AIS feeds. A European hub cut vessel‑berth allocation time from 45 min to 7 min after adopting the lake, a game‑changer for their operations.
Core Architecture Blueprint
- Ingestion layer: API gateways, Kafka/Redpanda streams, and file‑drop zones for legacy CSVs.
- Storage layer: Partitioned Parquet on S3/ADLS with Hive metastore for schema‑on‑read.
- Processing layer: Spark Structured Streaming + dbt for transformation, Airflow for orchestration.
The beauty of this stack is that each component does exactly what it’s good at, letting you scale parts independently. For instance, you can spin up more Kafka brokers during peak season without touching the downstream Spark jobs.
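To make the storage layer concrete: "partitioned Parquet" usually means a Hive-style directory layout of `key=value` path segments that Spark, Trino and the Hive metastore can use to skip irrelevant files. The standard-library sketch below maps a status record's timestamp to such a path and mimics partition pruning; the `s3://lake-data/clean` prefix, the year/month scheme and the per-vessel file name are illustrative assumptions, not a prescribed layout.

```python
from datetime import datetime, timezone


def partition_path(base: str, ts: datetime, vessel_id: str) -> str:
    """Return a Hive-style object key, e.g. base/year=2024/month=5/<vessel_id>.parquet.

    `ts` must be timezone-aware; it is normalised to UTC so that terminals in
    different time zones write to the same partition for the same instant.
    """
    ts = ts.astimezone(timezone.utc)
    return f"{base}/year={ts.year}/month={ts.month}/{vessel_id}.parquet"


def prune(paths: list, year: int, month: int) -> list:
    """Keep only files inside one year/month partition, as a query engine would."""
    needle = f"/year={year}/month={month}/"
    return [p for p in paths if needle in p]


if __name__ == "__main__":
    key = partition_path("s3://lake-data/clean",
                         datetime(2024, 5, 17, 8, 30, tzinfo=timezone.utc),
                         "VESSEL-001")
    print(key)  # s3://lake-data/clean/year=2024/month=5/VESSEL-001.parquet
```

Real writers such as Spark choose their own file names inside each partition directory; what matters for pruning is the `year=…/month=…` part of the path.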
Designing Robust APIs for TOS ↔ ERP Sync
So what's the catch when you expose TOS data to ERP? REST is great for CRUD, but gRPC can cut latency for vessel‑status pushes that happen every 30 seconds. In my view, gRPC shines when you need a lightweight, bi‑directional stream, while a REST fallback keeps legacy components connected.
Authentication isn’t an afterthought. OAuth2 with short‑lived JWTs, coupled with network zoning, keeps your data away from the open internet. And remember, maritime regulations often demand mutual TLS and IP whitelisting, so don't overlook those details.
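Both network controls mentioned above (mutual TLS and IP whitelisting) can be sketched with Python's standard library. The example below builds a server-side TLS context that insists on client certificates with TLS 1.3 as the minimum, and checks callers against an IP allow-list. The CIDR ranges and certificate paths are hypothetical placeholders, and JWT validation is deliberately left to your OAuth2 library.

```python
import ipaddress
import ssl
from typing import Optional

# Hypothetical allow-list: replace with the CIDR ranges of your TOS/ERP hosts.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("10.20.0.0/16"),  # terminal OT zone (example)
    ipaddress.ip_network("192.0.2.0/24"),  # ERP integration subnet (documentation range)
]


def is_allowed(client_ip: str) -> bool:
    """Return True if client_ip falls inside one of the allowed networks."""
    addr = ipaddress.ip_address(client_ip)
    # An IPv4 address is never "in" an IPv6 network (and vice versa), so mixed
    # families simply return False rather than raising.
    return any(addr in net for net in ALLOWED_NETWORKS)


def mtls_server_context(cafile: Optional[str] = None,
                        certfile: Optional[str] = None,
                        keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a server-side TLS context that requires a client certificate (mutual TLS)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    ctx.verify_mode = ssl.CERT_REQUIRED  # reject clients without a valid certificate
    if cafile:
        ctx.load_verify_locations(cafile=cafile)  # CA that signs partner client certs
    if certfile:
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)  # this server's identity
    return ctx
```

In practice you would pass real CA, certificate and key paths, then call `is_allowed()` on the peer address before doing any work on the request.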
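Versioning strategy: stick to semantic versioning and keep the migration path backward‑compatible. Adding an optional field to the vessel payload is a minor bump; removing or renaming a field is a major bump. If consumers ignore fields they don't recognise, downstream pipelines won't break when you add a new field.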
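One way to keep that migration path backward-compatible on the consumer side is a "tolerant reader": check only the major version, keep the fields you know, and ignore the rest. The sketch below assumes payload versions are plain `MAJOR.MINOR.PATCH` strings and invents a `berth` field to stand in for a later, additive change.

```python
from dataclasses import dataclass, fields
from typing import Optional


def is_compatible(payload_version: str, supported_major: int) -> bool:
    """Under semantic versioning, only a major bump signals a breaking change."""
    return int(payload_version.split(".")[0]) == supported_major


@dataclass(frozen=True)
class VesselStatus:
    vessel_id: str
    status: str
    timestamp: str
    berth: Optional[str] = None  # hypothetical field added in a later minor release


_KNOWN = {f.name for f in fields(VesselStatus)}


def parse_vessel(payload: dict) -> VesselStatus:
    """Tolerant reader: keep known fields, drop unknown ones, default optional ones."""
    return VesselStatus(**{k: v for k, v in payload.items() if k in _KNOWN})


if __name__ == "__main__":
    msg = {"vessel_id": "VESSEL-001", "status": "AT_BERTH",
           "timestamp": "2024-05-17T08:30:00Z", "crane_count": 3}
    print(is_compatible("2.4.0", supported_major=2))
    print(parse_vessel(msg))
```

The unknown `crane_count` key is silently dropped and `berth` falls back to `None`, so a producer can ship a new minor version before every consumer has been updated.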
Hands‑On Walkthrough: Building an ETL Pipeline with Airflow, dbt & Spark
Below is a minimal but complete example that pulls vessel‑status JSON from a TOS REST endpoint, writes raw data to an S3 landing zone, processes it with Spark Structured Streaming into partitioned Parquet, and finally enriches the data with ERP contract information via dbt.
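```python
# airflow_dag.py
import datetime
import json

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

default_args = {
    'owner': 'etl_team',
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=5),
}


def store_raw(ti, ts_nodash, **_):
    """Copy the TOS response from XCom into the S3 landing zone as JSON Lines."""
    # Assumes /vessels/status returns a JSON array of vessel-status objects.
    records = ti.xcom_pull(task_ids='fetch_tos')
    S3Hook(aws_conn_id='aws_default').load_string(
        string_data='\n'.join(json.dumps(rec) for rec in records),
        key=f'raw/{ts_nodash}/vessels.json',  # one prefix per hourly run
        bucket_name='lake-landing',
        replace=True,
    )


with DAG(
    dag_id='tos_to_lake_etl',
    default_args=default_args,
    schedule='@hourly',  # Airflow 2.4+; older releases use schedule_interval
    start_date=datetime.datetime(2026, 1, 1),
    catchup=False,
) as dag:
    # Newer apache-airflow-providers-http releases rename this HttpOperator.
    fetch = SimpleHttpOperator(
        task_id='fetch_tos',
        http_conn_id='tos_api',
        endpoint='/vessels/status',
        method='GET',
        response_filter=lambda r: r.json(),  # parsed JSON is pushed to XCom
        log_response=True,
    )

    store = PythonOperator(task_id='store_raw', python_callable=store_raw)

    spark = SparkSubmitOperator(
        task_id='spark_etl',
        application='s3://scripts/spark_process.py',
        name='tos-parquet',
        conn_id='spark_cluster',
        conf={'spark.sql.shuffle.partitions': '200'},
    )

    # Core Airflow has no dbt operator, so call the dbt CLI directly.
    # /opt/dbt is assumed to be the dbt project root (the folder with dbt_project.yml).
    dbt = BashOperator(
        task_id='dbt_transform',
        bash_command='dbt run --project-dir /opt/dbt --profile lake',
    )

    fetch >> store >> spark >> dbt
```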
```python
# spark_process.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("TOS ETL").getOrCreate()

# Streaming file sources require an explicit schema (no inference by default).
schema = StructType([
    StructField("vessel_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("timestamp", TimestampType(), True),
])

# Spark's JSON reader expects JSON Lines (one object per line) by default.
raw = spark.readStream.schema(schema).json("s3://lake-landing/raw/*/vessels.json")

# partitionBy() takes column names, not expressions, so derive year/month first.
parsed = (
    raw.withColumn("year", F.year("timestamp"))
       .withColumn("month", F.month("timestamp"))
)

query = (
    parsed.writeStream
    .format("parquet")
    .option("path", "s3://lake-data/clean")
    .option("checkpointLocation", "s3://lake-data/checkpoints")
    .partitionBy("year", "month")
    # availableNow (Spark 3.3+) processes every new file and then stops, so the
    # hourly Airflow task can finish and hand over to dbt.
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()
```
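```sql
-- models/vessel_enriched.sql
{{ config(materialized='incremental') }}

WITH raw AS (
    SELECT * FROM {{ source('lake', 'clean') }}
),
erp AS (
    SELECT
        contract_id,
        vessel_id,
        customer_name,
        contract_start,
        contract_end
    FROM {{ source('erp', 'contracts') }}
)
SELECT
    r.vessel_id,
    r.status,
    r.timestamp,
    e.contract_id,
    e.customer_name,
    e.contract_start,
    e.contract_end
FROM raw r
LEFT JOIN erp e
    ON r.vessel_id = e.vessel_id
    -- Match only the contract active at the status time; joining on vessel_id
    -- alone repeats each status row once per historical contract.
    AND r.timestamp BETWEEN e.contract_start AND e.contract_end
{% if is_incremental() %}
-- Only load status rows newer than the latest one already in this table.
WHERE r.timestamp > (SELECT MAX(timestamp) FROM {{ this }})
{% endif %}
```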
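The `is_incremental()` branch is a high-watermark load: the first run builds the table from all rows, and each later run appends only rows newer than the `MAX(timestamp)` already stored. The plain-Python model below (lists of dicts standing in for tables, a simplification of what dbt does in the warehouse) reproduces that behaviour and makes one side effect visible: a late-arriving row whose timestamp is not newer than the watermark is skipped.

```python
from datetime import datetime


def incremental_load(target: list, source: list) -> list:
    """Model dbt's incremental filter: append source rows newer than MAX(timestamp) in target."""
    if not target:  # first run: is_incremental() is false, so everything is loaded
        return list(source)
    watermark = max(row["timestamp"] for row in target)
    return target + [row for row in source if row["timestamp"] > watermark]


if __name__ == "__main__":
    t8, t9, t10 = (datetime(2024, 5, 17, h) for h in (8, 9, 10))
    table = incremental_load([], [{"vessel_id": "V1", "timestamp": t8},
                                  {"vessel_id": "V1", "timestamp": t9}])
    # V2 is new; V3 arrives late with an 08:00 timestamp and is skipped.
    table = incremental_load(table, [{"vessel_id": "V2", "timestamp": t10},
                                     {"vessel_id": "V3", "timestamp": t8}])
    print([row["vessel_id"] for row in table])  # ['V1', 'V1', 'V2']
```

If late TOS updates matter to you, one option is to subtract a lookback window from the watermark and let dbt deduplicate through a `unique_key`; that is a design choice beyond the model shown above.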
Validation & Alerting
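SparkSubmitOperator already fails its Airflow task when spark-submit exits non-zero, so dbt never runs on a broken batch; if you submit Spark asynchronously instead (for example as an EMR step), add a sensor such as EmrStepSensor to wait for the outcome. dbt tests provide a safety net: not_null(timestamp), plus uniqueness on the (vessel_id, timestamp) pair via dbt_utils.unique_combination_of_columns, since a single vessel emits many status rows. When a test fails, push a message to Slack via an incoming webhook or trigger a PagerDuty escalation.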
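For the alerting step, dbt writes test outcomes to `target/run_results.json`, where each entry in `results` carries a `unique_id` and a `status`. The sketch below picks out failed or errored tests and formats a minimal Slack incoming-webhook body (`{"text": ...}`). Posting it, for example from an Airflow `on_failure_callback`, and the webhook URL itself are left to you; the DAG id used in the test is just the one from the example pipeline.

```python
import json


def failed_tests(run_results: dict) -> list:
    """Return unique_ids of dbt tests whose status is 'fail' or 'error'.

    `run_results` is the parsed contents of target/run_results.json.
    """
    return [
        r["unique_id"]
        for r in run_results.get("results", [])
        if r.get("status") in ("fail", "error")
    ]


def slack_payload(failures: list, dag_id: str) -> str:
    """Build a minimal Slack incoming-webhook body listing the failed tests."""
    lines = [f"dbt: {len(failures)} test(s) failed in {dag_id}"]
    lines += [f"- {uid}" for uid in failures]
    return json.dumps({"text": "\n".join(lines)})
```

Keeping the parsing separate from the HTTP call makes it easy to unit-test the alert content and to swap Slack for PagerDuty later.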
Actionable Takeaways & Next Steps
- Pre‑launch checklist: data contracts, CI/CD for dbt, monitoring, security review, documentation.
- Tooling roadmap: add Delta Lake or Iceberg when you need ACID guarantees, or a lakehouse query engine like Trino for ad‑hoc BI.
- Scaling tips: autoscale Spark on Kubernetes, manage Airflow pools to avoid over‑commitment, and trim S3 storage costs by setting lifecycle rules.
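Lifecycle rules for the landing zone can live in code next to the pipeline. The function below returns a dictionary in the shape boto3's `put_bucket_lifecycle_configuration` expects for its `LifecycleConfiguration` argument; the `raw/` prefix and the 30/365-day thresholds are placeholder assumptions to tune against your retention and replay needs.

```python
def lifecycle_rules(raw_prefix: str = "raw/", ia_after_days: int = 30,
                    expire_after_days: int = 365) -> dict:
    """Build an S3 lifecycle configuration for the raw landing zone.

    The returned dict matches the LifecycleConfiguration argument of boto3's
    put_bucket_lifecycle_configuration. Thresholds are placeholders.
    """
    if ia_after_days < 30:
        # S3 only transitions objects to STANDARD_IA once they are 30+ days old.
        raise ValueError("STANDARD_IA transitions need at least 30 days")
    if expire_after_days <= ia_after_days:
        raise ValueError("expiration must come after the transition")
    return {
        "Rules": [
            {
                "ID": "landing-raw-tiering",
                "Filter": {"Prefix": raw_prefix},
                "Status": "Enabled",
                "Transitions": [
                    {"Days": ia_after_days, "StorageClass": "STANDARD_IA"}
                ],
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


# Applying it requires boto3 and AWS credentials, e.g.:
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="lake-landing", LifecycleConfiguration=lifecycle_rules())
```

Scoping the rule to the raw prefix leaves the curated Parquet zone untouched; whether raw files may expire depends on how far back you ever need to replay.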
Frequently Asked Questions
What is the best way to orchestrate ETL pipelines for a port data lake?
Airflow remains the de facto orchestrator because it natively supports Python operators, task-level retries and sensors, and can launch Spark jobs via SparkSubmitOperator or the KubernetesPodOperator. Pair it with dbt for transformation testing and you get a clear separation of “move data” vs. “shape data.”
How does dbt complement Spark in a maritime data pipeline?
dbt handles SQL‑based, incremental transformations and provides built‑in testing, documentation, and version control. Spark does the heavy lifting of reading raw JSON/CSV at scale; dbt then layers business logic on the resulting Parquet tables.
Can I use Airflow with serverless Spark (e.g., Databricks) for TOS integration?
Yes. Airflow's Databricks provider offers DatabricksSubmitRunOperator (one-off runs) and DatabricksRunNowOperator (existing jobs), which can launch notebooks or jobs on a serverless cluster, letting you keep the orchestration logic on‑prem while scaling compute in the cloud.
What security considerations are unique to port‑to‑ERP data flows?
Maritime data often crosses regulatory zones, so you need mutual TLS, IP whitelisting, and role‑based JWT tokens for each system. Encrypt data at rest (SSE‑KMS) and in transit (TLS 1.3) and audit every API call with a centralized logging platform (e.g., Elastic or Splunk).
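Auditing is easiest to centralise when every API call produces one structured line. The sketch below is a standard-library `logging` formatter that emits JSON lines an Elastic or Splunk forwarder can ingest; the field names (`caller`, `method`, `path`, `status`) and the `port.audit` logger name are assumptions, not a schema either platform requires.

```python
import json
import logging
from datetime import datetime, timezone


class JsonAuditFormatter(logging.Formatter):
    """Render each audit record as one JSON line for a log shipper to forward."""

    def format(self, record: logging.LogRecord) -> str:
        event = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            # Extra fields arrive as record attributes via logger.info(..., extra={...}).
            "caller": getattr(record, "caller", None),
            "method": getattr(record, "method", None),
            "path": getattr(record, "path", None),
            "status": getattr(record, "status", None),
            "msg": record.getMessage(),
        }
        return json.dumps(event)


def audit_logger(stream=None) -> logging.Logger:
    """Return a logger that writes JSON audit lines to `stream` (stderr by default)."""
    logger = logging.getLogger("port.audit")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonAuditFormatter())
    logger.handlers = [handler]  # avoid duplicate handlers if called twice
    logger.propagate = False     # keep audit lines out of the application log
    return logger
```

A call such as `audit_logger().info("api call", extra={"caller": "erp-sync", "method": "GET", "path": "/vessels/status", "status": 200})` then yields a single JSON line per request.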
How do I monitor data quality in a continuous ETL pipeline?
Leverage dbt’s built‑in tests (unique, not_null, relationships) and expose the results as Airflow XComs. Combine this with custom Spark metrics (record counts, schema drift) pushed to Prometheus and visualised in Grafana dashboards.
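On the Spark side, record counts and schema drift reduce to a few numbers per run. The sketch below compares expected and observed column sets and renders the results in the Prometheus text exposition format, which you could push to a Pushgateway or serve from an endpoint. The metric names and the `pipeline` label are hypothetical, and in a real job the record count might come from summing `numInputRows` over `query.recentProgress`.

```python
def schema_drift(expected: set, observed: set) -> dict:
    """Compare column sets and report columns that appeared or disappeared."""
    return {"added": sorted(observed - expected), "missing": sorted(expected - observed)}


def to_prometheus(pipeline: str, record_count: int, drift: dict) -> str:
    """Render run metrics as Prometheus text-format gauges."""
    label = f'pipeline="{pipeline}"'
    lines = [
        "# TYPE etl_records_processed gauge",
        f"etl_records_processed{{{label}}} {record_count}",
        "# TYPE etl_schema_drift_columns gauge",
        f'etl_schema_drift_columns{{{label},kind="added"}} {len(drift["added"])}',
        f'etl_schema_drift_columns{{{label},kind="missing"}} {len(drift["missing"])}',
    ]
    return "\n".join(lines) + "\n"  # the exposition format ends with a newline
```

Alerting on `etl_schema_drift_columns > 0` in Grafana catches a TOS payload change before it silently nulls out columns downstream.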