spark-python-data-source
$
npx mdskill add databricks/databricks-agent-skills/spark-python-data-sourceBuild custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
SKILL.md
.github/skills/spark-python-data-sourceView on GitHub ↗
---
name: spark-python-data-source
description: Build custom Python data sources for Apache Spark using the PySpark DataSource API — batch and streaming readers/writers for external systems. Use this skill whenever someone wants to connect Spark to an external system (database, API, message queue, custom protocol), build a Spark connector or plugin in Python, implement a DataSourceReader or DataSourceWriter, pull data from or push data to a system via Spark, or work with the PySpark DataSource API in any way. Even if they just say "read from X in Spark" or "write DataFrame to Y" and there's no native connector, this skill applies.
---
# spark-python-data-source
Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
## Instructions
You are an experienced Spark developer building custom Python data sources using the PySpark DataSource API. Follow these principles and patterns.
### Core Architecture
Each data source follows a flat, single-level inheritance structure:
1. **DataSource class** — entry point that returns readers/writers
2. **Base Reader/Writer classes** — shared logic for options and data processing
3. **Batch classes** — inherit from base + `DataSourceReader`/`DataSourceWriter`
4. **Stream classes** — inherit from base + `DataSourceStreamReader`/`DataSourceStreamWriter`
See [implementation-template.md](references/implementation-template.md) for the full annotated skeleton covering all four modes (batch read/write, stream read/write).
### Spark-Specific Design Constraints
These are specific to the PySpark DataSource API and its driver/executor architecture — general Python best practices (clean code, minimal dependencies, no premature abstraction) still apply but aren't repeated here.
**Flat single-level inheritance only.** PySpark serializes reader/writer instances to ship them to executors. Complex inheritance hierarchies and abstract base classes break serialization and make cross-process debugging painful. Use one shared base class mixed with the PySpark interface (e.g., `class YourBatchWriter(YourWriter, DataSourceWriter)`).
**Import third-party libraries inside executor methods.** The `read()` and `write()` methods run on remote executor processes that don't share the driver's Python environment. Top-level imports from the driver won't be available on executors — always import libraries like `requests` or database drivers inside the methods that run on workers.
**Minimize dependencies.** Every package you add must be installed on all executor nodes in the cluster, not just the driver. Prefer the standard library; when external packages are needed, keep them few and well-known.
**No async/await** unless the external system's SDK is async-only. The PySpark DataSource API is synchronous, so async adds complexity with no benefit.
### Project Setup
Create a Python project using a packaging tool such as `uv`, `poetry`, or `hatch`. Examples use `uv` (substitute your tool of choice):
```bash
uv init your-datasource
cd your-datasource
uv add pyspark pytest pytest-spark
```
```
your-datasource/
├── pyproject.toml
├── src/
│ └── your_datasource/
│ ├── __init__.py
│ └── datasource.py
└── tests/
├── conftest.py
└── test_datasource.py
```
Run all commands through the packaging tool so they execute within the correct virtual environment:
```bash
uv run pytest # Run tests
uv run ruff check src/ # Lint
uv run ruff format src/ # Format
uv build # Build wheel
```
### Key Implementation Decisions
**Partitioning Strategy** — choose based on data source characteristics:
- Time-based: for APIs with temporal data
- Token-range: for distributed databases
- ID-range: for paginated APIs
- See [partitioning-patterns.md](references/partitioning-patterns.md) for implementations of each strategy
**Authentication** — support multiple methods in priority order:
- Databricks Unity Catalog credentials
- Cloud default credentials (managed identity)
- Explicit credentials (service principal, API key, username/password)
- See [authentication-patterns.md](references/authentication-patterns.md) for patterns with fallback chains
**Type Conversion** — map between Spark and external types:
- Handle nulls, timestamps, UUIDs, collections
- See [type-conversion.md](references/type-conversion.md) for bidirectional mapping tables and helpers
**Streaming Offsets** — design for exactly-once semantics:
- JSON-serializable offset class
- Non-overlapping partition boundaries
- See [streaming-patterns.md](references/streaming-patterns.md) for offset tracking and watermark patterns
**Error Handling** — implement retries and resilience:
- Exponential backoff for transient failures (network, rate limits)
- Circuit breakers for cascading failures
- See [error-handling.md](references/error-handling.md) for retry decorators and failure classification
### Testing
```python
import pytest
from unittest.mock import patch, Mock
@pytest.fixture
def spark():
from pyspark.sql import SparkSession
return SparkSession.builder.master("local[2]").getOrCreate()
def test_data_source_name():
assert YourDataSource.name() == "your-format"
def test_writer_sends_data(spark):
with patch('requests.post') as mock_post:
mock_post.return_value = Mock(status_code=200)
df = spark.createDataFrame([(1, "test")], ["id", "value"])
df.write.format("your-format").option("url", "http://api").save()
assert mock_post.called
```
See [testing-patterns.md](references/testing-patterns.md) for unit/integration test patterns, fixtures, and running tests.
### Reference Implementations
Study these for real-world patterns:
- [cyber-spark-data-connectors](https://github.com/alexott/cyber-spark-data-connectors) — Sentinel, Splunk, REST
- [spark-cassandra-data-source](https://github.com/alexott/spark-cassandra-data-source) — Token-range partitioning
- [pyspark-hubspot](https://github.com/dgomez04/pyspark-hubspot) — REST API pagination
- [pyspark-mqtt](https://github.com/databricks-industry-solutions/python-data-sources/tree/main/mqtt) — Streaming with TLS
## Example Prompts
```
Create a Spark data source for reading from MongoDB with sharding support
Build a streaming connector for RabbitMQ with at-least-once delivery
Implement a batch writer for Snowflake with staged uploads
Write a data source for REST API with OAuth2 authentication and pagination
```
## Related
- databricks-testing: Test data sources on Databricks clusters
- databricks-spark-declarative-pipelines: Use custom sources in DLT pipelines
- python-dev: Python development best practices
## References
- [implementation-template.md](references/implementation-template.md) — Full annotated skeleton; read when starting a new data source
- [partitioning-patterns.md](references/partitioning-patterns.md) — Read when the source supports parallel reads and you need to split work across executors
- [authentication-patterns.md](references/authentication-patterns.md) — Read when the external system requires credentials or tokens
- [type-conversion.md](references/type-conversion.md) — Read when mapping between Spark types and the external system's type system
- [streaming-patterns.md](references/streaming-patterns.md) — Read when implementing `DataSourceStreamReader` or `DataSourceStreamWriter`
- [error-handling.md](references/error-handling.md) — Read when adding retry logic or handling transient failures
- [testing-patterns.md](references/testing-patterns.md) — Read when writing tests; covers unit, integration, and performance testing
- [production-patterns.md](references/production-patterns.md) — Read when hardening for production: observability, security, input validation
- [Official Databricks Documentation](https://docs.databricks.com/pyspark/datasources)
- [Apache Spark Python DataSource Tutorial](https://spark.apache.org/docs/latest/api/python/tutorial/sql/python_data_source.html)
- [awesome-python-datasources](https://github.com/allisonwang-db/awesome-python-datasources) — Directory of community implementations
More from databricks/databricks-agent-skills
- databricks-agent-bricksCreate Agent Bricks: Knowledge Assistants (KA) for document Q&A and Supervisor Agents for multi-agent orchestration (MAS).
- databricks-ai-functionsUse Databricks built-in AI Functions (ai_classify, ai_extract, ai_summarize, ai_mask, ai_translate, ai_fix_grammar, ai_gen, ai_analyze_sentiment, ai_similarity, ai_parse_document, ai_query, ai_forecast) to add AI capabilities directly to SQL and PySpark pipelines without managing model endpoints. Also covers document parsing and building custom RAG pipelines (parse → chunk → index → query).
- databricks-aibi-dashboardsCreate Databricks AI/BI dashboards. Must use when creating, updating, or deploying Lakeview dashboards as Databricks Dashboard have a unique json structure. CRITICAL: You MUST test ALL SQL queries via CLI BEFORE deploying. Follow guidelines strictly.
- databricks-appsBuild apps on Databricks Apps platform. Use when asked to create dashboards, data apps, analytics tools, or visualizations. Evaluates data access patterns (analytics vs Lakebase synced tables) before scaffolding. Invoke BEFORE starting implementation.
- databricks-apps-pythonBuilds Databricks applications. Prefers AppKit (TypeScript + React SDK) for new apps; falls back to Python frameworks (Dash, Streamlit, Gradio, Flask, FastAPI, Reflex) when Python is required. Handles OAuth authorization, app resources, SQL warehouse and Lakebase connectivity, model serving, foundation model APIs, and deployment. Use when building web apps, dashboards, ML demos, or REST APIs for Databricks, or when the user mentions AppKit, Streamlit, Dash, Gradio, Flask, FastAPI, Reflex, or Databricks app.
- databricks-coreDatabricks CLI operations: auth, profiles, data exploration, and bundles. Contains up-to-date guidelines for Databricks-related CLI tasks.
- databricks-dabsCreate, configure, validate, deploy, run, and manage DABs — Declarative Automation Bundles (formerly Databricks Asset Bundles) — for Databricks resources including dashboards, jobs, pipelines, alerts, volumes, and apps
- databricks-dbsql>-
- databricks-docsDatabricks documentation reference via llms.txt index. Use when other skills do not cover a topic, looking up unfamiliar Databricks features, or needing authoritative docs on APIs, configurations, or platform capabilities.
- databricks-execution-compute>-