Expert knowledge of Kafka CLI tools (kcat, kcli, kaf, kafkactl). Auto-activates on keywords kcat, kafkacat, kcli, kaf, kafkactl, kafka cli, kafka command line, produce message, consume topic, list topics, kafka metadata. Provides command examples, installation guides, and tool comparisons.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Comprehensive knowledge of modern Kafka CLI tools for production operations, development, and troubleshooting.
kcat Installation:
# macOS
brew install kcat
# Ubuntu/Debian (newer releases ship the package as kcat)
apt-get install kafkacat
# From source
git clone https://github.com/edenhill/kcat.git
cd kcat
./configure && make && sudo make install
Core Operations:
Produce Messages:
# Simple produce
echo "Hello Kafka" | kcat -P -b localhost:9092 -t my-topic
# Produce with key (key:value format)
echo "user123:Login event" | kcat -P -b localhost:9092 -t events -K:
# Produce from file
cat events.json | kcat -P -b localhost:9092 -t events
# Produce with headers
echo "msg" | kcat -P -b localhost:9092 -t my-topic -H "source=app1" -H "version=1.0"
# Produce with compression
echo "data" | kcat -P -b localhost:9092 -t my-topic -z gzip
# Produce with acks=all
echo "critical-data" | kcat -P -b localhost:9092 -t my-topic -X acks=all
Consume Messages:
# Consume from beginning
kcat -C -b localhost:9092 -t my-topic -o beginning
# Consume from end (latest)
kcat -C -b localhost:9092 -t my-topic -o end
# Consume specific partition
kcat -C -b localhost:9092 -t my-topic -p 0 -o beginning
# Consume with a consumer group (balanced consumer mode)
kcat -b localhost:9092 -G my-group my-topic
# Consume N messages and exit
kcat -C -b localhost:9092 -t my-topic -c 10
# Custom format (topic:partition:offset:key:value)
kcat -C -b localhost:9092 -t my-topic -f 'Topic: %t, Partition: %p, Offset: %o, Key: %k, Value: %s\n'
# JSON output
kcat -C -b localhost:9092 -t my-topic -J
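The -J envelope pipes cleanly into jq; a small sketch assuming the standard envelope fields (topic, partition, offset, key, payload):
# -e exits once the end of the partition is reached instead of waiting for new messages
kcat -C -b localhost:9092 -t my-topic -J -e | jq -r '[.topic, .partition, .offset, .key, .payload] | @tsv'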
Metadata & Admin:
# List all topics
kcat -L -b localhost:9092
# Get topic metadata (JSON)
kcat -L -b localhost:9092 -t my-topic -J
# Query offsets by timestamp: -t takes topic:partition:timestamp (ms since epoch)
kcat -Q -b localhost:9092 -t my-topic:0:1700000000000
# Check broker health
kcat -L -b localhost:9092 | grep "broker\|topic"
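The metadata JSON is also handy for scripting; for example, printing just the topic names:
# One topic name per line
kcat -L -b localhost:9092 -J | jq -r '.topics[].topic'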
SASL/SSL Authentication:
# SASL/PLAINTEXT
kcat -b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username=admin \
-X sasl.password=admin-secret \
-L
# SASL/SSL
kcat -b localhost:9093 \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=SCRAM-SHA-256 \
-X sasl.username=admin \
-X sasl.password=admin-secret \
-X ssl.ca.location=/path/to/ca-cert \
-L
# mTLS (mutual TLS)
kcat -b localhost:9093 \
-X security.protocol=SSL \
-X ssl.ca.location=/path/to/ca-cert \
-X ssl.certificate.location=/path/to/client-cert.pem \
-X ssl.key.location=/path/to/client-key.pem \
-L
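To keep credentials out of shell history, recent kcat builds can also read librdkafka properties from a config file via -F (a sketch; the file name, paths, and secrets are placeholders):
# Same SCRAM-over-TLS setup as above, moved into a properties file
cat > kcat.conf <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.username=admin
sasl.password=admin-secret
ssl.ca.location=/path/to/ca-cert
EOF
kcat -b localhost:9093 -F kcat.conf -L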
kcli Installation:
# Install via krew (Kubernetes plugin manager)
kubectl krew install kcli
# Or download binary
curl -LO https://github.com/cswank/kcli/releases/latest/download/kcli-linux-amd64
chmod +x kcli-linux-amd64
sudo mv kcli-linux-amd64 /usr/local/bin/kcli
Kubernetes Integration:
# Connect to Kafka running in k8s
kcli --context my-cluster --namespace kafka
# Produce to topic in k8s
echo "msg" | kcli produce --topic my-topic --brokers kafka-broker:9092
# Consume from k8s Kafka
kcli consume --topic my-topic --brokers kafka-broker:9092 --from-beginning
# List topics in k8s cluster
kcli topics list --brokers kafka-broker:9092
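If the brokers are only reachable inside the cluster, a temporary port-forward lets any local CLI reach them (a sketch; metadata queries work, but produce/consume may still fail if the brokers advertise internal listeners):
kubectl -n kafka port-forward svc/kafka-broker 9092:9092 &
kcat -L -b localhost:9092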
Best For: Kubernetes-hosted Kafka clusters and quick in-cluster checks.
kaf Installation:
# macOS
brew install kaf
# Linux (via snap)
snap install kaf
# From source
go install github.com/birdayz/kaf/cmd/kaf@latest
Interactive Features:
# Configure cluster
kaf config add-cluster local --brokers localhost:9092
# Use cluster
kaf config use-cluster local
# Interactive topic browsing (TUI)
kaf topics
# Interactive consume (arrow keys to navigate)
kaf consume my-topic
# Produce interactively
kaf produce my-topic
# Consumer group management
kaf groups
kaf group describe my-group
kaf group reset my-group --topic my-topic --offset earliest
# Schema Registry integration
kaf schemas
kaf schema get my-schema
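The cluster registry also makes multi-environment scripting straightforward using only the commands above (the staging cluster name and address are illustrative):
# Register a staging cluster, inspect its consumer groups, then switch back
kaf config add-cluster staging --brokers staging-kafka:9092
kaf config use-cluster staging
kaf groups
kaf config use-cluster local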
Best For: Day-to-day development and interactive exploration of topics, consumer groups, and schemas.
kafkactl Installation:
# macOS
brew install deviceinsight/packages/kafkactl
# Linux
curl -L https://github.com/deviceinsight/kafkactl/releases/latest/download/kafkactl_linux_amd64 -o kafkactl
chmod +x kafkactl
sudo mv kafkactl /usr/local/bin/
# Via Docker
docker run --rm -it deviceinsight/kafkactl:latest
Advanced Operations:
# Configure a context: kafkactl reads contexts from its config.yml (typically ~/.config/kafkactl/config.yml), e.g.:
#   contexts:
#     default:
#       brokers: [localhost:9092]
# Topic management
kafkactl create topic my-topic --partitions 3 --replication-factor 2
kafkactl alter topic my-topic --config retention.ms=86400000
kafkactl delete topic my-topic
# Consumer group operations
kafkactl describe consumer-group my-group
kafkactl reset consumer-group my-group --topic my-topic --offset earliest
kafkactl delete consumer-group my-group
# ACL management
kafkactl create acl --allow --principal User:alice --operation READ --topic my-topic
kafkactl list acls
# Quota management
kafkactl alter client-quota --user alice --producer-byte-rate 1048576
# Reassign partitions
kafkactl alter partition --topic my-topic --partition 0 --replicas 1,2,3
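A quick smoke test of a new context chains the commands above on a throwaway topic (the name is illustrative; replication factor 1 assumes a single-broker dev cluster):
kafkactl create topic scratch-topic --partitions 1 --replication-factor 1
kafkactl describe topic scratch-topic
kafkactl delete topic scratch-topic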
Best For: Production administration, including topic configuration, ACLs, and quotas.
Tool Comparison:
| Feature | kcat | kcli | kaf | kafkactl |
|---|---|---|---|---|
| Installation | Easy | Medium | Easy | Easy |
| Produce | ✅ Advanced | ✅ Basic | ✅ Interactive | ✅ Basic |
| Consume | ✅ Advanced | ✅ Basic | ✅ Interactive | ✅ Basic |
| Metadata | ✅ JSON | ✅ Basic | ✅ TUI | ✅ Detailed |
| TUI | ❌ | ❌ | ✅ | ✅ Limited |
| Admin | ❌ | ❌ | ⚠️ Limited | ✅ Advanced |
| SASL/SSL | ✅ | ✅ | ✅ | ✅ |
| K8s Native | ❌ | ✅ | ❌ | ❌ |
| Schema Reg | ❌ | ❌ | ✅ | ❌ |
| ACLs | ❌ | ❌ | ❌ | ✅ |
| Quotas | ❌ | ❌ | ❌ | ✅ |
| Best For | Scripting, ops | Kubernetes | Development | Production admin |
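In scripts the choice often reduces to "use whichever tool is installed, in order of preference"; a minimal shell sketch of that fallback:
# Preference order mirrors the table: kcat for scripting, kafkactl for admin, kaf otherwise
for tool in kcat kafkactl kaf; do
  command -v "$tool" >/dev/null 2>&1 && { echo "Using $tool"; break; }
done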
Production Topic Creation:
# Using kafkactl (recommended for production)
kafkactl create topic orders \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config compression.type=lz4 \
--config min.insync.replicas=2
# Verify with kcat
kcat -L -b localhost:9092 -t orders -J | jq '.topics[0]'
Dead Letter Queue (DLQ) Handling:
# Produce failed message to DLQ
echo "failed-msg" | kcat -P -b localhost:9092 -t orders-dlq \
-H "original-topic=orders" \
-H "error=DeserializationException" \
-H "timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
# Monitor DLQ
kcat -C -b localhost:9092 -t orders-dlq -f 'Headers: %h\nValue: %s\n\n'
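Once the underlying bug is fixed, the same two kcat modes can re-drive the DLQ back to the source topic (a sketch; assumes plain line-oriented payloads and drops the headers):
kcat -C -b localhost:9092 -t orders-dlq -o beginning -e | kcat -P -b localhost:9092 -t orders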
Consumer Lag Monitoring:
# Using kafkactl
kafkactl describe consumer-group my-app | grep LAG
# Using kcat (kcat itself cannot compute lag; inspect partition metadata and compare with committed offsets)
kcat -L -b localhost:9092 -J | jq '.topics[] | select(.topic=="my-topic") | .partitions[]'
# Using kaf (interactive)
kaf groups
# Then select group to see lag in TUI
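For a rough live view without extra tooling, the kafkactl output above can simply be polled (the 10-second interval is arbitrary):
watch -n 10 'kafkactl describe consumer-group my-app | grep -i lag'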
Cross-Cluster Replication Check:
# Produce to source cluster
echo "test" | kcat -P -b source-kafka:9092 -t replicated-topic
# Consume from target cluster
kcat -C -b target-kafka:9092 -t replicated-topic -o end -c 1
# Compare offsets at the same timestamp on both sides (topic:partition:timestamp in ms)
kcat -Q -b source-kafka:9092 -t replicated-topic:0:1700000000000
kcat -Q -b target-kafka:9092 -t replicated-topic:0:1700000000000
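For a stronger check than offsets alone, hash a bounded sample from each side; this only makes sense for a single partition or an order-preserving mirror (the message count is illustrative):
kcat -C -b source-kafka:9092 -t replicated-topic -p 0 -o beginning -c 1000 -e | sha256sum
kcat -C -b target-kafka:9092 -t replicated-topic -p 0 -o beginning -c 1000 -e | sha256sum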
Performance Testing:
# Produce 10,000 messages with kcat
seq 1 10000 | kcat -P -b localhost:9092 -t perf-test
# Consume and measure throughput
time kcat -C -b localhost:9092 -t perf-test -c 10000 -o beginning > /dev/null
# Test with compression
seq 1 10000 | kcat -P -b localhost:9092 -t perf-test -z lz4
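To make throughput numbers comparable across runs, fix the payload size instead of producing bare sequence numbers (1 KB and 10,000 messages are arbitrary choices):
# 10,000 messages of ~1 KB each
yes "$(head -c 1024 /dev/zero | tr '\0' x)" | head -n 10000 | kcat -P -b localhost:9092 -t perf-test -z lz4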
Connectivity Troubleshooting:
# Test broker connectivity
kcat -L -b localhost:9092
# Check SSL/TLS connection
openssl s_client -connect localhost:9093 -showcerts
# Verify SASL authentication
kcat -b localhost:9092 \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanism=PLAIN \
-X sasl.username=admin \
-X sasl.password=wrong-password \
-L
# Should fail with authentication error
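Before suspecting Kafka or credentials, confirm the port is reachable at all:
# TCP reachability check (no Kafka protocol involved)
nc -vz localhost 9092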
Topic & Partition Inspection:
# Check topic exists
kcat -L -b localhost:9092 | grep my-topic
# Check partition count
kcat -L -b localhost:9092 -t my-topic -J | jq '.topics[0].partitions | length'
# Query the offset closest to a timestamp for a partition (topic:partition:timestamp in ms)
kcat -Q -b localhost:9092 -t my-topic:0:1700000000000
# Read the last message from each of the 12 partitions (-o -1 is a relative offset from the end)
for i in {0..11}; do
  echo "Partition $i:"
  kcat -C -b localhost:9092 -t my-topic -p $i -o -1 -c 1 -e
done
Consumer Group Recovery:
# Check consumer group state
kafkactl describe consumer-group my-app
# Reset to beginning
kafkactl reset consumer-group my-app --topic my-topic --offset earliest
# Reset to specific offset
kafkactl reset consumer-group my-app --topic my-topic --partition 0 --offset 12345
# Delete consumer group (all consumers must be stopped first)
kafkactl delete consumer-group my-app
Automatic CLI Tool Detection: SpecWeave auto-detects installed CLI tools and recommends the best one for each operation:
import { CLIToolDetector } from './lib/cli/detector';
const detector = new CLIToolDetector();
const available = await detector.detectAll();
// Recommended tool for produce operation
if (available.includes('kcat')) {
console.log('Use kcat for produce (fastest)');
} else if (available.includes('kaf')) {
console.log('Use kaf for produce (interactive)');
}
SpecWeave Commands:
/sw-kafka:dev-env - Uses Docker Compose + kcat for local testing
/sw-kafka:monitor-setup - Sets up kcat-based lag monitoring
/sw-kafka:mcp-configure - Validates CLI tools are installed
/sw-kafka:kafka-mcp-integration - MCP server setup and configuration
/sw-kafka:kafka-architecture - Cluster design and sizing