python-async-patterns
$
npx mdskill add TheBushidoCollective/han/python-async-patternsExecute non-blocking I/O and manage concurrent tasks efficiently.
- Handles I/O-bound workloads requiring simultaneous network requests.
- Integrates with Bash and Read tools for system-level operations.
- Decides execution flow by leveraging asyncio.gather and create_task.
- Delivers results through structured async functions and event loops.
SKILL.md
.github/skills/python-async-patternsView on GitHub ↗
---
name: python-async-patterns
user-invocable: false
description: Master Python asynchronous programming with asyncio, async/await,
and concurrent.futures. Use for async code and concurrency patterns.
allowed-tools:
- Bash
- Read
---
# Python Async Patterns
Master asynchronous programming in Python using asyncio, async/await syntax,
and concurrent execution patterns for I/O-bound and CPU-bound tasks.
## Basic Async/Await
**Core async syntax:**
```python
import asyncio
# Define async function with async def
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(1) # Simulate I/O operation
return f"Data from {url}"
# Call async function
async def main() -> None:
result = await fetch_data("https://api.example.com")
print(result)
# Run async function
asyncio.run(main())
```
**Multiple concurrent operations:**
```python
import asyncio
async def fetch_url(url: str) -> str:
await asyncio.sleep(1)
return f"Content from {url}"
async def main() -> None:
# Run concurrently with gather
results = await asyncio.gather(
fetch_url("https://example.com/1"),
fetch_url("https://example.com/2"),
fetch_url("https://example.com/3")
)
for result in results:
print(result)
asyncio.run(main())
```
## asyncio.create_task
**Creating and managing tasks:**
```python
import asyncio
async def process_item(item: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Processed {item}"
async def main() -> None:
# Create tasks for concurrent execution
task1 = asyncio.create_task(process_item("A", 2.0))
task2 = asyncio.create_task(process_item("B", 1.0))
task3 = asyncio.create_task(process_item("C", 1.5))
# Do other work while tasks run
print("Tasks started")
# Wait for tasks to complete
result1 = await task1
result2 = await task2
result3 = await task3
print(result1, result2, result3)
asyncio.run(main())
```
**Task with name and context:**
```python
import asyncio
async def background_task(name: str) -> None:
print(f"Task {name} starting")
await asyncio.sleep(2)
print(f"Task {name} completed")
async def main() -> None:
# Create named task
task = asyncio.create_task(
background_task("worker"),
name="background-worker"
)
# Check task status
print(f"Task name: {task.get_name()}")
print(f"Task done: {task.done()}")
await task
asyncio.run(main())
```
## asyncio.gather vs asyncio.wait
**Using gather for results:**
```python
import asyncio
async def fetch(n: int) -> int:
await asyncio.sleep(1)
return n * 2
async def main() -> None:
# gather returns results in order
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3)
)
print(results) # [2, 4, 6]
# Return exceptions instead of raising
results = await asyncio.gather(
fetch(1),
fetch(2),
return_exceptions=True
)
asyncio.run(main())
```
**Using wait for more control:**
```python
import asyncio
async def worker(n: int) -> int:
await asyncio.sleep(n)
return n
async def main() -> None:
tasks = [
asyncio.create_task(worker(1)),
asyncio.create_task(worker(2)),
asyncio.create_task(worker(3))
]
# Wait for first task to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"Done: {len(done)}, Pending: {len(pending)}")
# Cancel pending tasks
for task in pending:
task.cancel()
# Wait for all with timeout
done, pending = await asyncio.wait(
tasks,
timeout=2.0
)
asyncio.run(main())
```
## Async Context Managers
**Creating async context managers:**
```python
import asyncio
from typing import AsyncIterator
class AsyncResource:
async def __aenter__(self) -> "AsyncResource":
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
print("Releasing resource")
await asyncio.sleep(0.1)
async def query(self) -> str:
return "data"
async def main() -> None:
async with AsyncResource() as resource:
result = await resource.query()
print(result)
asyncio.run(main())
```
**Using asynccontextmanager decorator:**
```python
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
# Setup
print(f"Connecting to {url}")
await asyncio.sleep(0.1)
conn = f"connection-{url}"
try:
yield conn
finally:
# Teardown
print(f"Closing connection to {url}")
await asyncio.sleep(0.1)
async def main() -> None:
async with get_connection("localhost") as conn:
print(f"Using {conn}")
asyncio.run(main())
```
## Async Iterators
**Creating async iterators:**
```python
import asyncio
from typing import AsyncIterator
class AsyncRange:
def __init__(self, count: int) -> None:
self.count = count
def __aiter__(self) -> AsyncIterator[int]:
return self
async def __anext__(self) -> int:
if self.count <= 0:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.count -= 1
return self.count
async def main() -> None:
async for i in AsyncRange(5):
print(i)
asyncio.run(main())
```
**Async generator functions:**
```python
import asyncio
from typing import AsyncIterator
async def async_range(count: int) -> AsyncIterator[int]:
for i in range(count):
await asyncio.sleep(0.1)
yield i
async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
for url in urls:
await asyncio.sleep(0.5)
yield f"Page content from {url}"
async def main() -> None:
async for num in async_range(5):
print(num)
urls = ["url1", "url2", "url3"]
async for page in fetch_pages(urls):
print(page)
asyncio.run(main())
```
## Async Queues
**Producer-consumer pattern with Queue:**
```python
import asyncio
from asyncio import Queue
async def producer(queue: Queue[int], n: int) -> None:
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(i)
print(f"Produced {i}")
await queue.put(None) # Sentinel value
async def consumer(queue: Queue[int], name: str) -> None:
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.2)
print(f"Consumer {name} processed {item}")
queue.task_done()
async def main() -> None:
queue: Queue[int] = Queue(maxsize=5)
# Start producer and consumers
prod = asyncio.create_task(producer(queue, 10))
cons1 = asyncio.create_task(consumer(queue, "A"))
cons2 = asyncio.create_task(consumer(queue, "B"))
await prod
await queue.join() # Wait for all tasks to be processed
# Signal consumers to exit
await queue.put(None)
await queue.put(None)
await cons1
await cons2
asyncio.run(main())
```
## Semaphore and Lock
**Limiting concurrent operations:**
```python
import asyncio
async def fetch_with_limit(
url: str,
semaphore: asyncio.Semaphore
) -> str:
async with semaphore:
print(f"Fetching {url}")
await asyncio.sleep(1)
return f"Data from {url}"
async def main() -> None:
# Limit to 3 concurrent operations
semaphore = asyncio.Semaphore(3)
urls = [f"https://example.com/{i}" for i in range(10)]
tasks = [
fetch_with_limit(url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
asyncio.run(main())
```
**Using Lock for mutual exclusion:**
```python
import asyncio
class Counter:
def __init__(self) -> None:
self.value = 0
self.lock = asyncio.Lock()
async def increment(self) -> None:
async with self.lock:
# Critical section
current = self.value
await asyncio.sleep(0.01)
self.value = current + 1
async def main() -> None:
counter = Counter()
# Run increments concurrently
await asyncio.gather(*[
counter.increment()
for _ in range(100)
])
print(f"Final value: {counter.value}") # Should be 100
asyncio.run(main())
```
## Timeouts and Cancellation
**Using timeout:**
```python
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(5)
return "completed"
async def main() -> None:
# Timeout after 2 seconds
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=2.0
)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())
```
**Handling cancellation:**
```python
import asyncio
async def cancellable_task() -> None:
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise
async def main() -> None:
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancellation confirmed")
asyncio.run(main())
```
## Event Loop Management
**Direct event loop control:**
```python
import asyncio
async def task1() -> None:
print("Task 1")
async def task2() -> None:
print("Task 2")
# Create new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Schedule callbacks
loop.call_soon(lambda: print("Callback"))
# Schedule delayed callback
loop.call_later(1.0, lambda: print("Delayed"))
# Run coroutine
loop.run_until_complete(task1())
# Run multiple tasks
loop.run_until_complete(
asyncio.gather(task1(), task2())
)
finally:
loop.close()
```
## concurrent.futures
**ThreadPoolExecutor for I/O-bound tasks:**
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io_task(n: int) -> int:
print(f"Task {n} starting")
time.sleep(2) # Blocking I/O
return n * 2
async def main() -> None:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=3) as executor:
# Run blocking function in thread pool
tasks = [
loop.run_in_executor(executor, blocking_io_task, i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
```
**ProcessPoolExecutor for CPU-bound tasks:**
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive_task(n: int) -> int:
# CPU-intensive computation
result = sum(i * i for i in range(n))
return result
async def main() -> None:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as executor:
# Run CPU-bound function in process pool
tasks = [
loop.run_in_executor(
executor,
cpu_intensive_task,
10_000_000
)
for _ in range(4)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} tasks")
asyncio.run(main())
```
## Async HTTP with aiohttp
**Making async HTTP requests:**
```python
import asyncio
import aiohttp
async def fetch_url(
session: aiohttp.ClientSession,
url: str
) -> str:
async with session.get(url) as response:
return await response.text()
async def main() -> None:
async with aiohttp.ClientSession() as session:
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3"
]
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} pages")
asyncio.run(main())
```
## Error Handling
**Handling exceptions in async code:**
```python
import asyncio
async def failing_task() -> None:
await asyncio.sleep(1)
raise ValueError("Task failed")
async def main() -> None:
# Handle exception in single task
try:
await failing_task()
except ValueError as e:
print(f"Caught: {e}")
# Handle exceptions with gather
results = await asyncio.gather(
failing_task(),
failing_task(),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
asyncio.run(main())
```
## When to Use This Skill
Use python-async-patterns when you need to:
- Handle multiple I/O operations concurrently (API calls, database queries)
- Build async web servers or clients
- Process data streams asynchronously
- Implement producer-consumer patterns with async queues
- Run blocking I/O operations without blocking the event loop
- Create async context managers for resource management
- Implement async iterators for streaming data
- Control concurrency with semaphores and locks
- Handle timeouts and cancellation in async operations
- Mix CPU-bound and I/O-bound operations efficiently
## Best Practices
- Use asyncio.run() for the main entry point
- Create tasks with asyncio.create_task() for concurrent execution
- Use gather() when you need all results
- Use wait() when you need fine-grained control
- Always handle CancelledError in long-running tasks
- Use semaphores to limit concurrent operations
- Prefer async context managers for resource management
- Use asyncio.Queue for producer-consumer patterns
- Run blocking I/O in thread pool with run_in_executor()
- Run CPU-bound tasks in process pool
- Set appropriate timeouts for network operations
- Use structured concurrency patterns (nurseries)
## Common Pitfalls
- Forgetting to await coroutines (creates coroutine object, doesn't run)
- Blocking the event loop with CPU-intensive work
- Not handling task cancellation properly
- Using time.sleep() instead of asyncio.sleep()
- Creating too many concurrent tasks without limits
- Not closing resources properly in async context
- Mixing blocking and async code incorrectly
- Not handling exceptions in background tasks
- Forgetting to call task_done() with Queue
- Using global event loop instead of asyncio.run()
## Resources
- [asyncio Documentation](https://docs.python.org/3/library/asyncio.html)
- [aiohttp Documentation](https://docs.aiohttp.org/)
- [PEP 492 - Coroutines](https://peps.python.org/pep-0492/)
- [PEP 525 - Async Generators](https://peps.python.org/pep-0525/)
- [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)
More from TheBushidoCollective/han
- absinthe-resolversUse when implementing GraphQL resolvers with Absinthe. Covers resolver patterns, dataloader integration, batching, and error handling.
- absinthe-schemaUse when designing GraphQL schemas with Absinthe. Covers type definitions, interfaces, unions, enums, and schema organization patterns.
- absinthe-subscriptionsUse when implementing real-time GraphQL subscriptions with Absinthe. Covers Phoenix channels, PubSub, and subscription patterns.
- act-docker-setupUse when configuring Docker environments for act, selecting runner images, managing container resources, or troubleshooting Docker-related issues with local GitHub Actions testing.
- act-local-testingUse when testing GitHub Actions workflows locally with act. Covers act CLI usage, Docker configuration, debugging workflows, and troubleshooting common issues when running workflows on your local machine.
- act-workflow-syntaxUse when creating or modifying GitHub Actions workflow files. Provides guidance on workflow syntax, triggers, jobs, steps, and expressions for creating valid GitHub Actions workflows that can be tested locally with act.
- ameba-configurationUse when configuring Ameba rules and settings for Crystal projects including .ameba.yml setup, rule management, severity levels, and code quality enforcement.
- ameba-custom-rulesUse when creating custom Ameba rules for Crystal code analysis including rule development, AST traversal, issue reporting, and rule testing.
- ameba-integrationUse when integrating Ameba into development workflows including CI/CD pipelines, pre-commit hooks, GitHub Actions, and automated code review processes.
- analyze-performanceAnalyze performance metrics and identify slow transactions in Sentry