CarrierPerformanceReportsEtl
CarrierPerformanceReportsEtl is a production Spark/Scala data platform I architected and grew over ~4 years at WTG to ingest, evolve, and serve carrier & logistics performance analytics. I founded it as a solo engineer, then mentored rotating contributors while owning the roadmap, standards, and release quality (acting as de facto team & tech lead while titled Data Scientist / Senior Data Scientist).
1. Problem & Context
Logistics operations required timely, reliable KPIs (turnaround, message latency, carrier performance) sourced from heterogeneous semi‑structured message streams and relational systems. Challenges:
- High schema volatility in inbound envelope/message payloads (new keys, nested arrays).
- Need for both streaming freshness and batch reprocessing (bootstrap or backfill).
- Operational transparency for governance (auditable builds, reproducible assemblies).
- Minimizing engineer onboarding friction across multiple sub-systems.
2. High-Level Architecture
```mermaid
graph TD
%% Security
subgraph Security
KER[Kerberos / Secrets]
end
%% Sources
subgraph Sources
KAFKA[Kafka Topics]
SQLSRC[SQL Server OLTP]
CDC[CDC Streams]
FILES[File Drops]
end
KER --> KAFKA
KER --> SQLSRC
KER --> CDC
%% Orchestrator & Metadata
subgraph Orchestrator
ORCH[SparkCluster Orchestrator]
STATE[(JobExecutionState)]
LOG[(JobExecutionLog)]
ORCH --> STATE
ORCH --> LOG
end
%% Ingestion
subgraph Ingestion
STREAM[Streaming Consumers]
BATCH[Scheduled Ingestors]
end
KAFKA --> STREAM
CDC --> STREAM
SQLSRC --> STREAM
FILES --> BATCH
ORCH --> STREAM
ORCH --> BATCH
%% Processing & Storage
STREAM --> BRONZE[(Bronze)]
BATCH --> BRONZE
BRONZE --> INFER[Schema Inference]
INFER --> REG[(Schema Registry)]
BRONZE --> DQ[Data Quality]
DQ --> ENR[Enrichment]
ENR --> SILVER[(Silver KPIs)]
SILVER --> GOLD[(Gold Analytics)]
GOLD --> REPORTS[Domain Reports]
GOLD --> SQLSERV[Analytical SQL DB]
SQLSERV --> API[.NET APIs]
API --> UI[Vue.js Frontend]
%% Shared runtime
subgraph Shared
COMMON[CommonDependencies]
EXTRA[SparkExtraDependencies]
HIST[History Server]
end
COMMON --> STREAM
COMMON --> BATCH
EXTRA --> STREAM
EXTRA --> BATCH
HIST --> STREAM
HIST --> GOLD
```
3. Module Overview
Module | Purpose | Key Notes |
---|---|---|
CommonDependencies | Shared schemas, schema evolution helpers, utilities | Central contract; avoids duplication |
KafkaConsumers | Structured Streaming ingest & transformation | Adaptive query strategy (isin vs partition) |
ScheduledIngestors | Time-driven batch ingestion | Parameterized jobs; deployable jars |
CarrierPerformanceReports | Core analytical aggregations & KPI logic | Assembled spark jar for downstream APIs |
OceanCarrierMessagesReports | Domain-specific reporting (ocean/carrier) | Builds curated tables / exports |
SparkExtraDependencies | Consolidated dependency packaging | Ensures consistent classpath across jobs |
SparkKube / SparkHistoryServer | Operational observability | Job lineage & performance inspection |
ZeppelinServer / WebApi | Exploration & ad-hoc queries | Onboarding & incident triage |
IntegrationTests | E2E validation (SQL Server + FS) | Creates/destroys temp DB, asserts schema & logic |
hook (pre-push) | Local quality gate | Runs Scalafix/Scalafmt before push |
4. Data & Processing Flow
- Ingest: Streaming jobs pull messages / events (Kafka + JDBC lookups) into Bronze with minimal shaping & partitioning (date).
- Schema Inference: Specialized jobs explode dynamic key arrays, count occurrences, derive a deterministic StructType (metadata stores occurrence counts) and push to registry.
- Enrichment: Joins with client lookup tables; adds business flags & restoration indicators.
- Upsert / Merge: Merge logic keyed on AP_PK / composite keys ensures idempotent updates.
- Silver (Curated & KPIs): Latency metrics, performance scoring built into CarrierPerformanceReports / OceanCarrierMessagesReports.
- Gold Analytics Layer: Additional business modeling & dimensional/tabular shaping for serving (star-like tables, summary snapshots, SCD handling) produced from Silver.
- Serving: Gold ingested into a SQL Server analytical schema; .NET backend APIs expose endpoints consumed by Vue.js frontend dashboards.
- Deployment: Scripts copy assembled jars + config into publish directories → Docker / staging / production deploy; database migrations applied alongside application releases.
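To make the Bronze ingest step above concrete, here is a minimal Scala sketch of the pattern, assuming illustrative topic, path, and column names rather than the production ones: a Kafka stream is landed into a date-partitioned Delta table with only light shaping.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object BronzeIngestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("BronzeIngestSketch").getOrCreate()

    // Read the raw envelope stream; broker and topic names are placeholders.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "ehub-archive-messages")
      .load()

    // Minimal shaping only: keep the payload, stamp ingest time, derive the date partition.
    val bronze = raw
      .select(col("value").cast("string").as("payload"), col("timestamp").as("event_ts"))
      .withColumn("ingest_ts", current_timestamp())
      .withColumn("date", to_date(col("event_ts")))

    bronze.writeStream
      .format("delta")
      .option("checkpointLocation", "/checkpoints/bronze/ehub_archive_messages")
      .partitionBy("date")
      .outputMode("append")
      .start("/lake/bronze/ehub_archive_messages")
      .awaitTermination()
  }
}
```

Keeping Bronze this close to raw is what lets the same table serve both streaming freshness and batch backfill.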
5. Key Engineering Patterns
- Adaptive Retrieval: Threshold-based switch between `isin` batches and partitioned time-range SQL queries to prevent predicate bloat (sketched after this list).
- Schema Evolution: Occurrence-aware inference (single vs repeated → scalar or array) with metadata; deterministic, stable field ordering.
- Config Hierarchy: Multi-layer resolution improves portability & secret hygiene.
- Streaming Abstraction: `DefaultStreamJob` + trait composition (reader / transformer / loader) for testability and reuse.
- Performance Guardrails: Cached micro-batches for multi-step transformations; rank/min window logic to compute bounded temporal queries.
- Quality Gates: Pre-push hook + QGL/DAT fail-fast on formatting/lint to protect downstream builds.
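The adaptive retrieval pattern can be summarized with a small sketch; the threshold value, JDBC options, and column names below are assumptions for illustration, not the production configuration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import java.sql.Timestamp

// Small key sets use an isin predicate; large ones fall back to a bounded time-range scan
// so the generated SQL does not bloat with thousands of literals.
object AdaptiveLookupSketch {
  val IsinThreshold = 500 // assumed cut-over point, not the production value

  def lookup(spark: SparkSession,
             keys: Seq[String],
             minTs: Timestamp,
             maxTs: Timestamp): DataFrame = {
    val source = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://sql-host;databaseName=Archive") // placeholder
      .option("dbtable", "dbo.ArchiveMessage")                         // placeholder
      .load()

    if (keys.size <= IsinThreshold)
      source.filter(col("MessageId").isin(keys: _*))
    else
      source.filter(col("CreatedUtc").between(minTs, maxTs))
  }
}
```

Below the threshold the predicate stays small and index-friendly; above it, a bounded time-range scan avoids pushing an enormous IN list down to the database.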
6. Representative Job: ExtractEhubArchiveMessageBronze
Highlights:
- Periodic trigger (1 minute) with Update mode.
- Decides retrieval strategy based on distinct key volume.
- Adds restoration flags; enriches with client info; merges into Delta table (partitioned by date).
- Ensures low-latency incremental availability while preserving ability to backfill.
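A condensed sketch of the micro-batch upsert shape follows, assuming hypothetical paths and an `IsRestored` flag derivation; only the trigger cadence, Update output mode, AP_PK-keyed merge, and date partitioning come from the description above.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object ExtractBronzeMergeSketch {
  def start(spark: SparkSession, source: DataFrame): Unit = {
    // Upsert each micro-batch into the date-partitioned Delta target keyed on AP_PK.
    val upsert: (DataFrame, Long) => Unit = (batch, _) => {
      val enriched = batch
        .withColumn("IsRestored", col("RestorationDate").isNotNull) // assumed flag derivation
        .withColumn("date", to_date(col("CreatedUtc")))

      DeltaTable.forPath(spark, "/lake/bronze/ehub_archive_message")
        .as("t")
        .merge(enriched.as("s"), "t.AP_PK = s.AP_PK AND t.date = s.date")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
    }

    source.writeStream
      .trigger(Trigger.ProcessingTime("1 minute")) // periodic trigger
      .outputMode("update")                        // Update output mode
      .option("checkpointLocation", "/checkpoints/bronze/ehub_archive_message")
      .foreachBatch(upsert)
      .start()
  }
}
```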
7. Schema Inference Job
`InferEhubArchiveMessageKeyValuePairsSchema`:
- Explodes nested context collections.
- Aggregates occurrences and builds new StructType with per-field occurrence metadata.
- Evolves registry via a service layer, recording timestamp & offset for reproducibility.
Benefits: safe additive evolution, compatibility analysis foundation, and automated downstream column availability.
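The inference idea can be sketched as follows; the `Context` array name, the string-typed values, and the exact metadata key are assumptions, but the occurrence counting and deterministic ordering mirror the description above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object InferKeyValueSchemaSketch {
  def infer(bronze: DataFrame): StructType = {
    // Per-message occurrence counts decide scalar vs array; totals become field metadata.
    val stats = bronze
      .select(col("MessageId"), explode(col("Context")).as("kv")) // Context: array<struct<Key, Value>>
      .select(col("MessageId"), col("kv.Key").as("key"))
      .groupBy("MessageId", "key")
      .agg(count(lit(1)).as("perMessage"))
      .groupBy("key")
      .agg(sum("perMessage").as("total"), max("perMessage").as("maxPerMessage"))
      .collect()

    val fields = stats.map { row =>
      val key      = row.getAs[String]("key")
      val repeated = row.getAs[Long]("maxPerMessage") > 1L         // repeated within a message -> array
      val dataType = if (repeated) ArrayType(StringType) else StringType
      val meta     = new MetadataBuilder().putLong("occurrences", row.getAs[Long]("total")).build()
      StructField(key, dataType, nullable = true, metadata = meta)
    }.sortBy(_.name)                                               // deterministic, stable ordering

    StructType(fields)
  }
}
```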
8. Build & Deployment
- QGL Orchestration: Sequential activation of `build.sbt.template` → `build.sbt` to avoid multi-root conflicts.
- Dependency Resolution: SBT (≥1.6.2) + custom packaging (SparkExtraDependencies) ensure stable transitive libraries.
- Artifacts: Per-project JAR + config copied into `Bin/Publish/*` for environment-specific Docker pushes.
- History & Diagnostics: Spark History Server container images published with the same pipeline for consistent lineage.
- Data Model Migrations: Versioned SQL Server DDL/DML scripts executed with .NET deployment to evolve analytical schema (dimensions, facts, indices) in step with Gold transformations.
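For orientation, a simplified `build.sbt` shape is shown below; the module names come from the overview table, while the Scala/Spark versions and the sbt-assembly settings are illustrative assumptions.

```scala
// build.sbt sketch: assumes sbt 1.x with the sbt-assembly plugin; versions are illustrative.
ThisBuild / scalaVersion := "2.12.15"

lazy val commonSettings = Seq(
  libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.2" % Provided
)

lazy val CommonDependencies = (project in file("CommonDependencies"))
  .settings(commonSettings)

lazy val KafkaConsumers = (project in file("KafkaConsumers"))
  .dependsOn(CommonDependencies)
  .settings(commonSettings, assembly / assemblyJarName := "KafkaConsumers-assembly.jar")

lazy val CarrierPerformanceReports = (project in file("CarrierPerformanceReports"))
  .dependsOn(CommonDependencies)
  .settings(commonSettings, assembly / assemblyJarName := "CarrierPerformanceReports-assembly.jar")

lazy val root = (project in file("."))
  .aggregate(CommonDependencies, KafkaConsumers, CarrierPerformanceReports)
```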
9. Quality & Governance
- Code Style: Scalafix + Scalafmt enforced locally & in CI; pre-push hook blocks non-conformant commits.
- Testing: ScalaTest (unit + integration). SQL Server integration tests create ephemeral databases; file system simulates object storage.
- Config Discipline: dev/prod overrides ignored by VCS; environment injection via explicit -D flags.
- Observability (Planned Enhancements): Proposed Prometheus metrics + structured JSON logs for streaming lag & schema drift.
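A hypothetical helper illustrating the layered resolution behind the config discipline above (environment variable, then `-D` system property, then packaged config overlay, matching the env > CLI > config precedence noted in the security section); the Typesafe Config usage and key naming are assumptions.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigResolutionSketch {
  private val fileConfig: Config = ConfigFactory.load() // application.conf + environment overlays

  /** Resolve a key without ever committing the value to VCS. */
  def resolve(key: String): Option[String] = {
    val envKey = key.toUpperCase.replace('.', '_')       // e.g. jdbc.password -> JDBC_PASSWORD
    sys.env.get(envKey)                                  // 1) environment / secret injection
      .orElse(sys.props.get(key))                        // 2) explicit -D flag
      .orElse(                                           // 3) packaged config overlay
        if (fileConfig.hasPath(key)) Some(fileConfig.getString(key)) else None)
  }
}
```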
10. Impact & Outcomes
- Reduced onboarding time via documented multi-module structure & scripted build automation.
- Accelerated feature delivery (schema changes deploy without manual refactor through inference pipeline).
- Improved reliability & governance (format/lint gates cut CI churn; deterministic merges lowered duplicate events risk).
- Enabled multi-domain expansion (additional message families plugged into shared patterns without rewriting core).
11. Roadmap (Illustrative)
Horizon | Focus | Items |
---|---|---|
Short (0–2 sprints) | Observability & DQ | Prometheus metrics; data quality assertions pre-merge |
Medium (Quarter) | Build Simplification | Consolidate SBT multi-module; centralized dependency BOM |
Medium | Schema Evolution UX | Diff viewer + compatibility classification (additive / breaking) |
Long | Lineage & Metadata | Unified lineage service (schema + run context) |
Long | Orchestration Modernization | Evaluate Delta Live Tables / managed streaming orchestration |
12. Tech Stack Summary
- Core: Scala 2.12, Apache Spark (Structured Streaming & batch)
- Messaging / Sources: Kafka, SQL Server (JDBC)
- Storage: Delta (merge/upsert), Parquet
- Build & Tooling: SBT, Ant/QGL scripts, PowerShell & Bash helpers
- Quality: Scalafix, Scalafmt, pre-push hook
- Infra / Ops: Docker (Spark History Server), QGL/DAT pipelines
- Exploration: Zeppelin notebooks
- Serving / Product: SQL Server analytical schema (Gold), ASP.NET (.NET) backend APIs, Vue.js frontend components
13. Leadership & Ownership
- Founded codebase; defined module boundaries & naming conventions.
- Authored scripts enabling reproducible multi-module builds.
- Mentored rotating engineers (code reviews, pairing, design walkthroughs).
- Guided roadmap (schema evolution strategy, adaptive querying, observability direction).
- Extended ownership into serving & product layer: designed SQL Server analytical data model, authored & migrated schema scripts, collaborated on ASP.NET service endpoints, and contributed to data-facing Vue.js UI integration.
- Co-designed and implemented the Spark job orchestration (SparkCluster) platform: scheduling/state schema, submission host, component version management, and integration of ETL dependency packaging (SparkExtraDependencies) into orchestrated deployments.
- Acted as principal engineer / tech lead while formally on the Data Scientist track.
14. Key Differentiators
- Automated schema evolution with occurrence-aware inference.
- Adaptive hybrid streaming + batch retrieval pattern lowering database strain.
- Strict pre-commit quality enforcement integrated with enterprise build tooling.
- Modular design enabling independent iteration & reduced coupling.
- End-to-end ownership from ingestion through Gold serving layer and API/UI consumption (full-stack data product delivery).
15. Summary
CarrierPerformanceReportsEtl converted fragmented, schema-volatile logistics messages into governed, analytics-ready datasets through a modular Spark platform emphasizing evolution, performance safety, and engineering discipline—scaling from a solo build to a multi-contributor asset under consistent technical leadership.
16. Data Quality Pipelines
Embedded DQ operates as a side-car phase in both streaming and scheduled batch paths:
- Tiered Rules: (a) Structural (schema conformity vs registry), (b) Referential (lookup join cardinality), (c) Semantic (value ranges, enumerations), (d) Temporal (event time vs processing time drift), (e) Anomaly (simple z-score & volume deltas).
- Execution Points: Pre-merge (reject or quarantine), Post-merge (metric emission), Periodic retrospective audits (daily window).
- Quarantine Strategy: Invalid rows persisted to an `*_invalid` Delta table with rule identifiers, enabling replay after rule relaxation.
- Rule Management: YAML/JSON rule specs versioned with code; compiled to Spark expressions at job start (see the sketch after the flowchart below).
- Metrics: Row counts, rejection ratios, distinct key growth, late-arrival distribution, anomaly score — intended for future Prometheus export.
Mermaid (simplified):
```mermaid
flowchart LR
Ingest --> Validate[Structural & Referential]
Validate -->|fail| Quarantine
Validate --> Semantic[Semantic & Temporal]
Semantic -->|fail| Quarantine
Semantic --> Merge[Delta Merge]
Merge --> PostDQ[Post Metrics]
Quarantine --> Review[Reprocess Tool]
```
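A minimal sketch of how rule specs can compile to Spark expressions and split a batch into valid and quarantined rows; the rule ids, columns, and output column names are illustrative, not the production rule set.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Each versioned rule spec compiles to a Column predicate; failing rows carry the failed
// rule ids and are routed to the *_invalid quarantine table, passing rows continue to merge.
final case class DqRule(id: String, predicate: Column)

object DataQualitySketch {
  val rules: Seq[DqRule] = Seq(
    DqRule("structural.message_id_present", col("MessageId").isNotNull),
    DqRule("semantic.latency_non_negative", col("LatencyMs") >= 0),
    DqRule("temporal.event_not_in_future",  col("EventTs") <= current_timestamp())
  )

  def split(df: DataFrame): (DataFrame, DataFrame) = {
    // For every rule, emit its id when the row violates it (null otherwise), then drop nulls.
    val failedIds = array(rules.map(r => when(!r.predicate, lit(r.id))): _*)
    val flagged = df
      .withColumn("failed_rules_raw", failedIds)
      .withColumn("failed_rules", expr("filter(failed_rules_raw, r -> r IS NOT NULL)"))
      .drop("failed_rules_raw")

    val invalid = flagged.filter(size(col("failed_rules")) > 0)            // -> *_invalid table
    val valid   = flagged.filter(size(col("failed_rules")) === 0).drop("failed_rules")
    (valid, invalid)
  }
}
```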
17. Multi-Source & CDC Ingestion
The platform supports heterogeneous sources beyond Kafka streaming envelopes:
- OLTP JDBC (SQL Server) incremental pulls with adaptive predicate strategy.
- CDC Feeds (change tables / log-based connectors) normalized into Bronze with operation flags (I/U/D) enabling downstream SCD handling.
- Flat File Drops (ad hoc partner extracts) processed via ScheduledIngestors using a uniform loader interface.
Pattern Highlights:
- Source Abstraction: Trait-based readers (`SourceReader`) converting each modality to a canonical DataFrame + metadata (ingest_ts, source_system, change_op); a minimal trait sketch follows this list.
- Watermark & Replay: CDC ingestion tracks the last processed LSN/SCN offset in a checkpoint table for idempotent recovery.
- Late Data: Event-time windows with watermarking unify CDC and Kafka handling for downstream temporal aggregations.
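A minimal sketch of the reader abstraction, with hypothetical implementations and column mappings; only the canonical metadata columns match the list above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

trait SourceReader {
  def sourceSystem: String
  def read(spark: SparkSession): DataFrame

  /** Canonical frame with ingest_ts, source_system, and change_op attached. */
  final def readCanonical(spark: SparkSession): DataFrame = {
    val base = read(spark)
    val withOp =
      if (base.columns.contains("change_op")) base
      else base.withColumn("change_op", lit("I")) // non-CDC modalities default to inserts
    withOp
      .withColumn("ingest_ts", current_timestamp())
      .withColumn("source_system", lit(sourceSystem))
  }
}

// A CDC reader surfaces the operation flag; the column mapping is an assumed example
// (a real job would translate SQL Server CDC operation codes into I/U/D).
class CdcTableReader(changeTable: String) extends SourceReader {
  val sourceSystem = "sqlserver-cdc"
  def read(spark: SparkSession): DataFrame =
    spark.read.format("jdbc")
      .option("url", "jdbc:sqlserver://sql-host;databaseName=Oltp") // placeholder
      .option("dbtable", changeTable)
      .load()
      .withColumnRenamed("__$operation", "change_op")
}
```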
18. Security & Kerberos Authentication
Enterprise environments required secure access to Kafka & JDBC endpoints:
- Kerberos: Keytab + principal supplied via env/secret injection; Spark submits with `--principal` & `--keytab` (abstracted in launch scripts) to enable ticket renewal for long-running Structured Streaming jobs.
- Secret Scoping: Credentials never committed; referenced through indirection variables resolved at runtime (env > CLI > config overlays).
- Network Segregation: SparkHistoryServer & exploration notebooks deployed in a controlled subnet; jobs interact with sources through whitelisted brokers/ports.
- Auditing: Access events (successful auth, ticket renewal failures) logged and slated for future correlation with DQ anomalies.
19. Extended Differentiators (Additions)
- Integrated, rule-driven Data Quality pipeline with quarantine & replay.
- Unified ingestion abstraction spanning Kafka, OLTP incremental, CDC, and file drops.
- Kerberos-enabled secure streaming suitable for long-lived production jobs.
- Offset & rule lineage enabling reproducible reprocessing scenarios.
20. Orchestration (SparkCluster Platform – Deep Dive)
The platform’s Spark job orchestration layer (a separate `SparkCluster` solution) provides controlled, observable scheduling and submission of batch & streaming Spark jobs.
Core components:
- ServiceTaskHost (.NET): Submission & lifecycle host invoked via QGL/DAT pipeline; starts with environment-specific `appsettings` including `SparkSettings` (Spark / Hadoop / Winutils / Java versions). Automatically syncs binaries (via `DownloadSpark.ps1`) from internal artifact storage so clusters share a consistent runtime toolchain.
- SparkSubmitDatabase (SQL Server): Operational metadata store (modeled in the sketch after this list).
  - `JobExecutionState`: Single row per named job capturing current state, expected vs actual run date, succeeded task list, cron expression, and distributed tracing identifiers (TraceId, SpanId, ParentSpanId) for correlation.
  - `JobExecutionLog`: Append-only event & lifecycle log (created timestamp, run/parent linkage, trace/span IDs), indexed & day-partitioned for efficient time-sliced queries.
- Build.xml Integration: Declares cross-repo dependencies (launcher, deployer, code-signing, Spark extra dependencies). Post-build `publish.bat` packages deployment artifacts for staging/production submission hosts.
- Environment Matrix: Distinct UAT, Staging, and Production clusters (master, workers, submit nodes, Zeppelin) with documented port & protocol allowances (driver 4040-4099, master 8080, history 18080, workers 8081, submission 7077, REST 6066). Network manifest ensures least-privilege connectivity (Kafka, SQL Server, Ceph/object storage, external APIs).
- Dependency Injection of ETL Artifacts: Orchestrator copies `SparkExtraDependencies` from the ETL repo into the publish tree so scheduled jobs resolve the same library versions as locally tested modules.
- Scheduling & Cron: Jobs persisted with expected execution timestamps enable drift detection; cron expressions stored with state allow dynamic evaluation without code redeploy.
- Observability Hooks: Trace/span IDs align orchestration events with Spark job metrics & application logs (foundation for distributed tracing and future telemetry export).
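As referenced above, an illustrative Scala model of the two metadata tables; field names follow the text, while types and any additional columns in the real DDL may differ.

```scala
import java.time.Instant

// Single row per named job; state transitions are driven by the scheduler.
final case class JobExecutionState(
  jobName: String,
  currentState: String,          // e.g. scheduled | running | success | failure
  cronExpression: String,        // evaluated at runtime, no redeploy needed
  expectedRunDate: Instant,
  actualRunDate: Option[Instant],
  succeededTasks: Seq[String],
  traceId: String,
  spanId: String,
  parentSpanId: Option[String]
)

// Append-only lifecycle log, day-partitioned for time-sliced queries.
final case class JobExecutionLog(
  jobName: String,
  created: Instant,
  runId: String,
  parentRunId: Option[String],
  event: String,
  traceId: String,
  spanId: String
)
```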
Operational flow (simplified):
- Build pipeline assembles ETL JARs + dependency bundle; orchestrator artifacts published.
- ServiceTaskHost reads enabled job catalog & cron metadata, resolves Spark runtime versions, ensures binaries present.
- Scheduler computes due jobs → submits via spark-submit to cluster master with consistent classpath.
- On state transitions (scheduled → running → success/failure), rows in `JobExecutionState` and events in `JobExecutionLog` update with trace IDs.
- Historical events facilitate SLA analytics (lateness, duration distribution) & targeted replays.
Participation & Impact:
- Drove schema design for state & log tables (included tracing & cron fields for future observability expansion).
- Implemented integration of ETL dependency packaging to guarantee runtime parity.
- Assisted in network requirement definition and validation across UAT/Staging/Prod clusters.
- Contributed to operational scripts (publish, version sync) enabling frictionless runtime upgrades (Spark/Hadoop/Java).
Benefits:
- Reproducible, centrally governed submissions replacing ad-hoc job launches.
- Faster incident triage via structured state/log tables and trace correlation.
- Simplified runtime upgrades (single version edit + script sync) lowering configuration drift risk.
- Foundation for advanced orchestration features (retry policy, SLA breach alerts, dependency graphs).