Event-driven architecture patterns with event sourcing, CQRS, and message-driven communication. Use when designing distributed systems, microservices communication, or systems requiring eventual consistency and scalability.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Expert guidance for designing, implementing, and operating event-driven systems with proven patterns for event sourcing, CQRS, message brokers, saga coordination, and eventual consistency management.
Events represent facts that have occurred in the system and are immutable.
Event Characteristics:
✓ Immutable (cannot be changed after creation)
✓ Past tense naming (OrderCreated, PaymentProcessed)
✓ Self-contained (all necessary data included)
✓ Timestamped and versioned
✗ Commands (CreateOrder vs OrderCreated)
✗ Mutable state changes
✗ Missing context or correlation data
Systems achieve consistency over time rather than immediately.
Trade-off:
Services communicate through events without direct dependencies.
Operations don't block waiting for responses.
Design around what happened (events) rather than what to do (commands).
Well-Designed Event:
{
"event_id": "evt_a3f7c9b2d8e1",
"event_type": "order.created",
"event_version": "1.0",
"timestamp": "2024-01-15T10:30:00.000Z",
"source": "order-service",
"correlation_id": "corr_x1y2z3a4b5",
"causation_id": "evt_previous_event",
"data": {
"order_id": "ord_123456",
"customer_id": "cust_789012",
"total_amount": 99.99,
"currency": "USD",
"items": [
{
"product_id": "prod_abc",
"quantity": 2,
"price": 49.99
}
]
},
"metadata": {
"user_id": "user_xyz",
"tenant_id": "tenant_001",
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0..."
}
}
Key Fields:
event_id: Unique identifier for idempotencyevent_type: Semantic event name (dot notation)event_version: Schema version for evolutiontimestamp: When event occurred (ISO 8601)correlation_id: Track related events across servicescausation_id: Which event caused this onedata: Business payloadmetadata: Contextual information1. Domain Events (Business Events):
Business facts within bounded context:
- OrderCreated
- PaymentProcessed
- InventoryReserved
- CustomerRegistered
- ShipmentDelivered
2. Integration Events (Cross-Service):
Events published across service boundaries:
- Order.Created (published to event bus)
- Customer.Updated (for other services)
- Payment.Succeeded (trigger workflows)
3. Change Data Capture (CDC):
Database changes as events:
- Record inserted → RecordCreated event
- Record updated → RecordUpdated event
- Record deleted → RecordDeleted event
Tools: Debezium, Maxwell, AWS DMS
Store all state changes as a sequence of immutable events instead of current state.
Traditional CRUD:
-- Users table stores current state
CREATE TABLE users (
id UUID PRIMARY KEY,
name VARCHAR(255),
email VARCHAR(255),
status VARCHAR(50),
updated_at TIMESTAMP
);
-- Single row, state overwrites history
UPDATE users SET email = 'new@email.com' WHERE id = 'user_123';
Event Sourcing:
-- Event store holds all changes
CREATE TABLE user_events (
event_id UUID PRIMARY KEY,
aggregate_id UUID, -- user_id
event_type VARCHAR(100),
event_data JSONB,
event_version INTEGER,
timestamp TIMESTAMP,
sequence_number BIGSERIAL
);
-- Append-only, never update
INSERT INTO user_events VALUES (
'evt_001', 'user_123', 'UserCreated',
'{"name": "John", "email": "john@example.com"}', 1, NOW()
);
INSERT INTO user_events VALUES (
'evt_002', 'user_123', 'EmailChanged',
'{"old_email": "john@example.com", "new_email": "new@email.com"}', 1, NOW()
);
-- Current state = replay all events
Event Store Interface:
interface EventStore {
// Append events to stream
appendEvents(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void>;
// Read events from stream
readEvents(
streamId: string,
fromVersion?: number
): Promise<DomainEvent[]>;
// Read all events across streams
readAllEvents(
fromPosition?: number,
maxCount?: number
): Promise<DomainEvent[]>;
}
// Aggregate root reconstructs from events
class Order {
private id: string;
private status: OrderStatus;
private items: OrderItem[];
private version: number = 0;
// Replay events to rebuild state
static fromEvents(events: OrderEvent[]): Order {
const order = new Order();
for (const event of events) {
order.apply(event);
order.version++;
}
return order;
}
private apply(event: OrderEvent): void {
switch (event.type) {
case 'OrderCreated':
this.id = event.data.orderId;
this.status = 'PENDING';
this.items = event.data.items;
break;
case 'OrderPaid':
this.status = 'PAID';
break;
case 'OrderShipped':
this.status = 'SHIPPED';
break;
}
}
}
Specialized Event Stores:
General-Purpose with Event Sourcing:
Separate read (query) and write (command) models for optimal performance.
Command Side (Write Model):
User → Command → Aggregate → Event Store
↓
Event Published
↓
Read Side (Query Model):
Event Handler → Update Read DB → Query API → User
Command Side:
// Command (intent to change state)
interface CreateOrderCommand {
customerId: string;
items: OrderItem[];
}
// Command Handler (validates and executes)
class CreateOrderCommandHandler {
constructor(
private eventStore: EventStore,
private orderRepository: OrderRepository
) {}
async handle(command: CreateOrderCommand): Promise<string> {
// Business logic validation
if (command.items.length === 0) {
throw new Error('Order must have items');
}
// Create aggregate
const order = Order.create(command.customerId, command.items);
// Get events from aggregate
const events = order.getUncommittedEvents();
// Save to event store
await this.eventStore.appendEvents(
`order-${order.id}`,
events,
0 // expected version
);
return order.id;
}
}
Read Side (Projection):
// Read Model (optimized for queries)
interface OrderSummary {
orderId: string;
customerId: string;
customerName: string; // denormalized
totalAmount: number;
itemCount: number;
status: string;
createdAt: Date;
updatedAt: Date;
}
// Event Handler (updates read model)
class OrderProjection {
constructor(private db: Database) {}
async on(event: OrderCreated): Promise<void> {
// Fetch customer name (could be cached)
const customer = await this.getCustomer(event.customerId);
// Insert into read model
await this.db.orderSummaries.insert({
orderId: event.orderId,
customerId: event.customerId,
customerName: customer.name,
totalAmount: event.totalAmount,
itemCount: event.items.length,
status: 'PENDING',
createdAt: event.timestamp,
updatedAt: event.timestamp
});
}
async on(event: OrderPaid): Promise<void> {
await this.db.orderSummaries.update(
{ orderId: event.orderId },
{
status: 'PAID',
updatedAt: event.timestamp
}
);
}
}
// Query API (reads from optimized model)
class OrderQueryService {
async getOrderSummary(orderId: string): Promise<OrderSummary> {
return await this.db.orderSummaries.findOne({ orderId });
}
async getCustomerOrders(customerId: string): Promise<OrderSummary[]> {
return await this.db.orderSummaries.find({ customerId });
}
}
Good Fit:
Avoid When:
Use Case: Work distribution, reliable delivery, load balancing.
Producer → Queue → Consumer 1
→ Consumer 2 (competes for messages)
→ Consumer N
Characteristics:
- One message consumed by one consumer
- Load balancing across consumers
- Guaranteed delivery
- Message ordering (within partition/queue)
Examples:
- RabbitMQ queues
- AWS SQS
- Azure Service Bus queues
RabbitMQ Example:
// Producer
const queue = 'order-processing';
channel.sendToQueue(
queue,
Buffer.from(JSON.stringify(orderEvent)),
{ persistent: true } // survive broker restart
);
// Consumer
channel.consume(queue, async (msg) => {
const event = JSON.parse(msg.content.toString());
try {
await processOrder(event);
channel.ack(msg); // acknowledge success
} catch (error) {
channel.nack(msg, false, true); // requeue on failure
}
});
Use Case: Broadcasting events to multiple interested services.
Publisher → Topic → Subscriber 1 (all messages)
→ Subscriber 2 (all messages)
→ Subscriber N (all messages)
Characteristics:
- One message received by all subscribers
- Decoupled publishers and subscribers
- Dynamic subscription
- Topic-based or content-based routing
Examples:
- Apache Kafka topics
- AWS SNS
- Google Cloud Pub/Sub
- Azure Service Bus topics
Kafka Example:
// Producer
await producer.send({
topic: 'orders',
messages: [
{
key: orderEvent.orderId, // partition key
value: JSON.stringify(orderEvent),
headers: {
'event-type': 'OrderCreated',
'correlation-id': correlationId
}
}
]
});
// Consumer Group (load balanced)
const consumer = kafka.consumer({ groupId: 'order-analytics' });
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
await updateAnalytics(event);
}
});
RabbitMQ:
Strengths:
- Rich routing (exchanges, bindings)
- Message acknowledgment and requeue
- Priority queues
- Dead letter exchanges
Best for:
- Task distribution
- Complex routing patterns
- Guaranteed delivery
- Lower throughput needs (<100K msg/sec)
Apache Kafka:
Strengths:
- High throughput (millions msg/sec)
- Event log persistence
- Replay capability
- Partition-based parallelism
Best for:
- Event streaming
- High-volume systems
- Event sourcing backend
- Log aggregation
AWS SQS/SNS:
Strengths:
- Fully managed
- Infinite scale
- Simple integration
- Pay per use
Best for:
- AWS-native architectures
- Variable load
- Simple pub/sub or queuing
- Minimal ops overhead
Manage data consistency across services using a sequence of local transactions coordinated by events or orchestration.
Central coordinator manages transaction flow.
// Saga Orchestrator
class OrderSaga {
async execute(createOrderCommand: CreateOrderCommand): Promise<void> {
const sagaId = generateId();
const state = new SagaState(sagaId);
try {
// Step 1: Create order
state.orderId = await this.orderService.createOrder(
createOrderCommand
);
state.mark('ORDER_CREATED');
// Step 2: Reserve inventory
await this.inventoryService.reserveInventory({
orderId: state.orderId,
items: createOrderCommand.items
});
state.mark('INVENTORY_RESERVED');
// Step 3: Process payment
await this.paymentService.processPayment({
orderId: state.orderId,
amount: createOrderCommand.totalAmount
});
state.mark('PAYMENT_PROCESSED');
// Step 4: Confirm order
await this.orderService.confirmOrder(state.orderId);
state.mark('COMPLETED');
} catch (error) {
// Compensate in reverse order
await this.compensate(state, error);
throw new SagaFailedException(sagaId, error);
}
}
private async compensate(state: SagaState, error: Error): Promise<void> {
if (state.has('PAYMENT_PROCESSED')) {
await this.paymentService.refundPayment(state.orderId);
}
if (state.has('INVENTORY_RESERVED')) {
await this.inventoryService.releaseInventory(state.orderId);
}
if (state.has('ORDER_CREATED')) {
await this.orderService.cancelOrder(state.orderId);
}
}
}
Benefits:
Drawbacks:
Services coordinate via events without central controller.
// Order Service
class OrderService {
async createOrder(command: CreateOrderCommand): Promise<void> {
const order = new Order(command);
await this.repository.save(order);
// Publish event
await this.eventBus.publish(new OrderCreated({
orderId: order.id,
customerId: order.customerId,
items: order.items,
totalAmount: order.totalAmount
}));
}
}
// Inventory Service (reacts to OrderCreated)
class InventoryService {
@EventHandler(OrderCreated)
async onOrderCreated(event: OrderCreated): Promise<void> {
try {
await this.reserveStock(event.items);
// Publish success event
await this.eventBus.publish(new InventoryReserved({
orderId: event.orderId,
items: event.items
}));
} catch (error) {
// Publish failure event (triggers compensation)
await this.eventBus.publish(new InventoryReservationFailed({
orderId: event.orderId,
reason: error.message
}));
}
}
// Compensation handler
@EventHandler(OrderCancelled)
async onOrderCancelled(event: OrderCancelled): Promise<void> {
await this.releaseStock(event.orderId);
}
}
// Payment Service (reacts to InventoryReserved)
class PaymentService {
@EventHandler(InventoryReserved)
async onInventoryReserved(event: InventoryReserved): Promise<void> {
try {
await this.processPayment(event.orderId);
await this.eventBus.publish(new PaymentProcessed({
orderId: event.orderId
}));
} catch (error) {
await this.eventBus.publish(new PaymentFailed({
orderId: event.orderId,
reason: error.message
}));
}
}
// Compensation
@EventHandler(OrderCancelled)
async onOrderCancelled(event: OrderCancelled): Promise<void> {
await this.refundPayment(event.orderId);
}
}
Event Flow:
Success Flow:
OrderCreated → InventoryReserved → PaymentProcessed → OrderConfirmed
Failure Flow (Payment fails):
OrderCreated → InventoryReserved → PaymentFailed → OrderCancelled
→ InventoryReleased (compensation)
Benefits:
Drawbacks:
1. Compensating Transactions:
Action: ReserveInventory
Compensation: ReleaseInventory
Action: ProcessPayment
Compensation: RefundPayment
Action: CreateShipment
Compensation: CancelShipment
2. Semantic Lock:
Mark resource as "pending" to prevent concurrent access:
- Order status: PENDING_PAYMENT
- Inventory: RESERVED (not available for other orders)
- Payment: AUTHORIZED (not captured yet)
3. Saga Log:
Persist saga state for recovery:
- Current step
- Completed steps
- Compensation state
- Allows restart after failure
Decentralized coordination through events.
Service A → Event 1 → Service B → Event 2 → Service C
↓
Service D
When to Use:
Example: User Registration
1. Auth Service: UserRegistered event
→ Email Service: sends welcome email
→ Analytics Service: tracks signup
→ CRM Service: creates contact
Each service reacts independently.
Centralized coordinator manages flow.
Orchestrator → Service A
→ Service B
→ Service C
Orchestrator controls sequence and dependencies.
When to Use:
Example: Order Processing
OrderOrchestrator:
1. Validate order
2. Reserve inventory (wait)
3. Process payment (wait)
4. Create shipment (wait)
5. Confirm order
Clear sequence, centralized control.
Combine both for complex systems:
High-level: Orchestration (order saga)
Step 1: Process Order (choreography within)
→ Validate
→ Price calculation
→ Tax calculation
Step 2: Fulfill Order (choreography within)
→ Pick items
→ Pack
→ Label
Problem: User makes change, immediately queries, sees stale data.
Solutions:
1. Synchronous Projection Update:
async createOrder(command: CreateOrderCommand): Promise<OrderSummary> {
// Write to event store
await this.eventStore.append(orderCreatedEvent);
// Immediately update read model (synchronously)
const summary = await this.projection.apply(orderCreatedEvent);
return summary; // User sees their change
}
2. Version-Based Consistency:
// Return version with write
const result = await createOrder(command);
// version: 5
// Query with minimum version
const order = await queryOrder(orderId, minVersion: 5);
// Wait until read model catches up to version 5
3. Client-Side Optimistic Update:
// Client immediately shows optimistic state
this.orders.push(newOrder);
// Background: wait for confirmation
await waitForEvent('OrderCreated', newOrder.id);
When eventual consistency fails, undo changes:
// Original action
await inventoryService.reserveStock(orderId, items);
// Later: payment fails, compensate
await inventoryService.releaseStock(orderId);
// Idempotent: safe to call multiple times
Last-Write-Wins (LWW):
if (event1.timestamp > event2.timestamp) {
apply(event1);
} else {
apply(event2);
}
Custom Business Logic:
// Merge inventory updates
const finalQuantity = Math.max(
update1.quantity,
update2.quantity
);
CRDTs (Conflict-free Replicated Data Types):
// Automatic conflict resolution
const counter = new PNCounter();
counter.increment(5); // replica 1
counter.increment(3); // replica 2
// Automatically merges to 8