Expert in n8n workflow automation with Kafka integration. Covers the Kafka Trigger node, the Kafka producer node, event-driven workflows, error handling, retries, and no-code/low-code event-processing patterns. Activates for: n8n kafka, kafka trigger, kafka producer, n8n workflows, event-driven automation, no-code kafka, workflow patterns.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Expert knowledge of integrating Apache Kafka with the n8n workflow automation platform for no-code/low-code event-driven processing.
Kafka Trigger Node (Event Consumer): subscribes to a topic and starts a workflow execution for each incoming message (or batch of messages).
Kafka Producer Node (Event Publisher): publishes workflow data to a topic, optionally with a message key and headers.
Configuration:

```json
{
  "credentials": {
    "kafkaApi": {
      "brokers": "localhost:9092",
      "clientId": "n8n-workflow",
      "ssl": false,
      "sasl": {
        "mechanism": "plain",
        "username": "{{$env.KAFKA_USER}}",
        "password": "{{$env.KAFKA_PASSWORD}}"
      }
    }
  }
}
```
Activate me when you need help with the use cases and patterns below.
Use Case: Process Kafka events with HTTP API enrichment
```
[Kafka Trigger] →  [HTTP Request]   →      [Transform]       →     [Database]
      ↓                  ↓                      ↓                       ↓
 orders topic    Get customer data    Merge order + customer   Save to PostgreSQL
```
n8n Workflow:
Kafka Trigger:
- Topic: orders
- Consumer Group: order-processor
- Offset: latest

HTTP Request (Enrich):
- URL: https://api.example.com/customers/{{$json.customerId}}
- Header: Authorization: Bearer {{$env.API_TOKEN}}

Code Node (Transform):

```javascript
// Mode: Run Once for Each Item
return {
  json: {
    orderId: $json.order.id,
    customerId: $json.order.customerId,
    customerName: $json.customer.name,
    customerEmail: $json.customer.email,
    total: $json.order.total,
    timestamp: new Date().toISOString(),
  },
};
```
PostgreSQL (Save):
- Table: enriched_orders
Use Case: Single event triggers multiple downstream workflows

```
[Kafka Trigger] → [Switch] ──→ [Kafka Producer] (topic: high-value-orders)
      ↓              ├───────→ [Kafka Producer] (topic: all-orders)
 orders topic        └───────→ [Kafka Producer] (topic: analytics)
```
n8n Workflow:
Kafka Trigger:
- Topic: orders

Switch (route on total value):
- total > 1000 → high-value-orders topic
- all messages → all-orders topic
- all messages → analytics topic

(A Code-node variant of this routing is sketched below.)
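If the routing rules outgrow the Switch node's UI, a Code node can compute the target topic instead. A minimal sketch, assuming the order total is available as `$json.total` (the field name is illustrative):

```javascript
// Code node (Route by Order Value), mode: Run Once for Each Item
// Emits the original payload plus a targetTopic field that a downstream
// Kafka Producer can reference as {{$json.targetTopic}}.
const order = $json;
const targetTopic = order.total > 1000 ? 'high-value-orders' : 'all-orders';
return { json: { ...order, targetTopic } };
```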
Use Case: Retry failed messages, send to DLQ after 3 attempts

```
[Kafka Trigger] → [Try/Catch] → [Success] → [Kafka Producer] (topic: processed)
      ↓                ↓
 input topic     [Catch Error]
                       ↓
            [Increment Retry Count]
                       ↓
                [If Retry < 3]
                 ↓ Yes                    ↓ No
 [Kafka Producer] (topic: input-retry)    [Kafka Producer] (topic: dlq)
```
n8n Workflow:
Kafka Trigger:
- Topic: input

Retry Producer:
- Topic: input-retry

DLQ Producer:
- Topic: dlq

(The retry-count check is sketched below.)
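A minimal sketch of the Increment Retry Count step, assuming the retry count travels in a `retry-count` message header (the header name is illustrative; an If node downstream can branch on `{{$json.exhausted}}`):

```javascript
// Code node (Increment Retry Count), mode: Run Once for Each Item
// Kafka header values arrive as strings, so parse before incrementing.
const retryCount = parseInt($json.headers?.['retry-count'] ?? '0', 10);
return {
  json: {
    ...$json,
    retryCount: retryCount + 1,
    exhausted: retryCount + 1 >= 3, // route to dlq when true
  },
};
```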
Use Case: Aggregate 100 events, send batch to API

```
[Kafka Trigger] →   [Aggregate]    →   [HTTP Request]   → [Kafka Producer]
      ↓                  ↓                   ↓                   ↓
 events topic     Buffer 100 msgs   Send batch to API     Publish results
```
n8n Workflow:

Kafka Trigger:
- Topic: events
- Batch Size: 100

HTTP Request:
- Send the aggregated batch to the API

Kafka Producer:
- Publish the results
Use Case: Stream database changes to Kafka
```
[Cron Trigger] → [PostgreSQL]  →  [Compare]  → [Kafka Producer]
      ↓               ↓               ↓               ↓
 Every 1 min     Get new rows    Find diffs    Publish changes
```
n8n Workflow:

Cron Trigger:
- Every 1 minute

PostgreSQL:
- Query rows newer than the last seen position

Compare:
- Find rows that changed since the previous poll

Kafka Producer:
- Publish the changes

(A watermark-tracking sketch follows.)
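One way to track the "last seen" position between polls is n8n's workflow static data, which persists across executions of the same workflow. A minimal sketch, assuming rows carry a monotonically increasing numeric `id` column (the column name is illustrative):

```javascript
// Code node (Filter New Rows), mode: Run Once for All Items
// Workflow static data survives between executions of this workflow.
const staticData = $getWorkflowStaticData('global');
const lastSeenId = staticData.lastSeenId ?? 0;

const rows = $input.all();
const fresh = rows.filter((row) => row.json.id > lastSeenId);

if (fresh.length > 0) {
  staticData.lastSeenId = Math.max(...fresh.map((row) => row.json.id));
}
return fresh;
```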
Best Practices:

Use consumer groups so instances share the partition load.

✅ DO:

```
Workflow Instance 1:
  Consumer Group: order-processor
  Partitions: 0, 1, 2

Workflow Instance 2:
  Consumer Group: order-processor
  Partitions: 3, 4, 5
```

❌ DON'T:

```
// WRONG: No consumer group (all instances get all messages!)
Consumer Group: (empty)
```
Handle errors explicitly.

✅ DO:

```
[Kafka Trigger]
      ↓
   [Try] → [HTTP Request] → [Success Handler]
      ↓
  [Catch] → [Error Handler] → [Kafka DLQ]
```

❌ DON'T:

```
// WRONG: No error handling (workflow crashes on failure!)
[Kafka Trigger] → [HTTP Request] → [Database]
```
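When routing failures to the DLQ, it helps to wrap the original message with error context. A minimal sketch; the node name and error fields are illustrative:

```javascript
// Code node (Build DLQ Payload), mode: Run Once for Each Item
// Preserves the original message and records why and when it failed.
return {
  json: {
    originalMessage: $json,
    error: {
      message: $json.error?.message ?? 'unknown error',
      failedNode: 'HTTP Request', // illustrative: name of the failing node
      failedAt: new Date().toISOString(),
    },
  },
};
```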
Keep credentials out of workflows.

✅ DO:

```
Kafka Brokers: {{$env.KAFKA_BROKERS}}
SASL Username: {{$env.KAFKA_USER}}
SASL Password: {{$env.KAFKA_PASSWORD}}
```

❌ DON'T:

```
// WRONG: Hardcoded credentials in workflow!
Kafka Brokers: "localhost:9092"
SASL Username: "admin"
SASL Password: "admin-secret"
```
Set message keys for deterministic partitioning.

✅ DO:

```
Kafka Producer:
  Topic: orders
  Key: {{$json.customerId}}   // Partition by customer
  Message: {{$json}}
```

❌ DON'T:

```
// WRONG: No key (round-robin partitioning!)
Kafka Producer:
  Topic: orders
  Message: {{$json}}
```
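With a key set, Kafka's default partitioner hashes the key, so all events for a given customer land on the same partition and per-customer ordering is preserved (as long as the topic's partition count does not change).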
Set up Prometheus metrics export:

```
[Cron Trigger] →    [Kafka Admin]     → [Get Consumer Lag] →    [Prometheus]
      ↓                   ↓                     ↓                     ↓
  Every 30s      List consumer groups     Calculate lag     Push to Pushgateway
```
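Pushgateway accepts the Prometheus text exposition format over HTTP. A minimal sketch of a Code node that formats lag values for an HTTP Request node to POST (the metric and field names are illustrative, and the Pushgateway URL is an assumption):

```javascript
// Code node (Format Lag Metrics), mode: Run Once for All Items
// Builds a Prometheus text-format body: one line per consumer group/topic.
const lines = $input.all().map(
  (item) =>
    `kafka_consumer_lag{group="${item.json.groupId}",topic="${item.json.topic}"} ${item.json.lag}`
);
return [{ json: { body: lines.join('\n') + '\n' } }];
```

The HTTP Request node can then POST `{{$json.body}}` to something like `http://pushgateway:9091/metrics/job/n8n-kafka`.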
Retry with exponential backoff:

```javascript
// Code node (Calculate Backoff), mode: Run Once for Each Item
// Kafka header values are strings, so parse before doing math.
const retryCount = parseInt($json.headers?.['retry-count'] ?? '0', 10);
const backoffMs = Math.min(1000 * Math.pow(2, retryCount), 60000); // cap at 60 seconds
return {
  json: {
    retryCount: retryCount + 1,
    backoffMs,
    nextRetryAt: new Date(Date.now() + backoffMs).toISOString(),
  },
};
```
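From there, a Wait node configured with an expression like `{{$json.backoffMs}}` milliseconds can pause the execution before a Kafka Producer republishes the message to the retry topic.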
Circuit breaker (skip processing when the downstream failure rate is too high):
```javascript
// Code node (Check Failure Rate), mode: Run Once for Each Item
const failures = $json.metrics.failures || 0;
const total = $json.metrics.total || 1;
const failureRate = failures / total;

if (failureRate > 0.5) {
  // Circuit open: too many failures, stop hitting the downstream service
  return { json: { circuitState: 'OPEN', skipProcessing: true } };
}
return { json: { circuitState: 'CLOSED', skipProcessing: false } };
```
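The `metrics` object has to come from somewhere; one option is to count outcomes in workflow static data, which persists across executions. A minimal sketch (the `failed` flag is illustrative, and time-window handling is omitted):

```javascript
// Code node (Record Outcome), mode: Run Once for Each Item
// Accumulates success/failure counts across executions of this workflow.
const staticData = $getWorkflowStaticData('global');
staticData.total = (staticData.total ?? 0) + 1;
if ($json.failed) {
  staticData.failures = (staticData.failures ?? 0) + 1;
}
return {
  json: { metrics: { failures: staticData.failures ?? 0, total: staticData.total } },
};
```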
Message deduplication (skip messages that were already processed):
```javascript
// Code node (Deduplication), mode: Run Once for Each Item
// Runs after a Redis "Get" node that looked up the message ID;
// $('Redis') reads that node's output (it is not a Redis client).
const messageId = $json.headers?.['message-id'];
// Field name depends on the Redis node's configured property name.
const alreadySeen = $('Redis').first().json.value;

if (alreadySeen) {
  // Already processed: skip
  return { json: { skip: true, reason: 'duplicate' } };
}
// Not seen before: a downstream Redis "Set" node caches messageId with TTL 3600s
return { json: { skip: false, messageId } };
```
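Note that the Redis TTL bounds the deduplication window: a duplicate arriving more than an hour after the original will be processed again, so size the TTL to the longest realistic redelivery delay.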
Performance tuning:
Enable batching in the Kafka Trigger:

```
Kafka Trigger:
  Batch Size: 100
  Batch Timeout: 5000ms   // wait at most 5 seconds
```

Process the batch:

```javascript
// Code node (Batch Transform), mode: Run Once for All Items
// A Code node must return an array of items, each wrapped in { json: ... }.
const events = $input.all();
return events.map((event) => ({
  json: {
    id: event.json.id,
    timestamp: event.json.timestamp,
    processed: true,
  },
}));
```
```
[Kafka Trigger] → [Split in Batches] →   [HTTP Request]    →  [Aggregate]
      ↓                  ↓                      ↓                   ↓
  1000 events       100 at a time      Parallel API calls    Combine results
```
```
Kafka Producer:
  Compression: lz4    // or gzip, snappy
  Batch Size: 1000    // larger batches compress better
```
Event enrichment:

```
[Kafka Trigger] → [HTTP Request] →  [Transform]  → [Kafka Producer]
      ↓                 ↓                ↓                ↓
  Raw events     Enrich from API   Combine data   Publish enriched
```

Database sync:

```
[Kafka Trigger] → [PostgreSQL Upsert] →  [Kafka Producer]
      ↓                    ↓                     ↓
  CDC events        Update database    Publish success/failure
```

Critical alerts:

```
[Kafka Trigger] →  [If Critical]    → [Send Email] →  [Kafka Producer]
      ↓                  ↓                 ↓                 ↓
    Alerts       severity=critical    Notify admin   Publish alert sent
```

Error notifications:

```
[Kafka Trigger] →  [Transform]  →     [Slack]     →   [Kafka Producer]
      ↓                 ↓                 ↓                  ↓
    Errors       Format message   Send to #alerts   Publish notification
```
Test with Sample Data:
Test Kafka Producer:

```bash
# Consume the test topic to verify published messages
kcat -C -b localhost:9092 -t test-output -o beginning
```

Test Kafka Trigger:

```bash
# Produce a test message
echo '{"test": "data"}' | kcat -P -b localhost:9092 -t test-input
```

n8n CLI:

```bash
# Execute a workflow from a file
n8n execute --file workflow.json

# Export a workflow
n8n export:workflow --id=123 --output=workflow.json
```
Issue: Consumer lag keeps growing
Symptoms: Processing is slower than message arrival
Solutions:
- Run more workflow instances in the same consumer group (requires enough partitions)
- Enable batching in the Kafka Trigger
- Speed up the slowest step, e.g. parallelize HTTP calls with Split in Batches
Issue: Duplicate message processing
Cause: At-least-once delivery, no deduplication
Solution: Add an idempotency check:

```javascript
// Runs after a Redis "Get" node that checked the message ID;
// $('Redis') reads that node's output (it is not a Redis client).
const messageId = $json.headers?.['message-id'];
const exists = $('Redis').first().json.value;
if (exists) {
  return { json: { skip: true } };
}
return { json: { skip: false, messageId } };
```
Issue: Workflow timeouts
Cause: Long-running HTTP requests
Solution: Use async patterns:

```
[Kafka Trigger] → [Webhook] → [Wait for Webhook] → [Process Response]
      ↓               ↓               ↓                    ↓
              Trigger job     Async callback      Continue workflow
```
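For the async callback, n8n's Wait node in "On Webhook Call" mode exposes a resume URL (available as `$execution.resumeUrl` in current n8n versions); passing it to the external job as the callback target lets the workflow continue exactly when the job finishes.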
Invoke me when you need n8n Kafka integration, workflow automation, or event-driven no-code patterns!