Python asyncio and concurrent programming patterns for high-performance applications. Use when building async APIs, concurrent systems, or I/O-bound applications requiring non-blocking operations.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Expert guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
Foundation for all async operations:
import asyncio

async def fetch_data(url: str) -> dict:
    """Fetch data asynchronously."""
    await asyncio.sleep(1)  # Simulate I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())
Key concepts:
- async def defines coroutines (pausable functions)
- await yields control back to the event loop
- asyncio.run() is the entry point (Python 3.7+)

Execute multiple operations simultaneously:
import asyncio
from typing import List

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """Fetch multiple users concurrently."""
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

# Speed: Sequential = 5s, Concurrent = 0.5s for 10 users
When to use:
- Multiple independent I/O operations that can run at the same time
- Pass return_exceptions=True to handle partial failures

Background tasks that run independently:
import asyncio

async def background_task(name: str, delay: int):
    print(f"{name} started")
    await asyncio.sleep(delay)
    return f"Result from {name}"

async def main():
    # Create tasks (starts execution immediately)
    task1 = asyncio.create_task(background_task("Task 1", 2))
    task2 = asyncio.create_task(background_task("Task 2", 1))
    # Do other work while tasks run
    print("Doing other work")
    await asyncio.sleep(0.5)
    # Wait for results when needed
    result1, result2 = await task1, await task2
Differences:
- await coroutine() - waits immediately for the result
- asyncio.create_task() - starts background execution
- task.cancel() - cancels a running task (see the sketch below)
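A minimal cancellation sketch, assuming a long-running task that is no longer needed:

import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(60))
    await asyncio.sleep(1)
    task.cancel()  # Request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

Robust error handling for concurrent operations: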
import asyncio
from typing import List, Optional

async def safe_operation(item_id: int) -> Optional[dict]:
    try:
        await asyncio.sleep(0.1)
        if item_id % 3 == 0:
            raise ValueError(f"Item {item_id} failed")
        return {"id": item_id, "status": "success"}
    except ValueError as e:
        print(f"Error: {e}")
        return None

async def process_items(item_ids: List[int]):
    # gather with return_exceptions=True continues on errors
    results = await asyncio.gather(
        *[safe_operation(iid) for iid in item_ids],
        return_exceptions=True
    )
    successful = [r for r in results if r and not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    print(f"Success: {len(successful)}, Failed: {len(failed)}")
    return successful
Prevent operations from hanging indefinitely:
import asyncio

async def slow_operation(delay: int) -> str:
    await asyncio.sleep(delay)  # Stand-in for a slow call
    return "done"

async def with_timeout():
    try:
        result = await asyncio.wait_for(
            slow_operation(5),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")
        # Handle timeout (retry, fallback, etc.)
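On Python 3.11+, asyncio.timeout() offers a context-manager alternative to wait_for(); a minimal sketch reusing slow_operation from above:

import asyncio

async def with_timeout_ctx():
    try:
        # Everything inside the block must finish within 2 seconds
        async with asyncio.timeout(2.0):
            await slow_operation(5)
    except TimeoutError:  # asyncio.timeout raises the builtin TimeoutError
        print("Operation timed out")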
Proper resource management with async operations:
import asyncio

class AsyncDatabaseConnection:
    """Async database connection with automatic cleanup."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.connection = None

    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = {"dsn": self.dsn, "connected": True}
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None

async def perform_query(conn: dict) -> list:
    await asyncio.sleep(0.1)  # Simulate a query
    return []

async def query_database():
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        # Connection automatically closed on exit
        return await perform_query(conn)
Use cases:
- Database connections and pools
- HTTP client sessions
- File handles and locks that need guaranteed cleanup
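For simple cases, contextlib.asynccontextmanager avoids writing the full protocol; a minimal sketch of the same connection idea:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection(dsn: str):
    print("Opening connection")
    await asyncio.sleep(0.1)  # Simulate connection
    try:
        yield {"dsn": dsn, "connected": True}
    finally:
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simulate cleanup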
Stream data asynchronously:
import asyncio
from typing import AsyncIterator

async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
    """Fetch paginated data lazily."""
    for page in range(1, max_pages + 1):
        await asyncio.sleep(0.2)  # API call
        yield {
            "page": page,
            "url": f"{url}?page={page}",
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def process_stream():
    async for page_data in fetch_pages("https://api.example.com", 10):
        # Process each page as it arrives (memory efficient)
        print(f"Processing page {page_data['page']}")
Benefits:
- Constant memory use: pages are produced one at a time, never all at once
- Results can be processed as soon as they arrive
- Natural backpressure: the producer only runs when the consumer asks for more
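Async generators also compose with async comprehensions; a small sketch reusing fetch_pages from above:

async def collect_pages() -> list:
    # Consumes the async generator and materializes the results
    return [page async for page in fetch_pages("https://api.example.com", 3)]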
Coordinate work between producers and consumers:
import asyncio
from asyncio import Queue

async def producer(queue: Queue, producer_id: int, num_items: int):
    for i in range(num_items):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal completion

async def consumer(queue: Queue, consumer_id: int):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Consumer {consumer_id} processing: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def run_pipeline():
    queue = Queue(maxsize=10)
    # 2 producers, 3 consumers
    producers = [asyncio.create_task(producer(queue, i, 5)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    await asyncio.gather(*producers)
    await queue.join()  # Wait for all items processed
    # Only 2 sentinels were queued for 3 consumers; cancel any consumer still waiting
    for c in consumers:
        c.cancel()
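On Python 3.11+, asyncio.TaskGroup provides structured concurrency: tasks are awaited on block exit, and one failure cancels the rest. A minimal sketch:

import asyncio

async def run_group():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(asyncio.sleep(1, result="first"))
        t2 = tg.create_task(asyncio.sleep(2, result="second"))
    # All tasks are complete once the block exits
    print(t1.result(), t2.result())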
Control concurrent operations:
import asyncio
from typing import List

async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:  # Only N operations at once
        print(f"Calling {url}")
        await asyncio.sleep(0.5)
        return {"url": url, "status": 200}

async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    return await asyncio.gather(*tasks)

# Limits to 5 concurrent requests regardless of total URLs
Use cases:
- Respecting API rate limits
- Capping database or network connection use
- Protecting downstream services from load spikes

Protect shared state across tasks (note: asyncio primitives are not thread-safe; they synchronize tasks within a single event loop):
import asyncio

class AsyncCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1

    async def get_value(self) -> int:
        async with self.lock:
            return self.value
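Beyond Lock, asyncio.Event lets one task signal others (see the primitives list below); a minimal sketch:

import asyncio

async def waiter(event: asyncio.Event):
    print("Waiting for signal")
    await event.wait()  # Suspends until event.set() is called
    print("Signal received")

async def signal_demo():
    event = asyncio.Event()
    task = asyncio.create_task(waiter(event))
    await asyncio.sleep(1)
    event.set()  # Wake all waiters
    await task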
Synchronization primitives:
- Lock: Mutual exclusion
- Event: Signal between tasks
- Condition: Wait for a condition
- Semaphore: Limit concurrent access

Reuse connections for efficiency:
import asyncio
import aiohttp

async def fetch_item(session: aiohttp.ClientSession, url: str) -> dict:
    # Read the body inside the context so the connection returns to the pool
    async with session.get(url) as resp:
        return await resp.json()

async def with_connection_pool():
    # At most 100 connections total, 10 per host
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_item(session, f"https://api.example.com/item/{i}")
                 for i in range(50)]
        return await asyncio.gather(*tasks)
Run blocking or CPU-intensive work in an executor:

import asyncio
import concurrent.futures

def blocking_operation(data):
    """Blocking operation (for truly CPU-bound work, prefer ProcessPoolExecutor)."""
    import time
    time.sleep(1)  # Simulates a blocking call
    return data * 2

async def run_in_executor(data):
    loop = asyncio.get_running_loop()  # get_event_loop() is deprecated in coroutines
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_operation, data)
    return result
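On Python 3.9+, asyncio.to_thread() is a shorthand for the same thread-pool hand-off:

import asyncio

async def run_in_thread(data):
    # Runs blocking_operation (defined above) in the default thread pool
    return await asyncio.to_thread(blocking_operation, data)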
Common blockers to avoid:
- time.sleep() - use asyncio.sleep()
- Blocking file I/O - use aiofiles (sketch below)
- Blocking HTTP clients such as requests - use aiohttp or httpx
- Other blocking calls - wrap them in loop.run_in_executor()
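A minimal aiofiles sketch for non-blocking file reads:

import aiofiles

async def read_file(path: str) -> str:
    # aiofiles delegates to a thread pool so the event loop is not blocked
    async with aiofiles.open(path) as f:
        return await f.read()

Process in chunks to control memory: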
import asyncio
from typing import List

async def batch_process(items: List[str], batch_size: int = 10):
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # process_item is assumed to be an async function defined elsewhere
        results = await asyncio.gather(*[process_item(item) for item in batch])
        print(f"Processed batch {i // batch_size + 1}")
Common pitfalls. Forgetting to await a coroutine:

# Wrong - returns a coroutine object, nothing executes
result = async_function()

# Correct
result = await async_function()
# Wrong - blocks the entire event loop
import time

async def bad():
    time.sleep(1)

# Correct
async def good():
    await asyncio.sleep(1)
Handle cancellation cleanly so resources are released:

async def cancelable_task():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # Cleanup resources
        raise  # Re-raise to propagate
# Wrong
def sync_function():
    result = await async_function()  # SyntaxError: 'await' outside async function

# Correct
def sync_function():
    result = asyncio.run(async_function())
Use pytest-asyncio for testing:

import asyncio
import pytest

@pytest.mark.asyncio
async def test_async_function():
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_with_timeout():
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(5), timeout=1.0)
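With pytest-asyncio's auto mode the marker can be omitted; a minimal pytest.ini, assuming a recent pytest-asyncio:

[pytest]
asyncio_mode = auto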
Key practices:
- asyncio.run() for the entry point (Python 3.7+)
- await coroutines to execute them
- gather() for concurrent execution of independent tasks
- return_exceptions=True to continue past partial failures