CarrierPerformanceReportsEtl

CarrierPerformanceReportsEtl is a production Spark/Scala data platform I architected and grew over ~4 years at WTG to ingest, evolve, and serve carrier & logistics performance analytics. I founded it as a solo engineer, then mentored rotating contributors while owning the roadmap, standards, and release quality (acting as de facto team & tech lead while titled Data Scientist / Senior Data Scientist).


1. Problem & Context

Logistics operations required timely, reliable KPIs (turnaround, message latency, carrier performance) sourced from heterogeneous semi‑structured message streams and relational systems. Challenges:

  • High schema volatility in inbound envelope/message payloads (new keys, nested arrays).
  • Need for both streaming freshness and batch reprocessing (bootstrap or backfill).
  • Operational transparency for governance (auditable builds, reproducible assemblies).
  • Minimizing engineer onboarding friction across multiple sub-systems.

2. High-Level Architecture

```mermaid
graph TD
	%% Security
	subgraph Security
		KER[Kerberos / Secrets]
	end

	%% Sources
	subgraph Sources
		KAFKA[Kafka Topics]
		SQLSRC[SQL Server OLTP]
		CDC[CDC Streams]
		FILES[File Drops]
	end
	KER --> KAFKA
	KER --> SQLSRC
	KER --> CDC

	%% Orchestrator & Metadata
	subgraph Orchestrator
		ORCH[SparkCluster Orchestrator]
		STATE[(JobExecutionState)]
		LOG[(JobExecutionLog)]
		ORCH --> STATE
		ORCH --> LOG
	end

	%% Ingestion
	subgraph Ingestion
		STREAM[Streaming Consumers]
		BATCH[Scheduled Ingestors]
	end
	KAFKA --> STREAM
	CDC --> STREAM
	SQLSRC --> STREAM
	FILES --> BATCH
	ORCH --> STREAM
	ORCH --> BATCH

	%% Processing & Storage
	STREAM --> BRONZE[(Bronze)]
	BATCH --> BRONZE
	BRONZE --> INFER[Schema Inference]
	INFER --> REG[(Schema Registry)]
	BRONZE --> DQ[Data Quality]
	DQ --> ENR[Enrichment]
	ENR --> SILVER[(Silver KPIs)]
	SILVER --> GOLD[(Gold Analytics)]
	GOLD --> REPORTS[Domain Reports]
	GOLD --> SQLSERV[Analytical SQL DB]
	SQLSERV --> API[.NET APIs]
	API --> UI[Vue.js Frontend]

	%% Shared runtime
	subgraph Shared
		COMMON[CommonDependencies]
		EXTRA[SparkExtraDependencies]
		HIST[History Server]
	end
	COMMON --> STREAM
	COMMON --> BATCH
	EXTRA --> STREAM
	EXTRA --> BATCH
	HIST --> STREAM
	HIST --> GOLD
```

3. Module Overview

| Module | Purpose | Key Notes |
| --- | --- | --- |
| CommonDependencies | Shared schemas, schema evolution helpers, utilities | Central contract; avoids duplication |
| KafkaConsumers | Structured Streaming ingest & transformation | Adaptive query strategy (isin vs partition) |
| ScheduledIngestors | Time-driven batch ingestion | Parameterized jobs; deployable jars |
| CarrierPerformanceReports | Core analytical aggregations & KPI logic | Assembled Spark jar for downstream APIs |
| OceanCarrierMessagesReports | Domain-specific reporting (ocean/carrier) | Builds curated tables / exports |
| SparkExtraDependencies | Consolidated dependency packaging | Ensures consistent classpath across jobs |
| SparkKube / SparkHistoryServer | Operational observability | Job lineage & performance inspection |
| ZeppelinServer / WebApi | Exploration & ad-hoc queries | Onboarding & incident triage |
| IntegrationTests | E2E validation (SQL Server + FS) | Creates/destroys temp DB, asserts schema & logic |
| hook (pre-push) | Local quality gate | Runs Scalafix/Scalafmt before push |

4. Data & Processing Flow

  1. Ingest: Streaming jobs pull messages / events (Kafka + JDBC lookups) into Bronze with minimal shaping & partitioning (date).
  2. Schema Inference: Specialized jobs explode dynamic key arrays, count occurrences, derive a deterministic StructType (metadata stores occurrence counts) and push to registry.
  3. Enrichment: Joins with client lookup tables; adds business flags & restoration indicators.
  4. Upsert / Merge: Merge logic keyed on AP_PK / composite keys ensures idempotent updates (see the merge sketch after this list).
  5. Silver (Curated & KPIs): Latency metrics, performance scoring built into CarrierPerformanceReports / OceanCarrierMessagesReports.
  6. Gold Analytics Layer: Additional business modeling & dimensional/tabular shaping for serving (star-like tables, summary snapshots, SCD handling) produced from Silver.
  7. Serving: Gold ingested into a SQL Server analytical schema; .NET backend APIs expose endpoints consumed by Vue.js frontend dashboards.
  8. Deployment: Scripts copy assembled jars + config into publish directories → Docker / staging / production deploy; database migrations applied alongside application releases.
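The merge in step 4 can be illustrated with a minimal Delta Lake sketch; the table path is hypothetical, while AP_PK and the date partition come from the flow above:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

// Idempotent upsert: replaying the same micro-batch converges to the same
// state, so retries and backfills do not create duplicate rows.
def mergeIntoBronze(spark: SparkSession, updates: DataFrame, tablePath: String): Unit =
  DeltaTable.forPath(spark, tablePath)
    .as("t")
    .merge(
      updates.as("s"),
      // including the partition column lets Delta prune untouched date partitions
      "t.date = s.date AND t.AP_PK = s.AP_PK")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
```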

5. Key Engineering Patterns

  • Adaptive Retrieval: Threshold-based switch between isin batches and partitioned time-range SQL queries to prevent predicate bloat (see the sketch after this list).
  • Schema Evolution: Occurrence-aware inference (single vs repeated → scalar or array) with metadata; deterministic stable field ordering.
  • Config Hierarchy: Multi-layer resolution (environment variables > -D flags > config file overlays) improves portability & secret hygiene.
  • Streaming Abstraction: DefaultStreamJob + trait composition (reader / transformer / loader) for testability and reuse.
  • Performance Guardrails: Cached micro-batches for multi-step transformations; rank/min window logic to compute bounded temporal queries.
  • Quality Gates: Pre-push hook + QGL/DAT fail-fast on formatting/lint to protect downstream builds.
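A minimal sketch of the adaptive retrieval switch, assuming a hypothetical ModifiedAt change-tracking column and an illustrative threshold (AP_PK is the key from section 4):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

// Small key sets become a single IN-list query pushed to SQL Server; large
// sets fall back to a partitioned time-range scan instead of an enormous
// IN clause ("predicate bloat").
def readChangedRows(
    spark: SparkSession,
    jdbcUrl: String,
    table: String,
    keys: Seq[String],
    fromTs: java.sql.Timestamp,
    toTs: java.sql.Timestamp,
    isinThreshold: Int = 500 // illustrative cut-over point
): DataFrame = {
  val reader = spark.read.format("jdbc").option("url", jdbcUrl)
  if (keys.size <= isinThreshold) {
    val inList = keys.map(k => s"'$k'").mkString(", ")
    reader.option("query", s"SELECT * FROM $table WHERE AP_PK IN ($inList)").load()
  } else {
    // Spark issues numPartitions parallel range queries split on ModifiedAt
    reader
      .option("dbtable", table)
      .option("partitionColumn", "ModifiedAt")
      .option("lowerBound", fromTs.toString)
      .option("upperBound", toTs.toString)
      .option("numPartitions", "8")
      .load()
      .where(col("ModifiedAt") >= fromTs && col("ModifiedAt") < toTs)
  }
}
```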

6. Representative Job: ExtractEhubArchiveMessageBronze

Highlights:

  • Periodic trigger (1 minute) with Update mode.
  • Decides retrieval strategy based on distinct key volume.
  • Adds restoration flags; enriches with client info; merges into Delta table (partitioned by date).
  • Ensures low-latency incremental availability while preserving ability to backfill.
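A condensed sketch of the job's shape, reusing the mergeIntoBronze helper sketched in section 4; the broker, topic, and paths are hypothetical:

```scala
import org.apache.spark.sql.functions.to_date
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

def runExtractEhubArchiveMessageBronze(spark: SparkSession): Unit = {
  import spark.implicits._

  val shaped = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092") // hypothetical
    .option("subscribe", "ehub.archive.messages")     // hypothetical topic
    .load()
    .selectExpr("CAST(value AS STRING) AS payload", "timestamp")
    .withColumn("date", to_date($"timestamp"))        // Bronze partition key

  // Explicitly typed function avoids the Scala 2.12 foreachBatch overload ambiguity
  val upsert: (DataFrame, Long) => Unit = (batch, _) => {
    batch.persist() // cached micro-batch reused across enrichment + merge steps
    mergeIntoBronze(spark, batch, "/delta/bronze/ehub_archive_message")
    batch.unpersist()
  }

  shaped.writeStream
    .outputMode("update")
    .trigger(Trigger.ProcessingTime("1 minute"))
    .option("checkpointLocation", "/checkpoints/ehub_archive_message")
    .foreachBatch(upsert)
    .start()
    .awaitTermination()
}
```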

7. Schema Inference Job

InferEhubArchiveMessageKeyValuePairsSchema:

  • Explodes nested context collections.
  • Aggregates occurrences and builds new StructType with per-field occurrence metadata.
  • Evolves registry via a service layer, recording timestamp & offset for reproducibility.

Benefits: safe additive evolution, a foundation for compatibility analysis, and automatic downstream column availability.
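A minimal sketch of the inference core under assumed column names (Context as the exploded key/value collection, AP_PK as the message key):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Keys seen at most once per message become scalars; keys repeated within a
// message become arrays. Occurrence counts land in StructField metadata so
// evolution is deterministic and auditable.
def inferKeyValueSchema(bronze: DataFrame): StructType = {
  val perKey = bronze
    .select(col("AP_PK"), explode(col("Context")).as("kv"))
    .groupBy(col("AP_PK"), col("kv.Key").as("key"))
    .count()                                   // occurrences within one message
    .groupBy("key")
    .agg(
      sum("count").as("occurrences"),          // total across all messages
      max("count").as("maxPerMessage"))
    .collect()

  val fields = perKey.toSeq
    .sortBy(_.getAs[String]("key"))            // deterministic, stable field order
    .map { row =>
      val dataType =
        if (row.getAs[Long]("maxPerMessage") > 1L) ArrayType(StringType)
        else StringType
      StructField(
        row.getAs[String]("key"), dataType, nullable = true,
        new MetadataBuilder().putLong("occurrences", row.getAs[Long]("occurrences")).build())
    }
  StructType(fields)
}
```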


8. Build & Deployment

  • QGL Orchestration: Sequential activation of build.sbt.template → build.sbt avoids multi-root conflicts (see the build sketch after this list).
  • Dependency Resolution: SBT (≥1.6.2) + custom packaging (SparkExtraDependencies) ensures stable transitive libs.
  • Artifacts: Per-project JAR + config copied into Bin/Publish/* for environment-specific Docker pushes.
  • History & Diagnostics: Spark History Server container images published with same pipeline for consistent lineage.
  • Data Model Migrations: Versioned SQL Server DDL/DML scripts executed with .NET deployment to evolve analytical schema (dimensions, facts, indices) in step with Gold transformations.
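A hedged sketch of one module in the activated build, assuming the sbt-assembly plugin is declared in project/plugins.sbt (versions illustrative):

```scala
// build.sbt (sketch of the file produced from build.sbt.template)
ThisBuild / scalaVersion := "2.12.17"

lazy val commonDependencies = (project in file("CommonDependencies"))

lazy val carrierPerformanceReports = (project in file("CarrierPerformanceReports"))
  .dependsOn(commonDependencies)
  .settings(
    // the cluster supplies Spark at runtime, so it stays out of the assembled jar
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % Provided,
    assembly / assemblyMergeStrategy := {
      case PathList("META-INF", _*) => MergeStrategy.discard
      case _                        => MergeStrategy.first
    }
  )
```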

9. Quality & Governance

  • Code Style: Scalafix + Scalafmt enforced locally & in CI; pre-push hook blocks non-conformant commits.
  • Testing: ScalaTest (unit + integration). SQL Server integration tests create ephemeral databases; the file system simulates object storage (see the fixture sketch after this list).
  • Config Discipline: dev/prod overrides ignored by VCS; environment injection via explicit -D flags.
  • Observability (Planned Enhancements): Proposed Prometheus metrics + structured JSON logs for streaming lag & schema drift.
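A sketch of the ephemeral-database fixture, with hypothetical connection plumbing:

```scala
import java.sql.DriverManager

import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite

// Each suite creates a uniquely named SQL Server database, runs the job and
// its assertions against it, and drops it afterwards.
class CarrierKpiIntegrationSuite extends AnyFunSuite with BeforeAndAfterAll {
  private val dbName = s"etl_it_${System.currentTimeMillis()}"
  private val serverUrl = sys.env.getOrElse(
    "IT_SQLSERVER_URL", // hypothetical variable injected by the CI pipeline
    "jdbc:sqlserver://localhost:1433;encrypt=false")

  private def execute(sql: String): Unit = {
    val conn = DriverManager.getConnection(serverUrl)
    try conn.createStatement().execute(sql)
    finally conn.close()
  }

  override def beforeAll(): Unit = execute(s"CREATE DATABASE [$dbName]")
  override def afterAll(): Unit = execute(s"DROP DATABASE [$dbName]")

  test("gold table schema matches the expected contract") {
    // run the job against dbName, then assert schema & row-level logic
  }
}
```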

10. Impact & Outcomes

  • Reduced onboarding time via documented multi-module structure & scripted build automation.
  • Accelerated feature delivery (schema changes deploy without manual refactor through inference pipeline).
  • Improved reliability & governance (format/lint gates cut CI churn; deterministic merges lowered duplicate events risk).
  • Enabled multi-domain expansion (additional message families plugged into shared patterns without rewriting core).

11. Roadmap (Illustrative)

| Horizon | Focus | Items |
| --- | --- | --- |
| Short (0–2 sprints) | Observability & DQ | Prometheus metrics; data quality assertions pre-merge |
| Medium (Quarter) | Build Simplification | Consolidate SBT multi-module; centralized dependency BOM |
| Medium | Schema Evolution UX | Diff viewer + compatibility classification (additive / breaking) |
| Long | Lineage & Metadata | Unified lineage service (schema + run context) |
| Long | Orchestration Modernization | Evaluate Delta Live Tables / managed streaming orchestration |

12. Tech Stack Summary

  • Core: Scala 2.12, Apache Spark (Structured Streaming & batch)
  • Messaging / Sources: Kafka, SQL Server (JDBC)
  • Storage: Delta (merge/upsert), Parquet
  • Build & Tooling: SBT, Ant/QGL scripts, PowerShell & Bash helpers
  • Quality: Scalafix, Scalafmt, pre-push hook
  • Infra / Ops: Docker (Spark History Server), QGL/DAT pipelines
  • Exploration: Zeppelin notebooks
  • Serving / Product: SQL Server analytical schema (Gold), ASP.NET (.NET) backend APIs, Vue.js frontend components

13. Leadership & Ownership

  • Founded codebase; defined module boundaries & naming conventions.
  • Authored scripts enabling reproducible multi-module builds.
  • Mentored rotating engineers (code reviews, pairing, design walkthroughs).
  • Guided roadmap (schema evolution strategy, adaptive querying, observability direction).
  • Extended ownership into serving & product layer: designed SQL Server analytical data model, authored & migrated schema scripts, collaborated on ASP.NET service endpoints, and contributed to data-facing Vue.js UI integration.
  • Co-designed and implemented the Spark job orchestration (SparkCluster) platform: scheduling/state schema, submission host, component version management, and integration of ETL dependency packaging (SparkExtraDependencies) into orchestrated deployments.
  • Acted as principal engineer / tech lead while formally on the Data Scientist track.

14. Key Differentiators

  • Automated schema evolution with occurrence-aware inference.
  • Adaptive hybrid streaming + batch retrieval pattern lowering database strain.
  • Strict pre-commit quality enforcement integrated with enterprise build tooling.
  • Modular design enabling independent iteration & reduced coupling.
  • End-to-end ownership from ingestion through Gold serving layer and API/UI consumption (full-stack data product delivery).

15. Summary

CarrierPerformanceReportsEtl converted fragmented, schema-volatile logistics messages into governed, analytics-ready datasets through a modular Spark platform emphasizing evolution, performance safety, and engineering discipline—scaling from a solo build to a multi-contributor asset under consistent technical leadership.


16. Data Quality Pipelines

Embedded DQ operates as a side-car phase in both streaming and scheduled batch paths:

  • Tiered Rules: (a) Structural (schema conformity vs registry), (b) Referential (lookup join cardinality), (c) Semantic (value ranges, enumerations), (d) Temporal (event time vs processing time drift), (e) Anomaly (simple z-score & volume deltas).
  • Execution Points: Pre-merge (reject or quarantine), Post-merge (metric emission), Periodic retrospective audits (daily window).
  • Quarantine Strategy: Invalid rows persisted to an *_invalid Delta table with rule identifiers, enabling replay after rule relaxation.
  • Rule Management: YAML/JSON rule specs versioned with code; compiled to Spark expressions at job start (see the sketch below).
  • Metrics: Row counts, rejection ratios, distinct key growth, late-arrival distribution, anomaly score — intended for future Prometheus export.
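A minimal sketch of the rule-compilation step, assuming specs are already parsed into (id, expression) pairs and Spark 3's higher-order filter function is available:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

final case class DqRule(id: String, expression: String) // e.g. DqRule("sem-001", "Latency >= 0")

// Evaluates every rule per row; failing rows carry the ids of the rules they
// violated and are routed to the *_invalid quarantine table for later replay.
def applyRules(df: DataFrame, rules: Seq[DqRule]): (DataFrame, DataFrame) = {
  val withViolations = df
    .withColumn("violated_rules",
      array(rules.map(r => when(!expr(r.expression), lit(r.id))): _*))
    .withColumn("violated_rules", filter(col("violated_rules"), _.isNotNull))

  val valid = withViolations.where(size(col("violated_rules")) === 0).drop("violated_rules")
  val quarantine = withViolations.where(size(col("violated_rules")) > 0)
  (valid, quarantine)
}
```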

Mermaid (simplified):

```mermaid
flowchart LR
	Ingest --> Validate[Structural & Referential]
	Validate -->|fail| Quarantine
	Validate --> Semantic[Semantic & Temporal]
	Semantic -->|fail| Quarantine
	Semantic --> Merge[Delta Merge]
	Merge --> PostDQ[Post Metrics]
	Quarantine --> Review[Reprocess Tool]
```

17. Multi-Source & CDC Ingestion

The platform supports heterogeneous sources beyond Kafka streaming envelopes:

  • OLTP JDBC (SQL Server) incremental pulls with adaptive predicate strategy.
  • CDC Feeds (change tables / log-based connectors) normalized into Bronze with operation flags (I/U/D) enabling downstream SCD handling.
  • Flat File Drops (ad hoc partner extracts) processed via ScheduledIngestors using a uniform loader interface.

Pattern Highlights:

  • Source Abstraction: Trait-based readers (SourceReader) converting each modality to a canonical DataFrame + metadata (ingest_ts, source_system, change_op); see the sketch after this list.
  • Watermark & Replay: CDC ingestion tracks last processed LSN/SCN offset in a checkpoint table for idempotent recovery.
  • Late Data: Event-time windows with watermarking unify CDC and Kafka handling for downstream temporal aggregations.
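A sketch of the reader abstraction; names other than SourceReader and the canonical columns are illustrative. The CDC mapping uses SQL Server's __$operation codes (1 = delete, 2 = insert, 3/4 = update):

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

trait SourceReader {
  def sourceSystem: String
  def read(spark: SparkSession): DataFrame

  /** Attaches the canonical envelope shared by every modality. */
  final def readCanonical(spark: SparkSession): DataFrame =
    read(spark)
      .withColumn("ingest_ts", current_timestamp())
      .withColumn("source_system", lit(sourceSystem))
}

final class CdcReader(changeTable: String, jdbcUrl: String) extends SourceReader {
  val sourceSystem = "sqlserver-cdc"

  def read(spark: SparkSession): DataFrame =
    spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("dbtable", changeTable)
      .load()
      // map CDC operation codes to the canonical I/U/D change_op flag
      .withColumn("change_op",
        when(col("__$operation") === 2, lit("I"))
          .when(col("__$operation").isin(3, 4), lit("U"))
          .otherwise(lit("D")))
}
```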

18. Security & Kerberos Authentication

Enterprise environments required secure access to Kafka & JDBC endpoints:

  • Kerberos: Keytab + principal supplied via env/secret injection; Spark submits with --principal & --keytab (abstracted in launch scripts) to enable ticket renewal for long-running Structured Streaming jobs.
  • Secret Scoping: Credentials are never committed; they are referenced through indirection variables resolved at runtime (env > CLI > config overlays; see the resolution sketch below).
  • Network Segregation: SparkHistoryServer & exploration notebooks deployed in a controlled subnet; jobs interact with sources through whitelisted brokers/ports.
  • Auditing: Access events (successful auth, ticket renewal failures) logged and slated for future correlation with DQ anomalies.
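A sketch of the resolution order, assuming Typesafe Config handles the file overlays; key names are illustrative:

```scala
import com.typesafe.config.{Config, ConfigFactory}

object Secrets {
  // dev/prod overlay files stay out of VCS and are merged by Typesafe Config
  private lazy val overlays: Config = ConfigFactory.load()

  /** Precedence: environment variable > -D system property > config overlay. */
  def resolve(key: String): String =
    sys.env.get(key.replace('.', '_').toUpperCase)  // e.g. KAFKA_KEYTAB_PATH
      .orElse(sys.props.get(key))                   // e.g. -Dkafka.keytab.path=...
      .getOrElse(overlays.getString(key))
}

// The launch wrapper resolves the principal/keytab this way before composing
// the spark-submit command line with --principal and --keytab:
//   val keytab = Secrets.resolve("kafka.keytab.path")
```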

19. Extended Differentiators (Additions)

  • Integrated, rule-driven Data Quality pipeline with quarantine & replay.
  • Unified ingestion abstraction spanning Kafka, OLTP incremental, CDC, and file drops.
  • Kerberos-enabled secure streaming suitable for long-lived production jobs.
  • Offset & rule lineage enabling reproducible reprocessing scenarios.

20. Orchestration (SparkCluster Platform – Deep Dive)

The platform’s Spark job orchestration layer (separate SparkCluster solution) provides controlled, observable scheduling and submission of batch & streaming Spark jobs.

Core components:

  • ServiceTaskHost (.NET): Submission & lifecycle host invoked via QGL/DAT pipeline; starts with environment-specific appsettings including SparkSettings (Spark / Hadoop / Winutils / Java versions). Automatically syncs binaries (via DownloadSpark.ps1) from internal artifact storage so clusters share a consistent runtime toolchain.
  • SparkSubmitDatabase (SQL Server): Operational metadata store.
    • JobExecutionState: Single-row per named job capturing current state, expected vs actual run date, succeeded task list, cron expression, distributed tracing identifiers (TraceId, SpanId, ParentSpanId) for correlation.
    • JobExecutionLog: Append-only event & lifecycle log (created timestamp, run/parent linkage, trace/span IDs) indexed & day‑partitioned for efficient time-sliced queries.
  • Build.xml Integration: Declares cross-repo dependencies (launcher, deployer, code-signing, Spark extra dependencies). Post-build publish.bat packages deployment artifacts for staging/production submission hosts.
  • Environment Matrix: Distinct UAT, Staging, Production clusters (master, workers, submit nodes, Zeppelin) with documented port & protocol allowances (driver 4040‑4099, master 8080, history 18080, workers 8081, submission 7077, REST 6066). Network manifest ensures least-privilege connectivity (Kafka, SQL Server, Ceph/object storage, external APIs).
  • Dependency Injection of ETL Artifacts: Orchestrator copies SparkExtraDependencies from the ETL repo into the publish tree so scheduled jobs resolve the same library versions as locally tested modules.
  • Scheduling & Cron: Jobs persisted with expected execution timestamps enabling drift detection; cron expressions stored with state allow dynamic evaluation without code redeploy.
  • Observability Hooks: Trace/span IDs align orchestration events with Spark job metrics & application logs (foundation for distributed tracing and future telemetry export).

Operational flow (simplified):

  1. Build pipeline assembles ETL JARs + dependency bundle; orchestrator artifacts published.
  2. ServiceTaskHost reads the enabled job catalog & cron metadata, resolves Spark runtime versions, and ensures binaries are present.
  3. Scheduler computes due jobs → submits via spark-submit to the cluster master with a consistent classpath.
  4. On state transitions (scheduled → running → success/failure), rows in JobExecutionState and events in JobExecutionLog update with trace IDs.
  5. Historical events facilitate SLA analytics (lateness, duration distribution) & targeted replays.
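A small sketch of the lateness/drift check these tables enable (field names follow the JobExecutionState schema above; the threshold is illustrative):

```scala
import java.time.{Duration, Instant}

final case class JobState(
    jobName: String,
    cronExpression: String,
    expectedRunDate: Instant,
    actualRunDate: Option[Instant],
    state: String) // scheduled | running | success | failure

// A job has drifted if it started later than allowed, or never started at all.
def driftedJobs(
    states: Seq[JobState],
    now: Instant,
    maxLateness: Duration = Duration.ofMinutes(15)): Seq[JobState] =
  states.filter { s =>
    val startedLate = s.actualRunDate.exists(actual =>
      Duration.between(s.expectedRunDate, actual).compareTo(maxLateness) > 0)
    val neverStarted = s.actualRunDate.isEmpty &&
      Duration.between(s.expectedRunDate, now).compareTo(maxLateness) > 0
    startedLate || neverStarted
  }
```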

Participation & Impact:

  • Drove schema design for state & log tables (included tracing & cron fields for future observability expansion).
  • Implemented integration of ETL dependency packaging to guarantee runtime parity.
  • Assisted in network requirement definition and validation across UAT/Staging/Prod clusters.
  • Contributed to operational scripts (publish, version sync) enabling frictionless runtime upgrades (Spark/Hadoop/Java).

Benefits:

  • Reproducible, centrally governed submissions replacing ad-hoc job launches.
  • Faster incident triage via structured state/log tables and trace correlation.
  • Simplified runtime upgrades (single version edit + script sync) lowering configuration drift risk.
  • Foundation for advanced orchestration features (retry policy, SLA breach alerts, dependency graphs).