From harness-claude
Sets up message queues (BullMQ/Redis or RabbitMQ) with competing consumers and dead letter queues for reliable async delivery, background jobs, and decoupled processing.
How this skill is triggered — by the user, by Claude, or both
Slash command
/harness-claude:events-message-queueThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
> Use message queues for reliable async delivery with competing consumers and dead letter queues.
Use message queues for reliable async delivery with competing consumers and dead letter queues.
BullMQ (Redis-backed queue — recommended for Node.js):
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const connection = new Redis({ host: 'localhost', port: 6379 });
// Define job data types
interface SendEmailJobData {
to: string;
subject: string;
body: string;
templateId?: string;
}
// Producer — enqueue jobs
const emailQueue = new Queue<SendEmailJobData>('email', { connection });
async function scheduleWelcomeEmail(userId: string, email: string): Promise<void> {
await emailQueue.add(
'welcome',
{ to: email, subject: 'Welcome!', body: 'Thanks for joining.', templateId: 'welcome-v2' },
{
delay: 5_000, // wait 5s before processing
attempts: 3, // retry up to 3 times
backoff: { type: 'exponential', delay: 2_000 }, // 2s, 4s, 8s
removeOnComplete: { count: 100 }, // keep last 100 completed jobs
removeOnFail: { count: 50 }, // keep last 50 failed jobs
}
);
}
// Consumer — process jobs
const emailWorker = new Worker<SendEmailJobData>(
'email',
async (job: Job<SendEmailJobData>) => {
console.log(`Processing job ${job.id}: send email to ${job.data.to}`);
await sendEmail(job.data.to, job.data.subject, job.data.body);
// Returning from the processor marks the job complete
},
{
connection,
concurrency: 5, // process 5 jobs in parallel
}
);
emailWorker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
emailWorker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed after all retries:`, err.message);
// At this point, job moves to the failed set (acts as DLQ)
});
RabbitMQ with amqplib:
import amqp from 'amqplib';
const QUEUE = 'order.processing';
const DLQ = 'order.processing.dlq';
async function setupQueue(): Promise<void> {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
// Dead letter queue
await channel.assertQueue(DLQ, { durable: true });
// Main queue with DLQ routing
await channel.assertQueue(QUEUE, {
durable: true,
arguments: {
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': DLQ,
'x-message-ttl': 3_600_000, // messages expire after 1h
},
});
channel.prefetch(1); // process one message at a time per consumer
return channel;
}
// Producer
async function publishOrder(orderId: string, amount: number): Promise<void> {
const channel = await setupQueue();
const message = JSON.stringify({ orderId, amount, timestamp: new Date() });
channel.sendToQueue(QUEUE, Buffer.from(message), { persistent: true });
}
// Consumer
async function startConsumer(): Promise<void> {
const channel = await setupQueue();
await channel.consume(QUEUE, async (msg) => {
if (!msg) return;
try {
const data = JSON.parse(msg.content.toString());
await processOrder(data.orderId, data.amount);
channel.ack(msg); // acknowledge on success
} catch (err) {
console.error('Processing failed:', err);
// nack with requeue=false — sends to DLQ after retries exhausted
channel.nack(msg, false, false);
}
});
}
Competing consumers pattern:
// Run multiple workers — each message processed by exactly ONE worker
// Scale by adding more worker instances
// Worker 1 (process 1)
const worker1 = new Worker('orders', processOrder, { connection, concurrency: 3 });
// Worker 2 (process 2 or separate machine)
const worker2 = new Worker('orders', processOrder, { connection, concurrency: 3 });
// BullMQ guarantees only one worker processes each job
Delivery guarantees:
Dead letter queue strategy:
Anti-patterns:
Monitoring queues:
// BullMQ — check queue health
const counts = await emailQueue.getJobCounts('waiting', 'active', 'completed', 'failed', 'delayed');
console.log(counts);
// Expose as /metrics endpoint for Prometheus
microservices.io/patterns/communication-style/messaging.html
npx claudepluginhub intense-visions/harness-engineering --plugin harness-claudeGuides BullMQ queue implementation with Redis, covering job scheduling, rate limiting, worker patterns, and monitoring for Node.js/TypeScript apps.
Guides message queue and job processing setup with Kafka, RabbitMQ, SQS, BullMQ, Celery, Sidekiq. Covers architecture, retries, DLQs, idempotency, priorities, backpressure, and scaling.
Sets up Cloudflare Queues for producers/consumers, batch processing, retries, dead letter queues, and resolves errors like timeouts, retries, and backlogs.