This skill should be used when users need to create or fix Redpanda Connect pipeline configurations. Trigger when users mention "config", "pipeline", "YAML", "create a config", "fix my config", "validate my pipeline", or describe a streaming pipeline need like "read from Kafka and write to S3".
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Create working, validated Redpanda Connect configurations from scratch or repair existing configurations that have issues.
This skill REQUIRES skills: component-search, bloblang-authoring.
Deliver a complete, valid YAML configuration that passes validation and meets the user's requirements. Whether starting from a description or fixing a broken config, the result must be production-ready with properly secured credentials.
Handle two scenarios:
- Creation - User provides a description like "Read from Kafka on localhost:9092 topic 'events' to stdout"
- Repair - User provides a config file path and optional error context
This skill focuses ONLY on pipeline configuration orchestration and validation.
Skill Delegation:
NEVER directly use component-search or bloblang-authoring tools.
- Delegate to the component-search skill when it is unclear which components to use OR when you need component configuration details
- Delegate to the bloblang-authoring skill when creating or fixing Bloblang transformations; NEVER write Bloblang yourself

This skill requires: rpk, rpk connect.
See the SETUP for installation instructions.
Generates a YAML configuration template from a component expression. Useful for quickly creating a first pipeline draft.
# Usage:
rpk connect create [--small] <input>,...[/<processor>,...]/<output>,...
# Examples:
rpk connect create stdin/bloblang,awk/nats
rpk connect create file,http_server/protobuf/http_client # Multiple inputs
rpk connect create kafka_franz/stdout # Only input and output, no processors
rpk connect create --small stdin/bloblang/stdout # Minimal config, omit advanced fields
- Inputs, processors, and outputs are separated by /
- The --small flag omits advanced fields

Use the component-search skill's Online Component Documentation tool to look up detailed configuration information for any Redpanda Connect component, including usage examples, field descriptions, and best practices.
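To set expectations for what the scaffold from rpk connect create looks like, here is a rough, abridged sketch of the kind of template kafka_franz/stdout produces; this is not the literal tool output, and the full template lists many more fields with their defaults:

input:
  kafka_franz:
    seed_brokers: []
    topics: []
    consumer_group: ""
    # ...remaining input fields omitted here...

output:
  stdout: {}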
Validates Redpanda Connect pipeline configurations.
# Usage:
rpk connect lint [--env-file <.env>] <pipeline.yaml>
# Examples:
rpk connect lint --env-file ./.env ./pipeline.yaml
rpk connect lint pipeline-without-secrets.yaml
- Accepts a pipeline configuration file (for example, pipeline.yaml)
- The --env-file flag provides a .env file for environment variable substitution
- Exit code 0 indicates success; non-zero indicates validation failures

Executes a Redpanda Connect pipeline to test end-to-end functionality.
# Usage:
rpk connect run [--log.level DEBUG] [--env-file <.env>] <pipeline.yaml>
# Examples:
rpk connect run pipeline-without-secrets.yaml
rpk connect run --env-file ./.env ./pipeline.yaml # With secrets
rpk connect run --log.level DEBUG --env-file ./.env ./pipeline.yaml # With debug logging
- Accepts a pipeline configuration file (for example, pipeline.yaml)
- The --env-file flag provides a dotenv file for environment variable substitution
- --log.level DEBUG enables detailed logging for troubleshooting connection and processing issues

Test pipeline logic with stdin/stdout before connecting to real systems.
Especially useful for validating routing logic, error handling, and transformations.
Example: Content-based routing
input:
  stdin: {}

pipeline:
  processors:
    - mapping: |
        root = this
        # Route based on message type
        if this.type == "error" {
          meta route = "dlq"
        } else if this.priority == "high" {
          meta route = "urgent"
        } else {
          meta route = "standard"
        }

output:
  switch:
    cases:
      - check: 'meta("route") == "dlq"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "DLQ: " + content().string()'
      - check: 'meta("route") == "urgent"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "URGENT: " + content().string()'
      - check: 'meta("route") == "standard"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "STANDARD: " + content().string()'
Test all routes:
echo '{"type":"error","msg":"failed"}' | rpk connect run test.yaml
# Output: DLQ: {"type":"error","msg":"failed"}
echo '{"priority":"high","msg":"urgent"}' | rpk connect run test.yaml
# Output: URGENT: {"priority":"high","msg":"urgent"}
echo '{"priority":"low","msg":"normal"}' | rpk connect run test.yaml
# Output: STANDARD: {"priority":"low","msg":"normal"}
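Once the routes behave as expected, each stdout: {} case output can be swapped for a real destination while the checks and pipeline section stay unchanged. A minimal sketch for the DLQ case, assuming a kafka_franz output with placeholder broker and topic values:

      - check: 'meta("route") == "dlq"'
        output:
          kafka_franz:
            seed_brokers: ["${KAFKA_BROKER}"]
            topic: "events-dlq"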
Limitations:
Top-level keys:
- input - Data source (required): kafka_franz, http_server, stdin, aws_s3, etc.
- output - Data destination (required): kafka_franz, postgres, stdout, aws_s3, etc.
- pipeline.processors - Transformations (optional, execute sequentially)
- cache_resources, rate_limit_resources - Reusable components (optional); see the sketch after the environment variable notes below

Environment variables (required for secrets):
# Basic reference
broker: "${KAFKA_BROKER}"
# With default value
broker: "${KAFKA_BROKER:localhost:9092}"
Field type conventions (illustrated in the sketch after the minimal example below):
- Durations: "30s", "5m", "1h", "100ms"
- Byte sizes: "5MB", "1GB", "512KB"
- Booleans: true, false (no quotes)

Minimal example:
input:
  redpanda:
    seed_brokers: ["${KAFKA_BROKER}"]
    topics: ["${TOPIC}"]

pipeline:
  processors:
    # Bloblang transformation - use bloblang-authoring skill to create
    - mapping: |
        root = this
        root.timestamp = now()

output:
  stdout: {}
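To make the field type conventions above concrete, here is a hedged sketch pairing a kafka_franz input with an http_client output; the fields were chosen to illustrate the value formats and should be verified against the component documentation:

input:
  kafka_franz:
    seed_brokers: ["${KAFKA_BROKER}"]
    topics: ["events"]
    start_from_oldest: true   # boolean, no quotes

output:
  http_client:
    url: "${API_URL}"
    timeout: "30s"            # duration string
    batching:
      count: 100
      period: "5s"            # duration string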
Use Quick Pipeline Scaffolding for initial drafts.
The ./resources/recipes/ directory contains validated production patterns.
Each recipe includes:
- A markdown file (.md) - Pattern explanation, configuration details, testing instructions, and variations
- A YAML file (.yaml) - Complete, tested pipeline referenced in the markdown

Before writing pipelines:
- Use the Online Component Documentation tool for detailed field info and examples

Error Handling
- dlq-basic.md - Dead letter queue for error handling

Routing
- content-based-router.md - Route messages by field values
- multicast.md - Fan-out to multiple destinations

Replication
- kafka-replication.md - Cross-cluster Kafka streaming
- cdc-replication.md - Database change data capture

Cloud Storage
- s3-sink-basic.md - S3 output with batching
- s3-sink-time-based.md - Time-partitioned S3 writes
- s3-polling.md - Poll S3 for new files

Stateful Processing
- stateful-counter.md - Stateful counting with cache
- window-aggregation.md - Time-window aggregations

Performance & Monitoring
- rate-limiting.md - Throughput control
- custom-metrics.md - Prometheus metrics

Understand requirements
- Check ./resources/recipes/ for relevant patterns

Discover components
- Use the component-search skill if unclear which components to use

Build configuration
- Scaffold with rpk connect create input/processor/output
- Use ${VAR_NAME} for secrets → document in .env.example

Add transformations (if needed)
- Use the bloblang-authoring skill for tested scripts
- Add them to the pipeline.processors section

Validate and iterate
- Run rpk connect lint

Test and iterate
- Run rpk connect run
- Swap in stdin and stdout for easier testing
- Re-run rpk connect run after changes

Deliver
- Provide pipeline.yaml and .env.example
- Provide TESTING.md with only practical followup testing instructions
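Taken together, a typical creation pass boils down to a short command sequence; the component expression and file names below are illustrative:

rpk connect create --small kafka_franz/mapping/stdout > pipeline.yaml
# edit pipeline.yaml: fill in fields, replace secrets with ${VAR_NAME} references
rpk connect lint --env-file ./.env pipeline.yaml
rpk connect run --env-file ./.env pipeline.yaml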
Diagnose
- Run rpk connect lint to identify errors

Explain issues
Fix minimally
Verify
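To illustrate the repair flow end to end: lint flags the problem, the fix touches only the offending field, and a final lint and run confirm the result. The misspelled field below is a made-up example:

# Before: lint flags an unrecognized field
input:
  kafka_franz:
    seed_broker: ["localhost:9092"]

# After: minimal fix, everything else untouched
input:
  kafka_franz:
    seed_brokers: ["localhost:9092"]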
Never store credentials in plain text:
- Use ${ENV_VAR} syntax in YAML

Environment variable files:
- .env - Contains actual secret values, used at runtime with --env-file .env, NEVER commit to git
- .env.example - Documents required variables with placeholder values, safe to commit
- Add .env to .gitignore

When encountering sensitive fields (from <secret_fields> in component schema):
- Choose a descriptive variable name (for example, KAFKA_PASSWORD)
- Reference ${KAFKA_PASSWORD} in the YAML configuration
- Document it in .env.example: KAFKA_PASSWORD=your_password_here
- Create .env with the real value: KAFKA_PASSWORD=actual_secret_123
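For instance, a hedged sketch of how this plays out for a kafka_franz input using SASL; the exact field layout is an assumption to check against the component docs:

# pipeline.yaml
input:
  kafka_franz:
    seed_brokers: ["${KAFKA_BROKER}"]
    topics: ["events"]
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${KAFKA_USER}"
        password: "${KAFKA_PASSWORD}"

# .env.example (safe to commit)
KAFKA_BROKER=localhost:9092
KAFKA_USER=your_username_here
KAFKA_PASSWORD=your_password_here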