Message broker setup patterns (Redis, RabbitMQ, SQS) for Celery including connection strings, SSL configuration, high availability, and production best practices. Use when configuring message brokers, setting up Redis/RabbitMQ/SQS, troubleshooting broker connections, implementing HA/failover, securing broker communications with SSL, or when user mentions broker setup, connection issues, sentinel, quorum queues, or AWS SQS integration.
Purpose: Comprehensive message broker configuration for Celery with production-ready patterns for Redis, RabbitMQ, and Amazon SQS.
Activation Triggers:
Key Resources:
templates/redis-config.py - Production Redis configuration
templates/rabbitmq-config.py - RabbitMQ with quorum queues
templates/sqs-config.py - AWS SQS with IAM roles
templates/connection-strings.env - Connection string formats
templates/ssl-config.py - SSL/TLS configuration
scripts/test-broker-connection.sh - Connection testing
scripts/setup-redis.sh - Redis installation and configuration
scripts/setup-rabbitmq.sh - RabbitMQ cluster setup
examples/redis-sentinel.md - High availability with Sentinel
examples/rabbitmq-ha.md - RabbitMQ clustering and quorum queues
examples/sqs-setup.md - Complete AWS SQS integration

| Feature | Redis | RabbitMQ | SQS |
|---|---|---|---|
| Performance | Excellent (small msgs) | Very Good | Good |
| Reliability | Good (with Sentinel) | Excellent (quorum) | Excellent |
| Monitoring | Yes | Yes | Limited |
| Remote Control | Yes | Yes | No |
| Management | Manual/Cloud | Manual | Fully Managed |
| Cost | Server/Cloud | Server/Cloud | Pay-per-use |
| Best For | Speed, simple setup | Reliability, features | AWS, serverless |
Choose Redis when:
Choose RabbitMQ when:
Choose SQS when:
# Install dependencies
pip install "celery[redis]"
# Use template
cp templates/redis-config.py celeryconfig.py
# Configure environment
cp templates/connection-strings.env .env
# Edit .env with actual values
Key settings:
# templates/redis-config.py
broker_url = 'redis://:password@localhost:6379/0'
broker_transport_options = {
    'visibility_timeout': 3600,
    'retry_on_timeout': True,
    'max_connections': 50,
}
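Passwords that contain characters such as `@` or `/` break a hand-written connection string. The sketch below assembles the Redis URL from environment-style settings and percent-encodes the password. The variable names (`REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`, `REDIS_PASSWORD`) are hypothetical and are not taken from `templates/connection-strings.env`; adjust them to match your own file.

```python
import os
from urllib.parse import quote


def build_redis_url(env=None):
    """Assemble a Redis broker URL from environment-style settings."""
    env = os.environ if env is None else env
    # Hypothetical variable names; match them to your own .env file.
    host = env.get("REDIS_HOST", "localhost")
    port = int(env.get("REDIS_PORT", "6379"))
    db = int(env.get("REDIS_DB", "0"))
    password = env.get("REDIS_PASSWORD", "")
    # Percent-encode the password so '@' or '/' cannot be mistaken
    # for URL delimiters.
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


# Passing a plain dict keeps the example independent of the real environment.
broker_url = build_redis_url({"REDIS_PASSWORD": "p@ss/word"})
```

In a real `celeryconfig.py` you would call `build_redis_url()` with no argument so the values come from `os.environ`.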
CRITICAL: Set maxmemory-policy noeviction in Redis config:
# Run setup script
./scripts/setup-redis.sh --install --configure
# Or manually
redis-cli CONFIG SET maxmemory-policy noeviction
Why: Prevents Redis from evicting task data, which would cause task loss.
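A worker or deployment script can verify the policy before it starts consuming tasks. This is a minimal sketch that assumes a client object exposing `config_get(pattern)` and returning a mapping, which is how redis-py's `Redis.config_get` behaves; `FakeRedis` is a hypothetical stand-in so the example runs without a server.

```python
def eviction_policy_is_safe(client):
    """Return True when the server reports maxmemory-policy noeviction.

    `client` is assumed to expose config_get(pattern) returning a mapping,
    as redis-py's Redis.config_get does.
    """
    config = client.config_get("maxmemory-policy")
    policy = config.get("maxmemory-policy")
    if isinstance(policy, bytes):
        # Some client configurations return raw bytes.
        policy = policy.decode()
    return policy == "noeviction"


class FakeRedis:
    """Hypothetical stand-in so the sketch runs without a Redis server."""

    def __init__(self, policy):
        self._policy = policy

    def config_get(self, pattern):
        return {pattern: self._policy}


safe = eviction_policy_is_safe(FakeRedis("noeviction"))
unsafe = eviction_policy_is_safe(FakeRedis("allkeys-lru"))
```

With a real connection you would pass something like `redis.Redis(host=..., password=...)` instead of `FakeRedis`.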
# See comprehensive guide
cat examples/redis-sentinel.md
# Quick setup
docker-compose -f examples/docker-compose.sentinel.yml up -d
# Configure Celery for Sentinel
# Each Sentinel node is written as its own sentinel:// URL
CELERY_BROKER_URL='sentinel://host1:26379/0;sentinel://host2:26379/0;sentinel://host3:26379/0'
Connection string format:
# Sentinel URL: one sentinel:// entry per node, separated by semicolons
broker_url = 'sentinel://sentinel1:26379/0;sentinel://sentinel2:26379/0'
broker_transport_options = {
    'master_name': 'mymaster',
    'sentinel_kwargs': {'password': 'sentinel_password'},
}
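Celery's documentation writes a Sentinel broker URL as a semicolon-separated list in which every node carries its own `sentinel://` prefix. The helper below is a small sketch that builds such a string from a list of nodes; the function name is hypothetical, and appending the database number to every node is an assumption made for consistency.

```python
def build_sentinel_url(nodes, db=0):
    """Join (host, port) pairs into a semicolon-separated Sentinel URL.

    Assumption: the database number is appended to every node entry.
    """
    if not nodes:
        raise ValueError("at least one Sentinel node is required")
    parts = [f"sentinel://{host}:{port}/{db}" for host, port in nodes]
    return ";".join(parts)


broker_url = build_sentinel_url([("sentinel1", 26379), ("sentinel2", 26379)])
```

Keeping the node list in one place makes it easier to add a third Sentinel later without editing the string by hand.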
# Install dependencies
pip install "celery[amqp]"
# Use template
cp templates/rabbitmq-config.py celeryconfig.py
# Run setup script
./scripts/setup-rabbitmq.sh --install --configure
Key settings:
# templates/rabbitmq-config.py
broker_url = 'amqp://user:password@localhost:5672/vhost'
# CRITICAL for quorum queues
broker_transport_options = {
    'confirm_publish': True,  # Required!
}
Provides:
from kombu import Queue

task_queues = (
    Queue(
        'default',
        queue_arguments={
            'x-queue-type': 'quorum',
            'x-delivery-limit': 3,
        }
    ),
)
# See comprehensive guide
cat examples/rabbitmq-ha.md
# Docker cluster setup
docker-compose -f examples/docker-compose.rabbitmq-cluster.yml up -d
# Verify cluster
docker exec rabbitmq-1 rabbitmqctl cluster_status
HAProxy load balancing:
# Distribute connections across cluster
broker_url = 'amqp://user:password@haproxy:5670/vhost'
# Create IAM policy and user
aws iam create-policy \
--policy-name CelerySQSPolicy \
--policy-document file://examples/celery-sqs-policy.json
# Or use IAM role (recommended for EC2/ECS)
# See examples/sqs-setup.md for complete guide
# Install dependencies
pip install "celery[sqs]"
# Use template
cp templates/sqs-config.py celeryconfig.py
With IAM role (recommended):
# No credentials needed
broker_url = 'sqs://'
broker_transport_options = {
    'region': 'us-east-1',
    'visibility_timeout': 3600,
    'polling_interval': 1,
    'wait_time_seconds': 10,  # Long polling
}
With explicit credentials:
broker_url = 'sqs://access_key:secret_key@'
For ordered task processing:
from kombu import Queue

task_queues = (
    Queue(
        'celery-default.fifo',
        queue_arguments={
            'FifoQueue': 'true',
            'ContentBasedDeduplication': 'true',
        }
    ),
)
# Send with message group ID
task.apply_async(
    args=[data],
    properties={'MessageGroupId': 'user-123'}
)
SQS cannot act as a result backend. Store results in S3, DynamoDB, or a separate Redis instance:
# Option 1: S3
result_backend = 's3://my-bucket/celery-results/'
# Option 2: DynamoDB
result_backend = 'dynamodb://'
result_backend_transport_options = {
    'table_name': 'celery-results',
}
# Option 3: Redis (hybrid)
result_backend = 'redis://redis.example.com:6379/0'
# Use templates/ssl-config.py
from templates.ssl_config import app
# Or manually
import ssl

# Note: rediss:// scheme, and a ':' before the password
broker_url = 'rediss://:password@host:6380/0'
broker_use_ssl = {
    'ssl_cert_reqs': ssl.CERT_REQUIRED,
    'ssl_ca_certs': '/path/to/ca.pem',
    'ssl_certfile': '/path/to/client-cert.pem',
    'ssl_keyfile': '/path/to/client-key.pem',
}
import ssl

broker_url = 'amqps://user:password@host:5671/vhost'  # Note: amqps://
# The AMQP transport takes these keys without the ssl_ prefix
broker_use_ssl = {
    'cert_reqs': ssl.CERT_REQUIRED,
    'ca_certs': '/path/to/ca.pem',
    'certfile': '/path/to/client-cert.pem',
    'keyfile': '/path/to/client-key.pem',
}
# .env
BROKER_SSL_ENABLED=true
BROKER_SSL_CERT=/path/to/client-cert.pem
BROKER_SSL_KEY=/path/to/client-key.pem
BROKER_SSL_CA=/path/to/ca-cert.pem
BROKER_SSL_VERIFY_MODE=CERT_REQUIRED
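The `BROKER_SSL_*` variables above still have to be turned into a `broker_use_ssl` dictionary. The following sketch shows one way to do that mapping. The function name is hypothetical, and it assumes the key naming described in Celery's `broker_use_ssl` documentation: unprefixed keys for the AMQP transport and `ssl_`-prefixed keys for Redis.

```python
import ssl


def broker_ssl_options(env, transport):
    """Map BROKER_SSL_* settings to a broker_use_ssl dictionary.

    Returns None when SSL is disabled.
    """
    if env.get("BROKER_SSL_ENABLED", "false").lower() != "true":
        return None
    # Resolve a name such as "CERT_REQUIRED" to the ssl module constant.
    verify_mode = getattr(ssl, env.get("BROKER_SSL_VERIFY_MODE", "CERT_REQUIRED"))
    options = {
        "cert_reqs": verify_mode,
        "ca_certs": env.get("BROKER_SSL_CA"),
        "certfile": env.get("BROKER_SSL_CERT"),
        "keyfile": env.get("BROKER_SSL_KEY"),
    }
    if transport == "redis":
        # Assumption: the Redis transport expects the same keys with ssl_ prefix.
        options = {f"ssl_{key}": value for key, value in options.items()}
    return options


example_env = {
    "BROKER_SSL_ENABLED": "true",
    "BROKER_SSL_CERT": "/path/to/client-cert.pem",
    "BROKER_SSL_KEY": "/path/to/client-key.pem",
    "BROKER_SSL_CA": "/path/to/ca-cert.pem",
    "BROKER_SSL_VERIFY_MODE": "CERT_REQUIRED",
}
redis_ssl = broker_ssl_options(example_env, "redis")
amqp_ssl = broker_ssl_options(example_env, "amqp")
```

In `celeryconfig.py` you would pass `os.environ` and assign the result to `broker_use_ssl`.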
# Test any broker type
./scripts/test-broker-connection.sh redis
./scripts/test-broker-connection.sh rabbitmq
./scripts/test-broker-connection.sh sqs
# Check specific features
./scripts/test-broker-connection.sh redis --ssl
Script checks:
from celery import Celery
app = Celery(broker='redis://localhost:6379/0')
# Test connection
try:
    with app.connection() as conn:
        # Retry a few times before giving up
        conn.ensure_connection(max_retries=3, timeout=5)
    print("✅ Connection successful")
except Exception as e:
    print(f"❌ Connection failed: {e}")
Problem: Redis is evicting task data.
Solution:
./scripts/setup-redis.sh --configure
# Or manually:
redis-cli CONFIG SET maxmemory-policy noeviction
Problem: The queue doesn't exist or the vhost is wrong.
Solution:
# Check vhost
rabbitmqctl list_vhosts
# Check permissions
rabbitmqctl list_permissions -p /celery_vhost
Problem: IAM permissions are insufficient.
Solution:
# Verify IAM policy includes all required actions
# See examples/sqs-setup.md for complete policy
# Test credentials
aws sts get-caller-identity
aws sqs list-queues --queue-name-prefix celery-
Problem: The network or a firewall is blocking the connection.
Solution:
# Test network connectivity
telnet broker-host 6379 # Redis
telnet broker-host 5672 # RabbitMQ
# Check firewall rules
# Verify security groups (AWS)
# Check iptables rules
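Where `telnet` is not installed, the same reachability test can be run from Python using only the standard library. This is a small sketch; the function name is hypothetical, and `broker-host` is the placeholder used in the commands above.

```python
import socket


def port_is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    for name, port in (("Redis", 6379), ("RabbitMQ", 5672)):
        state = "open" if port_is_reachable("broker-host", port) else "blocked"
        print(f"{name} port {port}: {state}")
```

A `False` result only shows that the TCP handshake failed; it does not tell you whether DNS, a firewall, or a stopped broker process is responsible.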
broker_transport_options = {
    'visibility_timeout': 3600,
    'max_connections': 100,  # Increase for high concurrency
    'socket_timeout': 5.0,
    'socket_keepalive': True,
    'health_check_interval': 30,
}
# Worker settings
worker_prefetch_multiplier = 4 # Prefetch 4x concurrency
worker_max_tasks_per_child = 1000
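The prefetch limit is the multiplier times the worker's concurrency, so the setting's effect grows with the pool size. The sketch below works through that arithmetic. The function names are hypothetical, and the drain-time estimate assumes every task takes the same time and all worker slots stay busy.

```python
import math


def prefetch_limit(concurrency, prefetch_multiplier):
    """Messages a worker may prefetch: multiplier times concurrency."""
    return concurrency * prefetch_multiplier


def batch_drain_seconds(concurrency, prefetch_multiplier, task_seconds):
    """Rough time for one worker to work through a full prefetched batch.

    Assumes uniform task duration and fully busy worker slots.
    """
    limit = prefetch_limit(concurrency, prefetch_multiplier)
    waves = math.ceil(limit / concurrency)
    return waves * task_seconds


# 8 worker processes, multiplier 4, tasks of about 60 seconds each
limit = prefetch_limit(8, 4)
drain = batch_drain_seconds(8, 4, 60)
```

With long-running tasks, a large multiplier means messages sit reserved on one worker while other workers may be idle, which is why lower values are usually preferred in that case.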
broker_transport_options = {
    'confirm_publish': True,
    'max_retries': 3,
}
# Use quorum queues for reliability
# Adjust prefetch for throughput
worker_prefetch_multiplier = 4
# Disable QoS for maximum speed (classic queues only)
# Note: Not compatible with worker autoscaling
# Reduce API calls (costs)
broker_transport_options = {
    'polling_interval': 5,  # Poll less frequently
    'wait_time_seconds': 20,  # Max long polling
}
# CRITICAL: Keep prefetched tasks from outliving the visibility timeout
worker_prefetch_multiplier = 1 # Must be 1 for SQS
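To see why the polling settings matter for cost, it helps to estimate how many `ReceiveMessage` calls an idle worker makes per day. This is a rough sketch with a hypothetical function name. It assumes each empty long poll lasts the full `wait_time_seconds` and is followed by a sleep of `polling_interval`, which simplifies how the transport actually schedules polls.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def idle_receive_calls_per_day(wait_time_seconds, polling_interval):
    """Estimate empty ReceiveMessage calls per day for one idle poller.

    Assumption: one call per (wait_time_seconds + polling_interval) cycle.
    """
    cycle = wait_time_seconds + polling_interval
    if cycle <= 0:
        raise ValueError("wait_time_seconds + polling_interval must be positive")
    return int(SECONDS_PER_DAY // cycle)


tuned = idle_receive_calls_per_day(wait_time_seconds=20, polling_interval=5)
default_like = idle_receive_calls_per_day(wait_time_seconds=10, polling_interval=1)
```

Under these assumptions the tuned settings cut idle calls to less than half of the earlier configuration. Multiply the result by the number of pollers and queues to approximate the total request volume.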
Redis:
INFO clients
INFO memory
DBSIZE
INFO stats
RabbitMQ:
rabbitmqctl list_queues
rabbitmqctl status
SQS:
ApproximateNumberOfMessagesVisible: Queue backlog
NumberOfMessagesSent: Task creation rate
NumberOfMessagesReceived: Task completion rate
NumberOfEmptyReceives: Polling efficiency
#!/bin/bash
# health-check.sh
BROKER_TYPE="${1:-redis}"
case "$BROKER_TYPE" in
  redis)
    redis-cli PING || exit 1
    ;;
  rabbitmq)
    rabbitmqctl status || exit 1
    ;;
  sqs)
    aws sqs list-queues --queue-name-prefix celery- || exit 1
    ;;
esac
echo "✅ Broker healthy"
requirepass
Templates:
redis-config.py - Production Redis configuration
rabbitmq-config.py - RabbitMQ with quorum queues
sqs-config.py - AWS SQS with IAM
connection-strings.env - All connection formats
ssl-config.py - SSL/TLS configuration
Scripts:
test-broker-connection.sh - Test any broker
setup-redis.sh - Redis installation and config
setup-rabbitmq.sh - RabbitMQ cluster setup
Examples:
redis-sentinel.md - High availability setup
rabbitmq-ha.md - Clustering and quorum queues
sqs-setup.md - Complete AWS integration
Documentation:
Version: 1.0.0
Celery Compatibility: 5.0+
Supported Brokers: Redis 6+, RabbitMQ 3.8+, Amazon SQS
This skill follows strict security rules:
.gitignore protection documented