Kafka Connect integration expert. Covers source and sink connectors, JDBC, Elasticsearch, S3, Debezium CDC, SMT (Single Message Transforms), connector configuration, and data pipeline patterns. Activates for kafka connect, connectors, source connector, sink connector, jdbc connector, debezium, smt, data pipeline, cdc.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.
Source Connectors (External System → Kafka): JDBC source, Debezium CDC
Sink Connectors (Kafka → External System): JDBC, Elasticsearch, S3
Single Message Transforms (SMTs): MaskField, InsertField, RegexRouter, Flatten, predicates
Activate me when you need help with: connector configuration, CDC pipelines with Debezium, SMTs, error handling and dead letter queues, and data pipeline design.
Use Case: Stream database table changes to Kafka
Configuration:
{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}
Modes:
incrementing: Track by auto-increment ID
timestamp: Track by timestamp column
timestamp+incrementing: Both (most reliable; see the sketch below)
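A minimal sketch of the combined mode, assuming the source tables have an updated_at timestamp column (that column name is an assumption, not part of the config above):
"mode": "timestamp+incrementing",
"timestamp.column.name": "updated_at",
"incrementing.column.name": "id"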
Use Case: Capture all database changes (INSERT/UPDATE/DELETE)
Configuration:
{
  "name": "debezium-mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "topic.prefix": "mysql",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mydb"
  }
}
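To verify the connector is emitting change events, tail one of the CDC topics. Debezium names topics <topic.prefix>.<database>.<table>; a sketch, assuming the config above and a locally installed console consumer (script name varies by distribution):
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic mysql.mydb.users --from-beginning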
Output Format (Debezium Envelope):
{
  "before": null,
  "after": {
    "id": 1,
    "name": "John Doe",
    "email": "john@example.com"
  },
  "source": {
    "version": "1.9.0",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1620000000000,
    "snapshot": "false",
    "db": "mydb",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 12345,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",  // c=CREATE, u=UPDATE, d=DELETE, r=READ
  "ts_ms": 1620000000000
}
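Most sinks want flat rows rather than the full envelope. Debezium ships an ExtractNewRecordState SMT that unwraps the "after" state; a sketch (option names vary slightly across Debezium versions):
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"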
Use Case: Write Kafka events to PostgreSQL
Configuration:
{
  "name": "jdbc-sink-enriched-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "enriched-orders",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id",
    "table.name.format": "orders_${topic}"
  }
}
Insert Modes:
insert: Append only (fails on duplicate)
update: Update only (requires PK)
upsert: INSERT or UPDATE (recommended; see the delete-handling sketch below)
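If the upstream topic carries tombstones for deleted rows, the JDBC sink can also propagate deletes. A sketch, assuming the order ID is carried in the record key (delete support requires pk.mode record_key):
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "order_id",
"delete.enabled": "true"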
Use Case: Archive Kafka topics to S3
Configuration:
{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "user-events,order-events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-kafka-archive",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "60000",
    "rotate.schedule.interval.ms": "3600000",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "US",
    "timestamp.extractor": "Record"
  }
}
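JSON files are convenient to inspect, but a schema-aware format usually compresses better and is friendlier to downstream query engines. A sketch of the alternative, assuming the Confluent S3 connector's Avro format class (a Parquet format class, io.confluent.connect.s3.format.parquet.ParquetFormat, also exists):
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat"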
Partitioning (S3 folder structure):
s3://my-kafka-archive/
  topics/user-events/year=2025/month=01/day=15/hour=10/
    user-events+0+0000000000.json
    user-events+0+0000001000.json
  topics/order-events/year=2025/month=01/day=15/hour=10/
    order-events+0+0000000000.json
Use Case: Index Kafka events for search
Configuration:
{
  "name": "elasticsearch-sink-logs",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "application-logs",
    "connection.url": "http://localhost:9200",
    "connection.username": "elastic",
    "connection.password": "password",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "index.write.wait_for_active_shards": "1"
  }
}
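With key.ignore=true the document ID is derived from topic+partition+offset, so every record becomes its own document. If you instead want one document per record key (later updates to the same key overwrite the same document), a sketch:
"key.ignore": "false",
"behavior.on.null.values": "delete"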
Use Case: Hide email/phone in Kafka topics
Configuration:
{
"transforms": "maskEmail",
"transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskEmail.fields": "email,phone"
}
Before:
{"id": 1, "name": "John", "email": "john@example.com", "phone": "555-1234"}
After:
{"id": 1, "name": "John", "email": null, "phone": null}
Use Case: Add processing timestamp to all messages
Configuration:
{
"transforms": "insertTimestamp",
"transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTimestamp.timestamp.field": "processed_at"
}
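InsertField can also stamp records with static values or record metadata such as the source topic. A sketch chained after the transform above; the field names here are illustrative assumptions:
"transforms": "insertTimestamp,insertMeta",
"transforms.insertMeta.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertMeta.static.field": "source_system",
"transforms.insertMeta.static.value": "kafka-connect",
"transforms.insertMeta.topic.field": "origin_topic"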
Use Case: Conditionally rename a topic via a predicate (note: built-in predicates match on topic name, headers, or tombstones, not on field values such as order amount; true content-based routing needs a custom predicate or stream processing)
Configuration:
{
"transforms": "routeByValue",
"transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.routeByValue.regex": "(.*)",
"transforms.routeByValue.replacement": "$1-high-value",
"transforms.routeByValue.predicate": "isHighValue",
"predicates": "isHighValue",
"predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHighValue.pattern": "orders"
}
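Predicates also pair with the built-in Filter transform, which drops matching records entirely, for example tombstones ahead of a sink that cannot handle null values. A sketch:
"transforms": "dropTombstones",
"transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTombstones.predicate": "isTombstone",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"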
Use Case: Flatten nested structures for JDBC sink
Configuration:
{
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
Before:
{
  "user": {
    "id": 1,
    "profile": {
      "name": "John",
      "email": "john@example.com"
    }
  }
}
After:
{
  "user_id": 1,
  "user_profile_name": "John",
  "user_profile_email": "john@example.com"
}
✅ DO:
// JDBC Sink with upsert mode
{
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "id"
}
❌ DON'T:
// WRONG: insert mode (duplicates on restart!)
{
"insert.mode": "insert"
}
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
✅ DO:
{
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
"errors.deadletterqueue.context.headers.enable": "true"
}
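To see why records landed in the DLQ, consume it with headers enabled; the error context is carried in __connect.errors.* headers. A sketch (the print.headers console-consumer property depends on your Kafka version):
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dlq-jdbc-sink --from-beginning \
  --property print.headers=true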
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @jdbc-source.json
# Update connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
-H "Content-Type: application/json" \
-d @jdbc-source.json
# List all connectors
curl http://localhost:8083/connectors
# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users
# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status
# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause
# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume
# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart
# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
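Newer Connect versions add two conveniences worth knowing (both depend on your Connect version): the expand query parameter returns every connector's status in one call, and restart can target only failed instances.
# Status of all connectors in one request
curl "http://localhost:8083/connectors?expand=status,info"
# Restart only failed connector/task instances
curl -X POST "http://localhost:8083/connectors/jdbc-source-users/restart?includeTasks=true&onlyFailed=true"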
Symptoms: Task state = FAILED
Solutions:
# Check worker logs
docker logs connect-worker
# Validate the connector config (PUT the config map as the request body)
curl -X PUT http://localhost:8083/connector-plugins/<class>/config/validate \
  -H "Content-Type: application/json" -d @config.json
# Restart the failed task
curl -X POST .../tasks/0/restart
Error: Incompatible schema detected
Solution: Enable auto-evolution:
{
"auto.create": "true",
"auto.evolve": "true"
}
Error: Could not get JDBC connection
Solution: Increase connection retry attempts and backoff:
{
"connection.attempts": "3",
"connection.backoff.ms": "10000"
}
Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!