Task routing and queue management patterns for Celery including priority queues, topic exchanges, worker-specific routing, and advanced queue configurations. Use when configuring task routing, managing queues, setting up priority queues, implementing worker routing, configuring topic exchanges, or when user mentions task routing, queue management, Celery routing, worker assignments, or message broker routing.
Additional assets for this skill:
examples/priority-queue-setup.md
examples/topic-routing.md
examples/worker-queue-assignment.md
scripts/test-routing.sh
scripts/validate-queues.sh
templates/priority-queues.py
templates/queue-config.py
templates/routing-rules.py
templates/topic-exchange.py
templates/worker-routing.py
This skill provides comprehensive templates, scripts, and patterns for implementing advanced task routing and queue management in Celery applications, including priority queues, topic-based routing, and worker-specific queue assignments.
Effective task routing is crucial for:
This skill covers routing with RabbitMQ, Redis, and custom broker configurations.
Script: scripts/test-routing.sh <config-file>
Purpose: Validates routing configuration and tests queue connectivity
Checks:
Usage:
# Test routing configuration
./scripts/test-routing.sh ./celery_config.py
# Test with custom broker URL
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py
# Verbose output
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py
Exit Codes:
0: All routing tests passed
1: Configuration errors detected
2: Broker connection failed
Script: scripts/validate-queues.sh <project-dir>
Purpose: Validates queue setup across application code
Checks:
Usage:
# Validate current project
./scripts/validate-queues.sh .
# Validate specific directory
./scripts/validate-queues.sh /path/to/celery-app
# Generate detailed report
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md
Exit Codes:
0: Validation passed
1: Validation failed (must fix issues)
Template: templates/queue-config.py
Features:
Usage:
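from celery import Celery
# Assumes the template was copied to templates/queue_config.py:
# a hyphenated file name cannot be used in an import statement.
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES
app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES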
Configuration Example:
from kombu import Exchange, Queue
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    # Wildcard binding keys only take effect on topic exchanges
    Queue('emails', Exchange('emails', type='topic'), routing_key='email.*'),
    Queue('reports', Exchange('reports', type='topic'), routing_key='report.*'),
)
CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    # On RabbitMQ, 'priority' is honored only if the queue sets x-max-priority
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}
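Because Celery's `task_create_missing_queues` setting is enabled by default, a misspelled queue name in a route tends to create a new queue that no worker consumes rather than raise an error. The following is a minimal sketch of a consistency check, in the spirit of what scripts/validate-queues.sh is described as doing. The helper name `find_undeclared_queues` and the `resize_image` route are hypothetical, and queue names are plain strings here so the sketch runs without kombu installed.

```python
from typing import Dict, Iterable, Mapping


def find_undeclared_queues(
    routes: Mapping[str, Mapping[str, object]],
    declared: Iterable[str],
) -> Dict[str, str]:
    """Return {task_name: queue_name} for routes whose queue is not declared."""
    known = set(declared)
    return {
        task: str(route["queue"])
        for task, route in routes.items()
        if "queue" in route and route["queue"] not in known
    }


# Queue names mirroring the configuration example above.
declared_queues = ["default", "high_priority", "low_priority", "emails", "reports"]

routes = {
    "myapp.tasks.send_email": {"queue": "emails", "routing_key": "email.send"},
    "myapp.tasks.urgent_task": {"queue": "high_priority", "priority": 9},
    # Hypothetical route, deliberately pointing at an undeclared queue.
    "myapp.tasks.resize_image": {"queue": "images"},
}

missing = find_undeclared_queues(routes, declared_queues)
print(missing)
```

With real kombu `Queue` objects, the declared names could be collected as `[q.name for q in CELERY_QUEUES]` before calling the helper.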
Template: templates/routing-rules.py
Features:
Key Functions:
def route_task(name, args, kwargs, options, task=None, **kw):
    """
    Dynamic routing based on task name or arguments
    """
    # kwargs can be None when a task is sent without keyword arguments
    kwargs = kwargs or {}
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}
    if kwargs.get('priority') == 'high':
        return {'queue': 'high_priority'}
    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}
    return {'queue': 'default'}
app.conf.task_routes = (route_task,)
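A router is a plain callable, so its decisions can be checked without a broker or a running worker. The sketch below repeats the router with a guard for `kwargs` being `None` and runs it against a small table of cases. The task names in the table are made up for illustration.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Router variant that tolerates kwargs being None."""
    kwargs = kwargs or {}
    if name.startswith("urgent."):
        return {"queue": "high_priority", "priority": 9}
    if kwargs.get("priority") == "high":
        return {"queue": "high_priority"}
    if name.startswith("email."):
        return {"queue": "emails", "exchange": "emails"}
    return {"queue": "default"}


# (task name, kwargs, expected route); all task names are hypothetical.
CASES = [
    ("urgent.reindex", None, {"queue": "high_priority", "priority": 9}),
    ("billing.charge", {"priority": "high"}, {"queue": "high_priority"}),
    ("email.welcome", {}, {"queue": "emails", "exchange": "emails"}),
    ("billing.charge", {"priority": "low"}, {"queue": "default"}),
]

results = [route_task(name, (), kwargs, {}) for name, kwargs, _ in CASES]

for (name, kwargs, expected), actual in zip(CASES, results):
    status = "ok" if actual == expected else "MISMATCH"
    print(f"{status}: {name} {kwargs} -> {actual}")
```

Note that the name prefix check runs first, so an `urgent.` task keeps its priority of 9 even when it is also called with `priority='high'`.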
Template: templates/priority-queues.py
Features:
Priority Levels:
from kombu import Exchange, Queue
# Priority queue configuration
CELERY_QUEUES = (
    Queue('critical', Exchange('tasks'), routing_key='critical',
          queue_arguments={'x-max-priority': 10}),
    Queue('high', Exchange('tasks'), routing_key='high',
          queue_arguments={'x-max-priority': 10}),
    Queue('normal', Exchange('tasks'), routing_key='normal',
          queue_arguments={'x-max-priority': 10}),
    Queue('low', Exchange('tasks'), routing_key='low',
          queue_arguments={'x-max-priority': 10}),
)
# Task priority mapping
PRIORITY_LEVELS = {
    'critical': 10,  # Highest priority
    'high': 7,
    'normal': 5,
    'low': 2,
}
# Apply priority to task
@app.task(priority=PRIORITY_LEVELS['high'])
def urgent_processing():
    pass
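The queue layout and priority mapping above can also be applied per call instead of per task. The sketch below builds the publish options for a named level. `publish_options` and `MAX_PRIORITY` are hypothetical names, and the sketch assumes each level name doubles as both queue name and routing key, as in the configuration above.

```python
from typing import Dict, Union

MAX_PRIORITY = 10  # assumed to match the x-max-priority declared on each queue

PRIORITY_LEVELS = {
    "critical": 10,
    "high": 7,
    "normal": 5,
    "low": 2,
}


def publish_options(level: str) -> Dict[str, Union[str, int]]:
    """Build queue, routing key and priority options for a priority level."""
    if level not in PRIORITY_LEVELS:
        raise ValueError(f"unknown priority level: {level!r}")
    # RabbitMQ treats priorities above the queue maximum as the maximum,
    # so clamping here only makes that behavior explicit.
    priority = min(PRIORITY_LEVELS[level], MAX_PRIORITY)
    return {"queue": level, "routing_key": level, "priority": priority}


high_options = publish_options("high")
print(high_options)
```

These keys correspond to keyword arguments accepted by `apply_async`, so a call could look like `urgent_processing.apply_async(**publish_options('high'))`.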
Template: templates/topic-exchange.py
Features:
Topic Patterns:
from kombu import Exchange, Queue
# Topic exchange setup
task_exchange = Exchange('tasks', type='topic', durable=True)
CELERY_QUEUES = (
    # Match specific patterns
    Queue('user.notifications', exchange=task_exchange,
          routing_key='user.notification.*'),
    # Match all email types
    Queue('emails', exchange=task_exchange,
          routing_key='email.#'),
    # Match processing tasks
    Queue('processing', exchange=task_exchange,
          routing_key='*.processing.*'),
    # Match all reports
    Queue('reports', exchange=task_exchange,
          routing_key='report.*'),
)
# Routing configuration
CELERY_ROUTES = {
    'myapp.tasks.send_welcome_email': {
        'exchange': 'tasks',
        'routing_key': 'email.welcome.send'
    },
    'myapp.tasks.notify_user': {
        'exchange': 'tasks',
        'routing_key': 'user.notification.send'
    },
}
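On a topic exchange, routing keys are split into dot-separated words: `*` in a binding pattern matches exactly one word and `#` matches zero or more. The matching itself happens inside the broker, but a small pure-Python model can help predict which queue a key will reach. `topic_matches` below is a hypothetical helper written for illustration, not part of Celery or kombu.

```python
from typing import List


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern."""
    p: List[str] = pattern.split(".")
    k: List[str] = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            # '*' or a literal word consumes exactly one word.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# Binding patterns from the queue definitions above.
BINDINGS = {
    "user.notifications": "user.notification.*",
    "emails": "email.#",
    "processing": "*.processing.*",
    "reports": "report.*",
}


def queues_for(routing_key: str) -> List[str]:
    """List the queues whose binding pattern matches the routing key."""
    return [q for q, pattern in BINDINGS.items() if topic_matches(pattern, routing_key)]


print(queues_for("email.welcome.send"))
print(queues_for("user.notification.send"))
```

One consequence worth noting: `report.*` matches `report.generate` but not a three-word key such as `report.sales.daily`, which would need `report.#`.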
Template: templates/worker-routing.py
Features:
Worker Configuration:
# Worker pool definitions
WORKER_POOLS = {
    'cpu_intensive': {
        'queues': ['ml_training', 'video_processing', 'data_analysis'],
        'concurrency': 4,
        'prefetch_multiplier': 1,
    },
    'io_intensive': {
        'queues': ['api_calls', 'file_uploads', 'emails'],
        'concurrency': 50,
        'prefetch_multiplier': 10,
    },
    'general': {
        'queues': ['default', 'background'],
        'concurrency': 10,
        'prefetch_multiplier': 4,
    },
}
# Start workers
# celery -A myapp worker --queues=ml_training,video_processing -c 4 -n cpu_worker@%h
# celery -A myapp worker --queues=api_calls,file_uploads -c 50 -n io_worker@%h
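Keeping the worker start commands in sync with the pool definitions by hand is error-prone, so one option is to derive them from the dictionary. The sketch below is one way to do that. `worker_command` is a hypothetical helper, the pool data is a two-entry copy of the definitions above, and naming each node after its pool is an assumption made for this example.

```python
import shlex
from typing import Dict, List

# Subset of the pool definitions above, repeated so the sketch is self-contained.
WORKER_POOLS: Dict[str, dict] = {
    "cpu_intensive": {
        "queues": ["ml_training", "video_processing", "data_analysis"],
        "concurrency": 4,
        "prefetch_multiplier": 1,
    },
    "io_intensive": {
        "queues": ["api_calls", "file_uploads", "emails"],
        "concurrency": 50,
        "prefetch_multiplier": 10,
    },
}


def worker_command(app: str, pool_name: str, pool: dict) -> List[str]:
    """Build the argv for a `celery worker` process serving one pool."""
    return [
        "celery", "-A", app, "worker",
        "-Q", ",".join(pool["queues"]),
        "-c", str(pool["concurrency"]),
        f"--prefetch-multiplier={pool['prefetch_multiplier']}",
        "-n", f"{pool_name}@%h",  # node name derived from the pool (assumption)
    ]


commands = {
    name: worker_command("myapp", name, pool)
    for name, pool in WORKER_POOLS.items()
}

for argv in commands.values():
    print(shlex.join(argv))
```

Returning an argv list rather than a string keeps the result usable with `subprocess.Popen` without shell quoting concerns.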
Example: examples/priority-queue-setup.md
Demonstrates:
Key Concepts:
x-max-priority argument
Example: examples/topic-routing.md
Demonstrates:
Routing Key Patterns:
* - Matches exactly one word
# - Matches zero or more words
email.*.send matches email.welcome.send, email.notification.send
user.# matches user.create, user.update.profile
Example: examples/worker-queue-assignment.md
Demonstrates:
Worker Types:
# CPU-intensive workers (low concurrency)
celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h
# I/O-intensive workers (high concurrency)
celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h
# General purpose workers
celery -A myapp worker -Q default,background -c 10 -n general@%h
email.*.send routes all email types to email queue
priority=high and region=us-east
x-max-length
celery worker --prefetch-multiplier=4
This skill follows strict security rules:
.gitignore protection documented
amqp://user:password@localhost:5672//
Never hardcode:
# ❌ WRONG
BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'
# ✅ CORRECT
import os
BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
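Where a silent fallback to a local broker is undesirable, an alternative is to fail at startup when the broker URL is missing. The sketch below assumes the two environment variable names used by this skill, `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND`. `load_broker_settings` is a hypothetical helper, and the environment mapping is passed in explicitly so the function can be tested without touching the real environment.

```python
import os
from typing import Dict, Mapping, Optional


def load_broker_settings(
    env: Mapping[str, str] = os.environ,
) -> Dict[str, Optional[str]]:
    """Read broker settings from the environment, failing fast if required ones are missing."""
    broker_url = env.get("CELERY_BROKER_URL")
    if not broker_url:
        raise RuntimeError("CELERY_BROKER_URL is not set")
    return {
        "broker_url": broker_url,
        # Optional: stays None when no result backend is configured.
        "result_backend": env.get("CELERY_RESULT_BACKEND"),
    }


# Demonstration with an explicit mapping instead of the real environment.
settings = load_broker_settings(
    {"CELERY_BROKER_URL": "amqp://guest:guest@localhost:5672//"}
)
print(sorted(settings))
```

The returned keys match Celery's lowercase setting names, so the result could be applied with `app.conf.update(load_broker_settings())`.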
CELERY_BROKER_URL (required)
CELERY_RESULT_BACKEND (optional)
For advanced routing patterns, see:
examples/priority-queue-setup.md - Priority queue implementation
examples/topic-routing.md - Topic exchange patterns
examples/worker-queue-assignment.md - Worker pool strategies
x-max-priority set on queue (RabbitMQ only)
Plugin: celery Version: 1.0.0 Last Updated: 2025-11-16