Production-ready Celery task templates with error handling, retries, rate limiting, time limits, and custom task classes. Use when creating Celery tasks, implementing retry logic, adding rate limiting, setting time limits, building custom task classes, validating task inputs with Pydantic, handling database operations, making API calls, or when user mentions task patterns, retry mechanisms, task templates, error handling, task best practices.
Additional assets for this skill
examples/custom-task-classes.md
examples/rate-limiting.md
examples/task-with-retries.md
scripts/generate-task.sh
scripts/test-task.sh
scripts/validate-task.sh
templates/api-task.py
templates/basic-task.py
templates/custom-task-class.py
templates/database-task.py
templates/pydantic-validation.py
templates/rate-limited-task.py
templates/retry-task.py
templates/time-limited-task.py
Production-ready Celery task templates with comprehensive error handling, retry mechanisms, rate limiting, and custom behavior patterns.
This skill provides battle-tested templates and patterns for building robust Celery tasks. Each template demonstrates production-ready code with proper error handling, logging, retry logic, and security best practices.
Template: templates/basic-task.py
Simple task with standard error handling and logging.
Use when:
Features:
Template: templates/retry-task.py
Tasks with automatic retry mechanisms and exponential backoff.
Use when:
Features:
autoretry_for
Example: See examples/task-with-retries.md for a complete implementation guide
Template: templates/rate-limited-task.py
Tasks with rate limiting to control execution speed.
Use when:
Features:
Example: See examples/rate-limiting.md for complete guide
Template: templates/time-limited-task.py
Tasks with soft and hard time limits to prevent runaway execution.
Use when:
Features:
Template: templates/custom-task-class.py
Custom task base classes with specialized behavior.
Use when:
Features:
Example: See examples/custom-task-classes.md for comprehensive guide
Template: templates/pydantic-validation.py
Type-safe tasks with Pydantic model validation.
Use when:
Features:
Template: templates/database-task.py
Best practices for database operations in tasks.
Use when:
Features:
Template: templates/api-task.py
Best practices for external API calls in tasks.
Use when:
Features:
Usage: ./scripts/generate-task.sh <template_name> <output_file> [task_name]
Generate a new Celery task from a template, optionally setting the task name.
Available templates:
Example:
./scripts/generate-task.sh retry-task tasks/my_api_call.py fetch_data
Usage: ./scripts/test-task.sh <task_file.py> [task_name]
Test Celery task by:
Example:
./scripts/test-task.sh templates/retry-task.py fetch_api_data
Usage: ./scripts/validate-task.sh <task_file.py>
Comprehensive validation including:
Example:
./scripts/validate-task.sh templates/api-task.py
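# Use retry-task.py template
# (assumes it was copied to an importable module such as templates/retry_task.py,
# because a hyphenated file name cannot be used in an import statement)
from templates.retry_task import fetch_api_data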
# Queue task
result = fetch_api_data.delay('https://api.example.com/data')
# Get result
data = result.get(timeout=60)
print(data)
# Use rate-limited-task.py template
from templates.rate_limited_task import api_call_rate_limited
# Process 100 items at a 10/minute rate
for i in range(100):
    api_call_rate_limited.delay(f'/endpoint/{i}')
# Each worker enforces the rate limit automatically (the limit is per worker, not global)
# Use database-task.py template
from templates.database_task import insert_record, bulk_insert
# Single insert
result1 = insert_record.delay('users', {
    'name': 'John Doe',
    'email': 'john@example.com'
})
# Bulk insert
records = [
    {'name': 'Jane', 'email': 'jane@example.com'},
    {'name': 'Bob', 'email': 'bob@example.com'},
]
result2 = bulk_insert.delay('users', records)
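The bodies of `insert_record` and `bulk_insert` are not shown here. As a rough sketch of what a bulk insert of this shape might do, the following uses the standard-library `sqlite3` module. The `ALLOWED_TABLES` allowlist, the `bulk_insert_rows` helper, and the `users` schema are assumptions made for illustration, not the template's actual code.

```python
import sqlite3

# Hypothetical allowlist: table names cannot be bound as query parameters,
# so they are checked against known names before being interpolated.
ALLOWED_TABLES = {"users": ("name", "email")}


def bulk_insert_rows(conn, table, records):
    """Insert all records in one transaction; roll back if any row fails."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"unknown table: {table!r}")
    columns = ALLOWED_TABLES[table]
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    rows = [tuple(record[col] for col in columns) for record in records]
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.executemany(sql, rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT NOT NULL, email TEXT NOT NULL UNIQUE)")
inserted = bulk_insert_rows(conn, "users", [
    {"name": "Jane", "email": "jane@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
])
print(inserted)
```

Running the whole batch in one transaction means a failing row leaves the table unchanged, so a retried task does not insert the same rows twice.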
# Use custom-task-class.py template
from templates.custom_task_class import monitored_task
# Automatic metrics tracking
result = monitored_task.delay(123)
# Metrics logged automatically:
# - Start time
# - Duration
# - Success/failure
# - Arguments
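The template implements this tracking in a custom Celery task base class, which is not reproduced here. The same idea can be sketched without Celery as a plain decorator. `monitored` and `lookup_item` are hypothetical names, and the log format is an assumption.

```python
import functools
import logging
import time

logger = logging.getLogger("task_metrics")


def monitored(func):
    """Log arguments, duration, and success/failure of each call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.monotonic()
        logger.info("start %s args=%r kwargs=%r", func.__name__, args, kwargs)
        try:
            result = func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message.
            logger.exception("failure %s after %.3fs", func.__name__, time.monotonic() - start)
            raise
        logger.info("success %s after %.3fs", func.__name__, time.monotonic() - start)
        return result
    return wrapper


@monitored
def lookup_item(item_id):
    if item_id < 0:
        raise ValueError("item_id must be non-negative")
    return {"item_id": item_id, "status": "ok"}


logging.basicConfig(level=logging.INFO)
print(lookup_item(123))
```

In Celery itself, comparable hooks are available as `Task` methods such as `on_success`, `on_failure`, and `after_return` on a class passed through `base=`.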
# Use pydantic-validation.py template
from templates.pydantic_validation import process_user
# Valid data
user_data = {
    'user_id': 123,
    'email': 'user@example.com',
    'username': 'john_doe',
    'age': 30,
    'tags': ['premium']
}
result = process_user.delay(user_data)
# Invalid data returns a structured error
invalid_data = {
    'user_id': -1,  # Invalid
    'email': 'not-email',  # Invalid
}
result = process_user.delay(invalid_data)
# result.get() returns: {'status': 'error', 'errors': [...]}
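The `process_user` template and its Pydantic model are not shown here. To illustrate the validate-first pattern and the structured error shape without depending on Pydantic, here is a standard-library sketch. The field rules and the `validate_user` helper are assumptions; a real task would let a Pydantic model do this checking.

```python
def validate_user(data):
    """Return a list of {'field', 'message'} errors; an empty list means valid."""
    errors = []
    user_id = data.get("user_id")
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(user_id, bool) or not isinstance(user_id, int) or user_id <= 0:
        errors.append({"field": "user_id", "message": "must be a positive integer"})
    email = data.get("email")
    if not isinstance(email, str) or "@" not in email:
        errors.append({"field": "email", "message": "must contain '@'"})
    return errors


def process_user(data):
    """Validate before doing any work and report failures as data, not exceptions."""
    errors = validate_user(data)
    if errors:
        return {"status": "error", "errors": errors}
    return {"status": "success", "user_id": data["user_id"]}


print(process_user({"user_id": -1, "email": "not-email"}))
print(process_user({"user_id": 123, "email": "user@example.com"}))
```

Returning the errors as a result, rather than raising, lets the caller inspect every failing field in one round trip.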
# ✅ CORRECT
api_key = os.getenv('API_KEY', 'your_api_key_here')
# ❌ WRONG
api_key = 'sk-abc123xyz456'
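A placeholder default can hide a missing configuration until the first API call fails. One possible alternative, sketched below, is to fail fast when the variable is absent. `load_api_key` is a hypothetical helper, not part of the templates.

```python
import os


def load_api_key(name="API_KEY"):
    """Return the key from the environment, refusing missing or empty values."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value


# Demo only: in practice the variable is set outside the code (shell, .env, secret store).
os.environ["API_KEY"] = "example-value-for-demo"
print(len(load_api_key()))
```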
# ✅ CORRECT
response = requests.get(url, timeout=30)
# ❌ WRONG
response = requests.get(url) # Can hang forever
# ✅ CORRECT
db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# ❌ WRONG
db.execute(f"SELECT * FROM users WHERE id = {user_id}")
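To see why the parameterized form matters, the following self-contained sketch runs both variants against an in-memory SQLite database. The `users` table and the payload are made up for the demonstration. Note that `sqlite3` uses `?` placeholders, whereas the `%s` style shown above is what drivers such as psycopg2 expect.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)", [(1, "Jane"), (2, "Bob")])

malicious = "1 OR 1=1"  # classic injection payload

# Parameterized: the payload is bound as a single value and matches no row.
safe_rows = conn.execute("SELECT * FROM users WHERE id = ?", (malicious,)).fetchall()

# Interpolated: the payload becomes part of the SQL text and matches every row.
unsafe_rows = conn.execute(f"SELECT * FROM users WHERE id = {malicious}").fetchall()

print(len(safe_rows), len(unsafe_rows))
```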
@app.task(
    bind=True,
    autoretry_for=(RequestException,),
    retry_backoff=True,
    rate_limit='10/m'
)
def robust_api_call(self, url: str):
    """Retries on failure, respects rate limits."""
    pass
@app.task(bind=True)
def well_logged_task(self, item_id: int):
    logger.info(f"Starting task for item {item_id}")
    try:
        result = process(item_id)
        logger.info(f"Successfully processed {item_id}")
        return result
    except Exception as exc:
        logger.error(f"Failed to process {item_id}: {exc}")
        raise
This skill follows strict security rules:
Choose basic-task.py when:
Choose retry-task.py when:
Choose rate-limited-task.py when:
Choose time-limited-task.py when:
Choose custom-task-class.py when:
Choose pydantic-validation.py when:
Choose database-task.py when:
Choose api-task.py when:
# Validate task structure
./scripts/validate-task.sh templates/retry-task.py
# Test task execution
./scripts/test-task.sh templates/retry-task.py fetch_api_data
# Start worker
celery -A tasks worker --loglevel=info
# Execute tasks
python3 templates/retry-task.py
@app.task(
    bind=True,
    autoretry_for=(RequestException,),
    retry_backoff=True,
    retry_jitter=True,
    max_retries=5
)
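The delays these options produce can be approximated with a few lines of arithmetic. The sketch below follows the behavior described in the Celery documentation: with `retry_backoff=True` the first retry waits 1 second and each later retry doubles the wait, capped by `retry_backoff_max` (600 seconds by default), and `retry_jitter` replaces each wait with a random value between zero and that ceiling. `backoff_delay` is a hypothetical helper written for illustration, not Celery's own function.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a random delay between 0 and the computed ceiling.
        delay = random.randrange(delay + 1)
    return delay


# Ceilings for max_retries=5 with jitter disabled.
schedule = [backoff_delay(n, jitter=False) for n in range(5)]
print(schedule)  # [1, 2, 4, 8, 16]
```

Jitter spreads retries from many failed tasks over time, so they do not all hit a recovering service at the same instant.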
@app.task(
    rate_limit='10/m',
    soft_time_limit=60,
    time_limit=120
)
@app.task(
    bind=True,
    autoretry_for=(RequestException,),  # Invalid input is not retried; it would fail the same way again
    retry_backoff=True,
    max_retries=3
)
def validated_api_task(self, request: dict):
    # Validate with Pydantic; a ValidationError fails the task immediately
    req = ApiRequest(**request)
    # Make API call with retry
    return make_call(req.url)
@app.task(
    bind=True,                   # Access self (needed for self.retry)
    base=CustomTask,             # Custom task class
    autoretry_for=(Exception,),  # Exceptions to retry
    retry_backoff=True,          # Exponential backoff
    retry_backoff_max=600,       # Max backoff in seconds
    retry_jitter=True,           # Add randomness to backoff delays
    max_retries=5,               # Max retry attempts
    default_retry_delay=10,      # Default delay between retries, in seconds
    rate_limit='10/m',           # Rate limit (per worker)
    soft_time_limit=60,          # Soft timeout in seconds (catchable)
    time_limit=120,              # Hard timeout in seconds (kills task)
    ignore_result=False,         # Store result
    priority=5                   # Task priority (0-9)
)
# Automatic
autoretry_for=(RequestException, Timeout)
# Manual
raise self.retry(exc=exc, countdown=60)
rate_limit='10/s' # 10 per second
rate_limit='100/m' # 100 per minute
rate_limit='1000/h' # 1000 per hour
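To compare limits written in different units, it can help to convert them to a common rate. The sketch below parses the three formats shown above. `tasks_per_second` is a hypothetical helper for illustration, and the average spacing it prints is only an approximation of how a worker actually paces tasks.

```python
import re

_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def tasks_per_second(rate_limit):
    """Convert a string like '10/m' into an average number of tasks per second."""
    match = re.fullmatch(r"(\d+(?:\.\d+)?)/([smh])", rate_limit)
    if match is None:
        raise ValueError(f"unsupported rate limit: {rate_limit!r}")
    count, unit = match.groups()
    return float(count) / _SECONDS_PER_UNIT[unit]


for limit in ("10/s", "100/m", "1000/h"):
    rate = tasks_per_second(limit)
    print(f"{limit}: one task every {1 / rate:.2f}s on average")
```

Because Celery applies the limit per worker instance, the effective rate across a cluster scales with the number of workers consuming the task.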
examples/task-with-retries.md - Complete retry guide
examples/rate-limiting.md - Complete rate limiting guide
examples/custom-task-classes.md - Custom class guide
Always run validation before committing:
./scripts/validate-task.sh your-task.py
Validation checks: