Build asynchronous message queues with Cloudflare Queues for background processing. Use when handling async tasks, batch processing, implementing retries, configuring dead letter queues, managing consumer concurrency, or troubleshooting queue timeouts, batch retries, message loss, or exceeded throughput limits.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Additional assets for this skill:
- README.md
- references/best-practices.md
- references/consumer-api.md
- references/producer-api.md
- references/wrangler-commands.md
- rules/cloudflare-queues.md
- templates/queues-consumer-basic.ts
- templates/queues-consumer-explicit-ack.ts
- templates/queues-dlq-pattern.ts
- templates/queues-producer.ts
- templates/queues-retry-with-delay.ts
- templates/wrangler-queues-config.jsonc
Status: Production Ready ✅
Last Updated: 2025-11-24
Dependencies: cloudflare-worker-base (for Worker setup)
Latest Versions: wrangler@4.50.0, @cloudflare/workers-types@4.20251121.0
Recent Updates (2025):
# 1. Create queue
npx wrangler queues create my-queue
# 2. Add producer binding to wrangler.jsonc
# { "queues": { "producers": [{ "binding": "MY_QUEUE", "queue": "my-queue" }] } }
# 3. Send message from Worker
await env.MY_QUEUE.send({ userId: '123', action: 'process-order' });
# 4. Add consumer binding to wrangler.jsonc
# { "queues": { "consumers": [{ "queue": "my-queue", "max_batch_size": 10 }] } }
# 5. Process messages
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
await processMessage(message.body);
message.ack(); // Explicit acknowledgement
}
}
};
# 6. Deploy and test
npx wrangler deploy
npx wrangler tail my-consumer
// Send single message
await env.MY_QUEUE.send({ userId: '123', action: 'send-email' });
// Send with delay (max 12 hours)
await env.MY_QUEUE.send({ action: 'reminder' }, { delaySeconds: 600 });
// Send batch (max 100 messages or 256 KB)
await env.MY_QUEUE.sendBatch([
{ body: { userId: '1' } },
{ body: { userId: '2' } },
]);
Critical Limits:
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {
for (const message of batch.messages) {
// message.id - unique UUID
// message.timestamp - Date when sent
// message.body - your content
// message.attempts - retry count (starts at 1)
await processMessage(message.body);
message.ack(); // Explicit ack (critical for non-idempotent ops)
}
}
};
// Retry with exponential backoff
message.retry({ delaySeconds: Math.min(60 * Math.pow(2, message.attempts - 1), 3600) });
// Batch methods
batch.ackAll(); // Ack all messages
batch.retryAll(); // Retry all messages
Critical:
message.ack() - Marks success and prevents a retry even if the handler fails later.
ALWAYS use explicit ack() for: database writes, API calls, financial transactions.
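The backoff expression used with message.retry() can be pulled into a small helper so the schedule is easy to inspect. This is a minimal sketch: the function name `backoffDelaySeconds` and its default parameters are illustrative assumptions, not part of the Queues API. Only the formula itself comes from this document.

```typescript
// Hypothetical helper (not a Cloudflare API): computes the retry delay for a
// given attempt using the formula 60 * 2^(attempts - 1), capped at one hour.
function backoffDelaySeconds(
  attempts: number,
  baseSeconds = 60,
  capSeconds = 3600,
): number {
  // message.attempts starts at 1, so the first retry waits baseSeconds.
  const exponent = Math.max(0, attempts - 1);
  return Math.min(baseSeconds * Math.pow(2, exponent), capSeconds);
}

// Delay schedule for the first seven delivery attempts.
const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffDelaySeconds(n));
console.log(schedule); // [60, 120, 240, 480, 960, 1920, 3600]
```

Inside a consumer this would be used as `message.retry({ delaySeconds: backoffDelaySeconds(message.attempts) })`.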
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
try {
await env.DB.prepare('INSERT INTO orders (id, amount) VALUES (?, ?)')
.bind(message.body.orderId, message.body.amount).run();
message.ack(); // Only ack on success
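} catch (error) {
console.error(`Failed ${message.id}:`, error);
// Mark for retry explicitly: if the handler returns normally,
// messages that were neither acked nor retried are acknowledged implicitly
message.retry();
}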
}
}
};
Why? Prevents duplicate writes if one message in batch fails. Failed messages retry independently.
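To see how per-message acknowledgement isolates failures, the pattern can be exercised outside a Worker with stand-in messages. This is a sketch under stated assumptions: `AckableMessage`, `processEach`, and `fakeMessage` are hypothetical names invented here, and `write` stands in for any non-idempotent operation such as the database insert in the consumer code.

```typescript
// Minimal stand-in for the parts of a queue message this sketch touches.
interface AckableMessage<T> {
  id: string;
  body: T;
  ack(): void;
  retry(): void;
}

// Hypothetical handler: acks each message that succeeds and marks each
// failure for retry, so one bad message never re-runs the good ones.
async function processEach<T>(
  messages: AckableMessage<T>[],
  write: (body: T) => Promise<void>,
): Promise<{ acked: string[]; retried: string[] }> {
  const acked: string[] = [];
  const retried: string[] = [];
  for (const message of messages) {
    try {
      await write(message.body);
      message.ack();
      acked.push(message.id);
    } catch {
      message.retry();
      retried.push(message.id);
    }
  }
  return { acked, retried };
}

// Test double that records which method was called on which message.
function fakeMessage<T>(id: string, body: T, log: string[]): AckableMessage<T> {
  return {
    id,
    body,
    ack: () => { log.push(`ack:${id}`); },
    retry: () => { log.push(`retry:${id}`); },
  };
}
```

With three messages where the second write throws, the first and third are acknowledged and only the second is redelivered.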
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
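try {
const response = await fetch('https://api.example.com/process', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(message.body),
});
if (response.ok) {
message.ack();
} else if (response.status === 429) {
// Rate limited: back off exponentially, capped at 1 hour
const delaySeconds = Math.min(60 * Math.pow(2, message.attempts - 1), 3600);
message.retry({ delaySeconds });
} else {
message.retry(); // Other HTTP errors: retry with the configured delay
}
} catch (error) {
// fetch() rejects only on network failures, not on HTTP error statuses
console.error(`Failed ${message.id}:`, error);
message.retry();
}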
}
}
};
⚠️ Without DLQ, failed messages are DELETED PERMANENTLY after max_retries
npx wrangler queues create my-dlq
wrangler.jsonc:
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_retries": 3,
"dead_letter_queue": "my-dlq" // Messages go here after 3 failed retries
}]
}
}
DLQ Consumer:
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const message of batch.messages) {
console.error('PERMANENTLY FAILED:', message.id, message.body);
await env.DB.prepare('INSERT INTO failed_messages (id, body) VALUES (?, ?)')
.bind(message.id, JSON.stringify(message.body)).run();
message.ack(); // Remove from DLQ
}
}
};
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100, // 1-100 (default: 10)
"max_batch_timeout": 30, // 0-60s (default: 5s)
"max_retries": 5, // 0-100 (default: 3)
"retry_delay": 300, // Seconds (default: 0)
"max_concurrency": 10, // 1-250 (default: auto-scale)
"dead_letter_queue": "my-dlq" // REQUIRED for production
}]
}
}
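The ranges noted in the config comments can be checked before deploying. The sketch below is one possible pre-deploy check, not a Wrangler feature: `ConsumerSettings`, `RANGES`, and `validateConsumer` are hypothetical names, and the bounds are copied from the comments in the config.

```typescript
// Subset of consumer settings covered by this sketch.
interface ConsumerSettings {
  max_batch_size?: number;
  max_batch_timeout?: number;
  max_retries?: number;
  max_concurrency?: number;
  dead_letter_queue?: string;
}

// Inclusive [min, max] bounds, taken from the comments in the config.
const RANGES: Record<string, [number, number]> = {
  max_batch_size: [1, 100],
  max_batch_timeout: [0, 60],
  max_retries: [0, 100],
  max_concurrency: [1, 250],
};

// Returns a list of human-readable problems; an empty list means the
// settings are within the documented ranges and a DLQ is configured.
function validateConsumer(settings: ConsumerSettings): string[] {
  const problems: string[] = [];
  for (const [key, [min, max]] of Object.entries(RANGES)) {
    const value = settings[key as keyof ConsumerSettings];
    if (value === undefined) continue; // unset means "use the default"
    if (typeof value !== "number" || !Number.isInteger(value) || value < min || value > max) {
      problems.push(`${key} must be an integer between ${min} and ${max}`);
    }
  }
  if (!settings.dead_letter_queue) {
    problems.push("dead_letter_queue is not set; failed messages are deleted after max_retries");
  }
  return problems;
}
```

Calling `validateConsumer` on the consumer entry shown in the config returns an empty list, while a `max_batch_size` of 500 with no DLQ returns two problems.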
Critical Settings:
# Create queue
npx wrangler queues create my-queue
npx wrangler queues create my-queue --message-retention-period-secs 1209600 # 14 days
# Manage queues
npx wrangler queues list
npx wrangler queues info my-queue
npx wrangler queues delete my-queue # ⚠️ Deletes ALL messages!
# Pause/Purge (March 2025 - NEW)
npx wrangler queues pause-delivery my-queue # Pause processing, keep receiving
npx wrangler queues resume-delivery my-queue
npx wrangler queues purge my-queue # ⚠️ Permanently deletes all messages!
# Consumer management
npx wrangler queues consumer add my-queue my-consumer-worker \
--batch-size 50 --batch-timeout 10 --message-retries 5
npx wrangler queues consumer remove my-queue my-consumer-worker
| Feature | Limit |
|---|---|
| Queues per account | 10,000 |
| Message size | 128 KB (includes ~100 bytes metadata) |
| Message retries | 100 max |
| Batch size | 1-100 messages |
| Batch timeout | 0-60 seconds |
| Messages per sendBatch | 100 (or 256 KB total) |
| Queue throughput | 5,000 messages/second per queue |
| Message retention | 4 days (default), 14 days (max) |
| Queue backlog size | 25 GB per queue |
| Concurrent consumers | 250 (push-based, auto-scale) |
| Consumer duration | 15 minutes (wall clock) |
| Consumer CPU time | 30 seconds (default), 5 minutes (max) |
| Visibility timeout | 12 hours (pull consumers) |
| Message delay | 12 hours (max) |
| API rate limit | 1200 requests / 5 minutes |
Requires Workers Paid plan ($5/month)
Operations Pricing:
What counts as an operation:
Typical message lifecycle:
Retries:
Dead Letter Queue:
Cost examples:
// ❌ Bad: Message >128 KB
await env.MY_QUEUE.send({
data: largeArray, // >128 KB
});
// ✅ Good: Check size before sending
const message = { data: largeArray };
const size = new TextEncoder().encode(JSON.stringify(message)).length;
if (size > 128000) {
// Store in R2, send reference
const key = `messages/${crypto.randomUUID()}.json`;
await env.MY_BUCKET.put(key, JSON.stringify(message));
await env.MY_QUEUE.send({ type: 'large-message', r2Key: key });
} else {
await env.MY_QUEUE.send(message);
}
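The size check can be factored into reusable helpers that also leave headroom for the roughly 100 bytes of metadata mentioned in the limits table. This is a sketch that assumes the payload is measured as UTF-8 encoded JSON; `serializedSizeBytes`, `fitsInQueueMessage`, and the two constants are hypothetical names.

```typescript
const MAX_MESSAGE_BYTES = 128000; // 128 KB message limit
const METADATA_ALLOWANCE_BYTES = 100; // approximate per-message metadata

// Size of the payload once serialized to JSON and encoded as UTF-8.
function serializedSizeBytes(payload: unknown): number {
  return new TextEncoder().encode(JSON.stringify(payload)).length;
}

// True when the payload plus the metadata allowance stays within the limit.
function fitsInQueueMessage(payload: unknown): boolean {
  return serializedSizeBytes(payload) + METADATA_ALLOWANCE_BYTES <= MAX_MESSAGE_BYTES;
}
```

A producer would call `fitsInQueueMessage(message)` and fall back to storing the payload in R2 when it returns false. Measuring bytes rather than string length matters because multi-byte characters make the encoded size larger than `JSON.stringify(...).length`.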
// ❌ Bad: Exceeding 5000 msg/s per queue
for (let i = 0; i < 10000; i++) {
await env.MY_QUEUE.send({ id: i }); // Too fast!
}
// ✅ Good: Use sendBatch
const messages = Array.from({ length: 10000 }, (_, i) => ({
body: { id: i },
}));
// Send in batches of 100
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
}
// ✅ Even better: Rate limit with delay
for (let i = 0; i < messages.length; i += 100) {
await env.MY_QUEUE.sendBatch(messages.slice(i, i + 100));
if (i + 100 < messages.length) {
await new Promise(resolve => setTimeout(resolve, 100)); // 100ms delay
}
}
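Slicing by 100 respects the message-count limit for sendBatch but not the 256 KB total. The sketch below splits on both limits, assuming batch size is approximated by the UTF-8 length of each JSON-serialized body; `chunkForSendBatch` and `OutgoingMessage` are hypothetical names, not Queues APIs.

```typescript
interface OutgoingMessage<T> {
  body: T;
}

// Splits messages into batches that respect both a count limit and a
// total-bytes limit. A single message larger than maxBytes still gets its
// own batch; the caller is expected to handle that case separately.
function chunkForSendBatch<T>(
  messages: OutgoingMessage<T>[],
  maxCount = 100,
  maxBytes = 256000,
): OutgoingMessage<T>[][] {
  const encoder = new TextEncoder();
  const batches: OutgoingMessage<T>[][] = [];
  let current: OutgoingMessage<T>[] = [];
  let currentBytes = 0;

  for (const message of messages) {
    const size = encoder.encode(JSON.stringify(message.body)).length;
    const wouldOverflow =
      current.length >= maxCount || currentBytes + size > maxBytes;
    if (current.length > 0 && wouldOverflow) {
      batches.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(message);
    currentBytes += size;
  }
  if (current.length > 0) batches.push(current);
  return batches;
}
```

Each returned batch can then be passed to `env.MY_QUEUE.sendBatch(batch)` in a loop, with an optional pause between calls as in the rate-limited variant.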
// ❌ Bad: Long processing without CPU limit increase
export default {
async queue(batch: MessageBatch): Promise<void> {
for (const message of batch.messages) {
await processForMinutes(message.body); // CPU timeout!
}
},
};
// ✅ Good: Increase CPU limit in wrangler.jsonc
wrangler.jsonc:
{
"limits": {
"cpu_ms": 300000 // 5 minutes (max allowed)
}
}
// Issue: Consumer too slow, backlog growing
// ✅ Solution 1: Increase batch size
{
"queues": {
"consumers": [{
"queue": "my-queue",
"max_batch_size": 100 // Process more per invocation
}]
}
}
// ✅ Solution 2: Let concurrency auto-scale (don't set max_concurrency)
// ✅ Solution 3: Optimize consumer code
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
// Process in parallel
await Promise.all(
batch.messages.map(async (message) => {
await process(message.body);
message.ack();
})
);
},
};
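With a bare Promise.all, one rejected promise rejects the whole call while other messages may still be in flight. A variant that catches errors per message keeps parallelism and still settles every message explicitly. This is a sketch only: `ParallelMessage` and `processInParallel` are hypothetical names, and `handle` stands in for the processing function.

```typescript
// Minimal stand-in for the parts of a queue message this sketch touches.
interface ParallelMessage<T> {
  id: string;
  body: T;
  ack(): void;
  retry(): void;
}

// Processes all messages concurrently. Each message is acked or retried
// on its own outcome, so the outer Promise.all never rejects.
async function processInParallel<T>(
  messages: ParallelMessage<T>[],
  handle: (body: T) => Promise<void>,
): Promise<number> {
  const outcomes = await Promise.all(
    messages.map(async (message) => {
      try {
        await handle(message.body);
        message.ack();
        return true;
      } catch {
        message.retry();
        return false;
      }
    }),
  );
  // Number of messages that were marked for retry.
  return outcomes.filter((ok) => !ok).length;
}
```

The returned failure count can be logged per batch to watch for a rising error rate.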
Always:
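- Configure a Dead Letter Queue for production (dead_letter_queue in consumer config)
- Use explicit message.ack() for non-idempotent ops (DB writes, API calls)
- Use sendBatch() for multiple messages (more efficient)
- Retry with exponential backoff: 60 * Math.pow(2, message.attempts - 1)
- Let concurrency auto-scale (don't set max_concurrency unless upstream has rate limits)
Never:
- Rely on implicit acknowledgement for non-idempotent ops; use explicit ack()
- Process independent messages strictly one at a time when throughput matters; use Promise.all() for parallelism
Possible causes: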
Solution:
# Check queue info
npx wrangler queues info my-queue
# Resume delivery in case it was paused
npx wrangler queues resume-delivery my-queue
# Check consumer logs
npx wrangler tail my-consumer
Cause: Using implicit acknowledgement with non-idempotent operations
Solution: Use explicit ack()
// ✅ Explicit ack
for (const message of batch.messages) {
try {
await dbWrite(message.body);
message.ack(); // Only ack on success
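} catch (error) {
console.error(`Failed: ${message.id}`);
// Mark for retry explicitly: a caught error without retry() is acked
// implicitly when the handler returns normally
message.retry();
}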
}
Cause: No Dead Letter Queue configured
Solution:
# Create DLQ
npx wrangler queues create my-dlq
# Add to consumer config
{
"queues": {
"consumers": [{
"queue": "my-queue",
"dead_letter_queue": "my-dlq"
}]
}
}
Possible causes:
- max_concurrency set to 1
Solution:
{
"queues": {
"consumers": [{
"queue": "my-queue",
// Don't set max_concurrency - let it auto-scale
"max_batch_size": 50 // Increase batch size instead
}]
}
}
Last Updated: 2025-10-21 Version: 1.0.0 Maintainer: Jeremy Dawes | jeremy@jezweb.net