python-background-jobs
$
npx mdskill add wshobson/agents/python-background-jobsExecute long-running tasks asynchronously without blocking requests.
- Handles email, reports, uploads, and external service integrations.
- Depends on Celery, Redis, or cloud-native task queues.
- Decides execution based on task duration and reliability needs.
- Delivers results via job IDs and state tracking mechanisms.
SKILL.md
.github/skills/python-background-jobsView on GitHub ↗
---
name: python-background-jobs
description: Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
---
# Python Background Jobs & Task Queues
Decouple long-running or unreliable work from request/response cycles. Return immediately to the user while background workers handle the heavy lifting asynchronously.
## When to Use This Skill
- Processing tasks that take longer than a few seconds
- Sending emails, notifications, or webhooks
- Generating reports or exporting data
- Processing uploads or media transformations
- Integrating with unreliable external services
- Building event-driven architectures
## Core Concepts
### 1. Task Queue Pattern
API accepts request, enqueues a job, returns immediately with a job ID. Workers process jobs asynchronously.
### 2. Idempotency
Tasks may be retried on failure. Design for safe re-execution.
### 3. Job State Machine
Jobs transition through states: pending → running → succeeded/failed.
### 4. At-Least-Once Delivery
Most queues guarantee at-least-once delivery. Your code must handle duplicates.
## Quick Start
This skill uses Celery for examples, a widely adopted task queue. Alternatives like RQ, Dramatiq, and cloud-native solutions (AWS SQS, GCP Tasks) are equally valid choices.
```python
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def send_email(to: str, subject: str, body: str) -> None:
# This runs in a background worker
email_client.send(to, subject, body)
# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
```
## Fundamental Patterns
### Pattern 1: Return Job ID Immediately
For operations exceeding a few seconds, return a job ID and process asynchronously.
```python
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
@dataclass
class Job:
id: str
status: JobStatus
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
result: dict | None = None
error: str | None = None
# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
"""Start export job and return job ID."""
job_id = str(uuid4())
# Persist job record
await jobs_repo.create(Job(
id=job_id,
status=JobStatus.PENDING,
created_at=datetime.utcnow(),
))
# Enqueue task for background processing
await task_queue.enqueue(
"export_data",
job_id=job_id,
params=request.model_dump(),
)
# Return immediately with job ID
return JobResponse(
job_id=job_id,
status="pending",
poll_url=f"/jobs/{job_id}",
)
```
### Pattern 2: Celery Task Configuration
Configure Celery tasks with proper retry and timeout settings.
```python
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
# Global configuration
app.conf.update(
task_time_limit=3600, # Hard limit: 1 hour
task_soft_time_limit=3000, # Soft limit: 50 minutes
task_acks_late=True, # Acknowledge after completion
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1, # Don't prefetch too many tasks
)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
"""Process payment with automatic retry on transient errors."""
try:
result = payment_gateway.charge(payment_id)
return {"status": "success", "transaction_id": result.id}
except PaymentDeclinedError as e:
# Don't retry permanent failures
return {"status": "declined", "reason": str(e)}
except TransientError as e:
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
```
### Pattern 3: Make Tasks Idempotent
Workers may retry on crash or timeout. Design for safe re-execution.
```python
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
"""Process order idempotently."""
order = orders_repo.get(order_id)
# Already processed? Return early
if order.status == OrderStatus.COMPLETED:
logger.info("Order already processed", order_id=order_id)
return
# Already in progress? Check if we should continue
if order.status == OrderStatus.PROCESSING:
# Use idempotency key to avoid double-charging
pass
# Process with idempotency key
result = payment_provider.charge(
amount=order.total,
idempotency_key=f"order-{order_id}", # Critical!
)
orders_repo.update(order_id, status=OrderStatus.COMPLETED)
```
**Idempotency Strategies:**
1. **Check-before-write**: Verify state before action
2. **Idempotency keys**: Use unique tokens with external services
3. **Upsert patterns**: `INSERT ... ON CONFLICT UPDATE`
4. **Deduplication window**: Track processed IDs for N hours
### Pattern 4: Job State Management
Persist job state transitions for visibility and debugging.
```python
class JobRepository:
"""Repository for managing job state."""
async def create(self, job: Job) -> Job:
"""Create new job record."""
await self._db.execute(
"""INSERT INTO jobs (id, status, created_at)
VALUES ($1, $2, $3)""",
job.id, job.status.value, job.created_at,
)
return job
async def update_status(
self,
job_id: str,
status: JobStatus,
**fields,
) -> None:
"""Update job status with timestamp."""
updates = {"status": status.value, **fields}
if status == JobStatus.RUNNING:
updates["started_at"] = datetime.utcnow()
elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
updates["completed_at"] = datetime.utcnow()
await self._db.execute(
"UPDATE jobs SET status = $1, ... WHERE id = $2",
updates, job_id,
)
logger.info(
"Job status updated",
job_id=job_id,
status=status.value,
)
```
## Detailed worked examples and patterns
Detailed sections (starting with `## Advanced Patterns`) live in `references/details.md`. Read that file when the navigation summary above is insufficient.
## Best Practices Summary
1. **Return immediately** - Don't block requests for long operations
2. **Persist job state** - Enable status polling and debugging
3. **Make tasks idempotent** - Safe to retry on any failure
4. **Use idempotency keys** - For external service calls
5. **Set timeouts** - Both soft and hard limits
6. **Implement DLQ** - Capture permanently failed tasks
7. **Log transitions** - Track job state changes
8. **Retry appropriately** - Exponential backoff for transient errors
9. **Don't retry permanent failures** - Validation errors, invalid credentials
10. **Monitor queue depth** - Alert on backlog growth
More from wshobson/agents
- accessibility-complianceImplement WCAG 2.2 compliant interfaces with mobile accessibility, inclusive design patterns, and assistive technology support. Use when auditing accessibility, implementing ARIA patterns, building for screen readers, or ensuring inclusive user experiences.
- airflow-dag-patternsBuild production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.
- angular-migrationMigrate from AngularJS to Angular using hybrid mode, incremental component rewriting, and dependency injection updates. Use when upgrading AngularJS applications, planning framework migrations, or modernizing legacy Angular code.
- anti-reversing-techniquesUnderstand anti-reversing, obfuscation, and protection techniques encountered during software analysis. Use this skill when analyzing malware evasion techniques, when implementing anti-debugging protections for CTF challenges, when reverse engineering packed binaries, or when building security research tools that need to detect virtualized environments.
- api-design-principlesMaster REST and GraphQL API design principles to build intuitive, scalable, and maintainable APIs that delight developers. Use when designing new APIs, reviewing API specifications, or establishing API design standards.
- architecture-decision-recordsWrite and maintain Architecture Decision Records (ADRs) following best practices for technical decision documentation. Use when documenting significant technical decisions, reviewing past architectural choices, or establishing decision processes.
- architecture-patternsImplement proven backend architecture patterns including Clean Architecture, Hexagonal Architecture, and Domain-Driven Design. Use this skill when designing clean architecture for a new microservice, when refactoring a monolith to use bounded contexts, when implementing hexagonal or onion architecture patterns, or when debugging dependency cycles between application layers.
- async-python-patternsMaster Python asyncio, concurrent programming, and async/await patterns for high-performance applications. Use when building async APIs, concurrent systems, or I/O-bound applications requiring non-blocking operations.
- attack-tree-constructionBuild comprehensive attack trees to visualize threat paths. Use when mapping attack scenarios, identifying defense gaps, or communicating security risks to stakeholders.
- auth-implementation-patternsMaster authentication and authorization patterns including JWT, OAuth2, session management, and RBAC to build secure, scalable access control systems. Use when implementing auth systems, securing APIs, or debugging security issues.