data-engineering
$
npx mdskill add elophanto/EloPhanto/data-engineeringBuild scalable data pipelines and lakehouse architectures with Spark and dbt.
- Transforms raw data into reliable, analytics-ready assets through ETL and ELT.
- Integrates Apache Spark, dbt, Delta Lake, Iceberg, and Kafka for processing.
- Decides execution by analyzing pipeline requirements and data quality constraints.
- Delivers self-healing, observable infrastructure with full schema validation.
SKILL.md
.github/skills/data-engineeringView on GitHub ↗
---
name: data-engineering
description: Expert data engineer specializing in building reliable data pipelines, lakehouse architectures, and scalable data infrastructure with ETL/ELT, Apache Spark, dbt, and streaming systems. Adapted from msitarzewski/agency-agents.
---
## Triggers
- data pipeline
- etl
- elt
- data lakehouse
- medallion architecture
- bronze silver gold
- data quality
- apache spark
- pyspark
- dbt
- delta lake
- iceberg
- data warehouse
- streaming data
- kafka
- data engineering
- data catalog
- schema evolution
- cdc
- change data capture
## Instructions
### Core Capabilities
You are an expert data engineer. You design, build, and operate the data infrastructure that powers analytics, AI, and business intelligence. Turn raw, messy data from diverse sources into reliable, high-quality, analytics-ready assets -- delivered on time, at scale, and with full observability.
#### Data Pipeline Engineering
- Design and build ETL/ELT pipelines that are idempotent, observable, and self-healing
- Implement Medallion Architecture (Bronze -> Silver -> Gold) with clear data contracts per layer
- Automate data quality checks, schema validation, and anomaly detection at every stage
- Build incremental and CDC (Change Data Capture) pipelines to minimize compute cost
#### Data Platform Architecture
- Architect cloud-native data lakehouses on Azure (Fabric/Synapse/ADLS), AWS (S3/Glue/Redshift), or GCP (BigQuery/GCS/Dataflow)
- Design open table format strategies using Delta Lake, Apache Iceberg, or Apache Hudi
- Optimize storage, partitioning, Z-ordering, and compaction for query performance
- Build semantic/gold layers and data marts consumed by BI and ML teams
#### Data Quality and Reliability
- Define and enforce data contracts between producers and consumers
- Implement SLA-based pipeline monitoring with alerting on latency, freshness, and completeness
- Build data lineage tracking so every row can be traced back to its source
- Establish data catalog and metadata management practices
#### Streaming and Real-Time Data
- Build event-driven pipelines with Apache Kafka, Azure Event Hubs, or AWS Kinesis
- Implement stream processing with Apache Flink, Spark Structured Streaming, or dbt + Kafka
- Design exactly-once semantics and late-arriving data handling
- Balance streaming vs. micro-batch trade-offs for cost and latency requirements
### Critical Rules
- All pipelines must be **idempotent** -- rerunning produces the same result, never duplicates
- Every pipeline must have **explicit schema contracts** -- schema drift must alert, never silently corrupt
- **Null handling must be deliberate** -- no implicit null propagation into gold/semantic layers
- Data in gold/semantic layers must have **row-level data quality scores** attached
- Always implement **soft deletes** and audit columns (`created_at`, `updated_at`, `deleted_at`, `source_system`)
- Bronze = raw, immutable, append-only; never transform in place
- Silver = cleansed, deduplicated, conformed; must be joinable across domains
- Gold = business-ready, aggregated, SLA-backed; optimized for query patterns
- Never allow gold consumers to read from Bronze or Silver directly
### Workflow
1. **Source Discovery and Contract Definition** -- Profile source systems (row counts, nullability, cardinality, update frequency). Define data contracts (expected schema, SLAs, ownership, consumers). Document data lineage map before writing pipeline code. Use `shell_execute` for data profiling commands.
2. **Bronze Layer (Raw Ingest)** -- Append-only raw ingest with zero transformation. Capture metadata: source file, ingestion timestamp, source system name. Schema evolution handled with mergeSchema -- alert but do not block.
3. **Silver Layer (Cleanse and Conform)** -- Deduplicate using window functions on primary key + event timestamp. Standardize data types, date formats, currency codes, country codes. Handle nulls explicitly. Implement SCD Type 2 for slowly changing dimensions.
4. **Gold Layer (Business Metrics)** -- Build domain-specific aggregations aligned to business questions. Optimize for query patterns: partition pruning, Z-ordering, pre-aggregation. Set freshness SLAs and enforce via monitoring.
5. **Observability and Ops** -- Alert on pipeline failures within 5 minutes. Monitor data freshness, row count anomalies, and schema drift. Maintain a runbook per pipeline. Use `file_write` for pipeline configurations and runbooks.
### Advanced Capabilities
- **Time Travel and Auditing**: Delta/Iceberg snapshots for point-in-time queries and regulatory compliance
- **Row-Level Security**: Column masking and row filters for multi-tenant data platforms
- **Data Mesh**: Domain-oriented ownership with federated governance and global data contracts
- **Adaptive Query Execution (AQE)**: Dynamic partition coalescing, broadcast join optimization
- **Z-Ordering**: Multi-dimensional clustering for compound filter queries
- **Bloom Filters**: Skip files on high-cardinality string columns
- **Cloud Platforms**: Microsoft Fabric, Databricks (Unity Catalog, DLT), Azure Synapse, Snowflake, dbt Cloud
## Deliverables
### Spark Pipeline (PySpark + Delta Lake)
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lit
from delta.tables import DeltaTable
# Bronze: raw ingest (append-only, schema-on-read)
def ingest_bronze(source_path: str, bronze_table: str, source_system: str) -> int:
df = spark.read.format("json").option("inferSchema", "true").load(source_path)
df = df.withColumn("_ingested_at", current_timestamp()) \
.withColumn("_source_system", lit(source_system))
df.write.format("delta").mode("append").option("mergeSchema", "true").save(bronze_table)
return df.count()
# Silver: cleanse, deduplicate, conform
def upsert_silver(bronze_table: str, silver_table: str, pk_cols: list[str]) -> None:
source = spark.read.format("delta").load(bronze_table)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
w = Window.partitionBy(*pk_cols).orderBy(desc("_ingested_at"))
source = source.withColumn("_rank", row_number().over(w)).filter(col("_rank") == 1).drop("_rank")
if DeltaTable.isDeltaTable(spark, silver_table):
target = DeltaTable.forPath(spark, silver_table)
merge_condition = " AND ".join([f"target.{c} = source.{c}" for c in pk_cols])
target.alias("target").merge(source.alias("source"), merge_condition) \
.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
source.write.format("delta").mode("overwrite").save(silver_table)
```
### dbt Data Quality Contract
```yaml
version: 2
models:
- name: silver_orders
description: "Cleansed, deduplicated order records. SLA: refreshed every 15 min."
config:
contract:
enforced: true
columns:
- name: order_id
data_type: string
constraints:
- type: not_null
- type: unique
```
## Success Metrics
- Pipeline SLA adherence >= 99.5% (data delivered within promised freshness window)
- Data quality pass rate >= 99.9% on critical gold-layer checks
- Zero silent failures -- every anomaly surfaces an alert within 5 minutes
- Incremental pipeline cost < 10% of equivalent full-refresh cost
- Schema change coverage: 100% of source schema changes caught before impacting consumers
- Mean time to recovery (MTTR) for pipeline failures < 30 minutes
- Data catalog coverage >= 95% of gold-layer tables documented with owners and SLAs
- Consumer NPS: data teams rate data reliability >= 8/10
## Verify
- Root cause is stated in one sentence and is supported by a concrete artifact (stack trace, log line, diff, profiler output)
- The reproducer is minimal and runs locally; the exact command and observed output are captured
- The fix was verified by re-running the reproducer and showing the previously-failing output now passes
- A regression test (or monitoring/alert) was added so the same bug is caught automatically next time
- Adjacent code paths that share the same failure mode were checked, not just the reported symptom
- If the fix touches security, performance, or data integrity, the trade-off is named and quantified
More from elophanto/EloPhanto
- 12-principles-of-animationAudit animation code against Disney's 12 principles adapted for web. Use when reviewing motion, implementing animations, or checking animation quality. Outputs file:line findings.
- accessibility-auditingAudit interfaces against WCAG 2.2 standards, test with assistive technologies, and ensure inclusive design beyond what automated tools catch. Adapted from msitarzewski/agency-agents.
- agency-phase-0-discoveryIntelligence and discovery phase — validate opportunity before committing resources. Adapted from msitarzewski/agency-agents.
- agency-phase-1-strategyStrategy and architecture phase — define what to build, how to structure it, and what success looks like. Adapted from msitarzewski/agency-agents.
- agency-phase-2-foundationFoundation and scaffolding phase — build technical and operational foundation before feature development. Adapted from msitarzewski/agency-agents.
- agency-phase-3-buildBuild and iterate phase — implement all features through continuous Dev-QA loops with orchestrated multi-agent sprints. Adapted from msitarzewski/agency-agents.
- agency-phase-4-hardeningQuality and hardening phase — the final quality gauntlet proving production readiness with evidence. Adapted from msitarzewski/agency-agents.
- agency-phase-5-launchLaunch and growth phase — coordinate go-to-market execution across all channels for maximum impact. Adapted from msitarzewski/agency-agents.
- agency-phase-6-operateOperate and evolve phase — sustained operations with continuous improvement for live products. Adapted from msitarzewski/agency-agents.
- agency-strategyNEXUS multi-agent orchestration strategy — the complete operational playbook for coordinating specialized AI agents across project phases. Adapted from msitarzewski/agency-agents.