This post outlines a pragmatic roadmap for evolving data contracts in an analytics platform. It favors incremental adoption over big‑bang rewrites and ties each phase directly to operational needs: stability, speed, and safety.

1) Why contracts (and why now)

  • Control schema drift and reduce breakage.
  • Enable fast feedback (pre‑prod contract tests) and safer evolution.
  • Improve ownership clarity and auditability.

2) Contract primitives

  • Interface = table/view/topic + schema + semantics + SLA.
  • Versioned schema in source control (YAML/JSON) with description and constraints.
  • Example: a minimal YAML dataset contract, followed by a loader sketch.
name: carrier_performance
version: 1.2.0
owner: analytics.eng@acme.example
schema:
  - name: carrier_id
    type: string
    constraints: [not_null]
  - name: ds
    type: date
    constraints: [not_null]
  - name: on_time_pct
    type: decimal(5,2)
    constraints: [range: {min: 0, max: 100}]
  - name: loads
    type: int
    constraints: [not_null, gte: 0]
slas:
  freshness: {max_lag_hours: 6}
  completeness: {min_rows: 1000}
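
A contract file like this can be loaded and sanity‑checked programmatically. Below is a minimal loader sketch, assuming PyYAML and the file layout above; the required keys are this post's convention, not any standard.
# Hypothetical loader: parse a contract file and check its shape
import yaml

def load_contract(path):
    with open(path) as f:
        contract = yaml.safe_load(f)
    # Top-level keys this post's contract convention requires
    for key in ("name", "version", "owner", "schema", "slas"):
        assert key in contract, f"contract missing required key: {key}"
    # Every column entry needs at least a name and a type
    for col in contract["schema"]:
        assert "name" in col and "type" in col, f"malformed column entry: {col}"
    return contract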

3) Evolution rules

  • SemVer: MAJOR for breaking changes, MINOR for additive changes, PATCH for documentation and constraint metadata.
  • Deprecation windows and dual‑publishing for breaking changes.
  • Compatibility gate in CI (producer and consumer tests); see the classifier sketch after the commands below.
# Pseudo‑CI checks
contract check producer --path contracts/carrier_performance.yaml --against current-schema.json
contract check consumers --dataset carrier_performance --min-version 1.1.0
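
The gate behind those commands can start as a field‑level diff that classifies the change. A minimal sketch, assuming schemas parsed into lists of column dicts as in the loader above; the function name and rules simply mirror the SemVer bullets.
# Classify a schema change as the minimum SemVer bump it requires
def bump_required(old_cols, new_cols):
    old = {c["name"]: c["type"] for c in old_cols}
    new = {c["name"]: c["type"] for c in new_cols}
    removed = old.keys() - new.keys()
    retyped = {n for n in old.keys() & new.keys() if old[n] != new[n]}
    if removed or retyped:
        return "MAJOR"  # breaking: dropped or retyped columns
    if new.keys() - old.keys():
        return "MINOR"  # additive: new columns only
    return "PATCH"      # docs or constraint metadata only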

4) Validation patterns (Spark/Delta)

  • Type/shape validation in ETL; catalog registration only on pass.
  • Constraint checks materialized to a dq_issue table; a wiring sketch follows the example.
# PySpark example: enforce contract types and simple constraints
from pyspark.sql import functions as F, types as T

def validate(df):
    # Target schema from the contract; nullable flags mirror not_null
    expected = T.StructType([
        T.StructField("carrier_id", T.StringType(), False),
        T.StructField("ds", T.DateType(), False),
        T.StructField("on_time_pct", T.DecimalType(5, 2), True),
        T.StructField("loads", T.IntegerType(), False),
    ])
    # Cast to contract types; incompatible values become null and
    # surface in the violation counts below
    casted = df.select(
        F.col("carrier_id").cast("string").alias("carrier_id"),
        F.to_date("ds").alias("ds"),
        F.col("on_time_pct").cast("decimal(5,2)").alias("on_time_pct"),
        F.col("loads").cast("int").alias("loads"),
    )
    assert casted.schema.fieldNames() == expected.fieldNames()
    # One violation count per contract constraint, destined for dq_issue
    dq_df = casted.select(
        F.count(F.when(F.col("carrier_id").isNull(), 1)).alias("carrier_id_null"),
        F.count(F.when(F.col("ds").isNull(), 1)).alias("ds_null"),
        F.count(F.when(F.col("loads").isNull() | (F.col("loads") < 0), 1)).alias("loads_invalid"),
        F.count(F.when((F.col("on_time_pct") < 0) | (F.col("on_time_pct") > 100), 1)).alias("on_time_pct_out_of_range"),
    )
    return casted, dq_df
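
One way to wire validate into the pipeline, honoring "registration only on pass": record the counts in dq_issue, then publish only if every count is zero. A sketch assuming Delta tables and an upstream raw_df; the table names are illustrative.
# Illustrative gating: record DQ counts, publish only on a clean pass
casted, dq_df = validate(raw_df)
dq_df.write.format("delta").mode("append").saveAsTable("dq_issue")
dq_row = dq_df.collect()[0]
if any(dq_row[c] > 0 for c in dq_df.columns):
    raise ValueError(f"contract violations: {dq_row.asDict()}")
# Reached only on pass: publish, then register in the catalog
casted.write.format("delta").mode("overwrite").saveAsTable("curated.carrier_performance")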

5) Contract tests for consumers

  • Consumers pin to a minimum compatible version and verify invariants (pin check sketched after the SQL).
  • E.g., downstream view expects on_time_pct present and within 0–100.
-- dbt singular test: passes only when zero rows come back
-- (a bare count(*) would always return one row and always fail)
select *
from {{ ref('carrier_performance') }}
where on_time_pct is null
   or on_time_pct < 0
   or on_time_pct > 100
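
The version pin itself can also be enforced in consumer code. A minimal sketch using packaging.version; the pin value matches the pseudo‑CI example, and contract is a parsed contract dict as above.
# Consumer-side guard: refuse contracts below the pin or outside its major
from packaging.version import Version

MIN_VERSION = Version("1.1.0")

def check_pin(contract):
    v = Version(contract["version"])
    if v < MIN_VERSION or v.major != MIN_VERSION.major:
        raise RuntimeError(f"incompatible contract version {v}; "
                           f"need >= {MIN_VERSION} within major {MIN_VERSION.major}")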

6) Governance & catalog

  • Register contracts in a catalog (DataHub/OpenMetadata) with lineage and ownership.
  • Emit events on contract version changes and SLA breaches; a minimal emitter sketch follows.
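
Event emission need not be catalog‑specific on day one. A minimal sketch that posts a JSON payload to a placeholder webhook; the endpoint and payload shape are assumptions, and a real deployment would target the DataHub or OpenMetadata ingestion APIs instead.
# Emit a contract event to a placeholder endpoint (illustrative payload)
import json, urllib.request

def emit_event(event_type, dataset, version, detail=""):
    payload = json.dumps({
        "event": event_type,  # e.g. "contract.version_changed" or "sla.breach"
        "dataset": dataset,
        "version": version,
        "detail": detail,
    }).encode()
    req = urllib.request.Request(
        "https://events.example.internal/contracts",  # placeholder URL
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)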

7) Rollout plan

  • Phase 1: Document schemas for 3 high‑impact datasets; add read‑only validation.
  • Phase 2: Enforce contract in CI for producers; add consumer tests.
  • Phase 3: Introduce SemVer and deprecation workflows; visibility via catalog.
  • Phase 4: Automate change review and impact analysis with lineage.

8) Reference diagram

flowchart TB
  A[Producer ETL] --> B[(Contract YAML)]
  B --> C{CI checks}
  C -->|pass| D[Publish to Curated]
  C -->|fail| E[Block & Report]
  D --> F[Consumers]
  F --> G[Contract Tests]
  D --> H[Catalog / Lineage]

Pragmatic takeaway: contracts lower entropy and raise change velocity. Start small, enforce where it matters, and expand coverage with clear ownership.