---
name: ray-data
description: Scalable data processing for ML workloads. Streaming execution across CPU/GPU, supports Parquet/CSV/JSON/images. Integrates with Ray Train, PyTorch, TensorFlow. Scales from single machine to 100s of nodes. Use for batch inference, data preprocessing, multi-modal data loading, or distributed ETL pipelines.
version: 1.0.0
author: Orchestra Research
license: MIT
tags: [Data Processing, Ray Data, Distributed Computing, ML Pipelines, Batch Inference, ETL, Scalable, Ray, PyTorch, TensorFlow]
dependencies: ["ray[data]", pyarrow, pandas]
---
# Ray Data - Scalable ML Data Processing
Distributed data processing library for ML and AI workloads.
## When to use Ray Data
**Use Ray Data when:**
- Processing large datasets (>100 GB) for ML training
- Distributing data preprocessing across a cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from a laptop to a cluster
**Key features**:
- **Streaming execution**: Process data larger than memory
- **GPU support**: Accelerate transforms with GPUs
- **Framework integration**: PyTorch, TensorFlow, HuggingFace
- **Multi-modal**: Images, Parquet, CSV, JSON, audio, video
**Use alternatives instead**:
- **Pandas**: Small data (<1 GB) on a single machine
- **Dask**: Tabular data, SQL-like operations
- **Spark**: Enterprise ETL, SQL queries
## Quick start
### Installation
```bash
pip install -U 'ray[data]'
```
### Load and transform data
```python
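import pandas as pd
import ray
# Read Parquet files (pass a directory, a single file, or a list of files)
ds = ray.data.read_parquet("s3://bucket/data/")
# Transform data (lazy: nothing runs until the dataset is consumed)
ds = ds.map_batches(
    lambda df: pd.DataFrame({"processed": df["text"].str.lower()}),
    batch_format="pandas",  # the .str accessor needs pandas batches
)
# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)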
```
### Integration with Ray Train
```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/")
def train_func(config):
    # Each training worker gets its own shard of the "train" dataset
    train_shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_shard.iter_batches(batch_size=32):
            # Train on batch
            pass
# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```
## Reading data
### From cloud storage
```python
import ray
# Parquet (recommended for ML); pass a directory, a single file, or a list of files
ds = ray.data.read_parquet("s3://bucket/parquet/")
# CSV
ds = ray.data.read_csv("s3://bucket/csv/")
# JSON
ds = ray.data.read_json("gs://bucket/json/")
# Images
ds = ray.data.read_images("s3://bucket/images/")
```
### From Python objects
```python
import pandas as pd
import ray
# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# From range: synthetic data with a single "id" column
ds = ray.data.range(1000000)
# From pandas
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```
## Transformations
### Map batches (vectorized)
```python
# Batch transformation (fast): by default, batch is a dict of NumPy arrays
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch
ds = ds.map_batches(process_batch, batch_size=1000)
```
### Row transformations
```python
# Row-by-row (slower): row is a dict
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row
ds = ds.map(process_row)
```
### Filter
```python
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
```
### Group by and aggregate
```python
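import numpy as np
# Group by column and count rows per group
counts = ds.groupby("category").count()
# Custom aggregation: each group arrives as a dict of NumPy arrays
def sum_values(group):
    # Return one-element arrays so each group yields a single output row
    return {"category": group["category"][:1], "sum": np.array([group["value"].sum()])}
sums = ds.groupby("category").map_groups(sum_values)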
```
## GPU-accelerated transforms
```python
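# Use the GPU for preprocessing (assumes equally sized images so a batch stacks into one array)
def preprocess_images_gpu(batch):
    import torch
    # Move the uint8 image batch to the GPU
    images = torch.as_tensor(batch["image"]).cuda()
    # GPU preprocessing: scale pixel values to [0, 1]
    processed = images.float() / 255.0
    return {"processed": processed.cpu().numpy()}
ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request one GPU per task
)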
```
## Writing data
```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")
# Write to CSV
ds.write_csv("output/")
# Write to JSON
ds.write_json("output/")
```
## Performance optimization
### Repartition
```python
# Control parallelism
ds = ds.repartition(100) # 100 blocks for 100-core cluster
```
### Batch size tuning
```python
# Larger batches speed up vectorized ops but use more memory per task
ds = ds.map_batches(process_fn, batch_size=10000)  # vs. batch_size=100
```
### Streaming execution
```python
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # process() is your own function; blocks are streamed, not all loaded into memory
```
## Common patterns
### Batch inference
```python
import ray
# Load model
def load_model():
    # Called once per actor, not once per batch
    return MyModel()  # MyModel is a placeholder for your own model class
# Inference function
class BatchInference:
    def __init__(self):
        self.model = load_model()
    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}
# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,
    concurrency=4,  # number of model replicas (actors); required for callable classes
)
predictions.write_parquet("s3://output/")
```
### Data preprocessing pipeline
```python
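# Multi-step pipeline (clean_data, tokenize, augment are your own batch functions)
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
)
# write_parquet() executes the pipeline and returns None, so don't assign its result
ds.write_parquet("s3://processed/")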
```
## Integration with ML frameworks
### PyTorch
```python
# Iterate over batches as PyTorch tensors
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict mapping column names to torch.Tensor
    inputs, labels = batch["features"], batch["label"]
```
### TensorFlow
```python
# Convert to a tf.data.Dataset that yields (features, labels) pairs
tf_ds = ds.to_tf(feature_columns=["image"], label_columns="label", batch_size=32)
for features, labels in tf_ds:
    # Train model
    pass
```
## Supported data formats
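| Format | Read | Write | Use case |
|--------|------|-------|----------|
| Parquet | ✅ `read_parquet` | ✅ `write_parquet` | ML data (recommended) |
| CSV | ✅ `read_csv` | ✅ `write_csv` | Tabular data |
| JSON | ✅ `read_json` | ✅ `write_json` | Semi-structured |
| Images | ✅ `read_images` | ✅ `write_images` | Computer vision |
| NumPy | ✅ `read_numpy` | ✅ `write_numpy` | Arrays |
| Pandas | ✅ `from_pandas` | ✅ `to_pandas` | In-memory DataFrames |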
## Performance benchmarks
**Scaling** (processing 100 GB of data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes
**GPU acceleration** (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec
## Use cases
**Production deployments**:
- **Pinterest**: Last-mile data processing for model training
- **ByteDance**: Scaling offline inference with multi-modal LLMs
- **Spotify**: ML platform for batch inference
## References
- **[Transformations Guide](references/transformations.md)** - Map, filter, groupby operations
- **[Integration Guide](references/integration.md)** - Ray Train, PyTorch, TensorFlow
## Resources
- **Docs**: https://docs.ray.io/en/latest/data/data.html
- **GitHub**: https://github.com/ray-project/ray ⭐ 36,000+
- **Version**: Ray 2.40.0+
- **Examples**: https://docs.ray.io/en/latest/data/examples/overview.html