Agent-to-Agent (A2A) executor implementation patterns for task handling, execution management, and agent coordination. Use when building A2A executors, implementing task handlers, or creating agent execution flows, or when the user mentions the A2A protocol, task execution, agent executors, task handlers, or agent coordination.
This skill is limited to using the following tools:
- examples/function-executor.ts
- examples/llm-executor.ts
- examples/validation-executor.ts
- examples/workflow-executor.ts
- scripts/test-executor.sh
- scripts/validate-executor.sh
- templates/async-executor.py
- templates/async-executor.ts
- templates/basic-executor.py
- templates/basic-executor.ts
- templates/batch-executor.ts
- templates/streaming-executor.py
- templates/streaming-executor.ts

Purpose: Provide production-ready executor patterns for implementing Agent-to-Agent (A2A) protocol task handlers with proper error handling, retry logic, and execution flows.
Activation Triggers: building A2A executors, implementing task handlers, creating agent execution flows, or mentions of the A2A protocol, task execution, agent executors, task handlers, or agent coordination.
Key Resources:
- templates/basic-executor.ts - Simple synchronous executor
- templates/basic-executor.py - Python synchronous executor
- templates/async-executor.ts - Asynchronous task executor
- templates/async-executor.py - Python async executor
- templates/streaming-executor.ts - Streaming result executor
- templates/streaming-executor.py - Python streaming executor
- templates/batch-executor.ts - Batch task processing
- scripts/validate-executor.sh - Validate executor implementation
- scripts/test-executor.sh - Test executor against A2A spec
- examples/ - Production executor implementations

Basic Executor (Synchronous)
When to use: Simple, fast tasks with immediate results
Template: templates/basic-executor.ts or templates/basic-executor.py
Pattern:
async function executeTask(task: A2ATask): Promise<A2AResult> {
// 1. Validate input
validateTask(task)
// 2. Execute task
const result = await processTask(task)
// 3. Return result
return {
status: 'completed',
result,
taskId: task.id
}
}
Best for: Quick operations, validation tasks, simple transformations
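A2ATask and A2AResult are used throughout this skill but not defined here; the sketches below assume a minimal shape along these lines (substitute the actual types from your A2A SDK):

```typescript
// Assumed shapes, for illustration only - replace with your A2A SDK's real types.
interface A2ATask {
  id: string
  type: string
  parameters: Record<string, unknown>
}

interface A2AResult {
  taskId: string
  status: 'running' | 'completed' | 'failed'
  result?: unknown
  error?: string
}
```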
Async Executor
When to use: Long-running tasks that need status updates
Template: templates/async-executor.ts or templates/async-executor.py
Pattern:
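A minimal sketch of the async pattern, reusing processTask from the basic pattern above and the TaskStore defined later in this skill; templates/async-executor.ts and templates/async-executor.py carry the full version:

```typescript
// Sketch only: acknowledge immediately, do the work in the background,
// and report progress through the task store.
const store = new TaskStore()

async function executeAsyncTask(task: A2ATask): Promise<A2AResult> {
  validateTask(task)
  store.createTask(task.id)
  store.updateTask(task.id, { status: 'running' })

  // Fire-and-forget: callers poll store.getTask(task.id) for the final outcome.
  processTask(task)
    .then(result => store.updateTask(task.id, { status: 'completed', result, endTime: new Date() }))
    .catch(err => store.updateTask(task.id, { status: 'failed', error: String(err), endTime: new Date() }))

  return { taskId: task.id, status: 'running' }
}
```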
Best for: LLM inference, file processing, data analysis
Streaming Executor
When to use: Results that should be delivered incrementally
Template: templates/streaming-executor.ts or templates/streaming-executor.py
Pattern:
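A minimal sketch using an async generator; processTaskChunks is a hypothetical helper that yields partial results (templates/streaming-executor.ts has the full template):

```typescript
// Sketch only - emit partial results as they become available, then signal completion.
async function* executeStreamingTask(task: A2ATask): AsyncGenerator<A2AResult> {
  validateTask(task)

  for await (const chunk of processTaskChunks(task)) {
    yield { taskId: task.id, status: 'running', result: chunk }
  }

  yield { taskId: task.id, status: 'completed' }
}
```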
Best for: Text generation, real-time data, progressive results
Batch Executor
When to use: Processing multiple related tasks efficiently
Template: templates/batch-executor.ts
Pattern:
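A minimal sketch that processes tasks in fixed-size chunks with Promise.allSettled so one failure doesn't abort the batch; concurrency tuning and task grouping are left to templates/batch-executor.ts:

```typescript
// Sketch only: run tasks chunk by chunk and collect per-task outcomes.
async function executeBatch(tasks: A2ATask[], chunkSize = 10): Promise<A2AResult[]> {
  const results: A2AResult[] = []

  for (let i = 0; i < tasks.length; i += chunkSize) {
    const chunk = tasks.slice(i, i + chunkSize)
    const settled = await Promise.allSettled(chunk.map(t => executeTask(t)))

    settled.forEach((outcome, j) => {
      results.push(
        outcome.status === 'fulfilled'
          ? outcome.value
          : { taskId: chunk[j].id, status: 'failed', error: String(outcome.reason) }
      )
    })
  }

  return results
}
```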
Best for: Bulk operations, parallel processing, resource optimization
function validateTask(task: A2ATask): void {
// Validate required fields
if (!task.id) throw new ValidationError('Task ID required')
if (!task.type) throw new ValidationError('Task type required')
// Validate task parameters
validateParameters(task.parameters)
// Check executor capabilities
if (!supportsTaskType(task.type)) {
throw new UnsupportedTaskError(task.type)
}
}
Purpose: Catch errors early, provide clear feedback
async function executeWithErrorHandling(task: A2ATask) {
try {
return await executeTask(task)
} catch (error) {
if (error instanceof ValidationError) {
return { status: 'failed', error: error.message }
}
if (error instanceof RetryableError) {
return scheduleRetry(task)
}
// Log and return generic error
logger.error('Task execution failed', { taskId: task.id, error })
return { status: 'failed', error: 'Internal error' }
}
}
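These handlers rely on a small hierarchy of custom error classes that the skill references but does not define; a minimal sketch:

```typescript
// Assumed error classes - define them yourself or map your SDK's errors onto them.
class ValidationError extends Error {}
class RetryableError extends Error {}
class FatalError extends Error {}
class TimeoutError extends Error {}
class UnsupportedTaskError extends Error {
  constructor(taskType: string) { super(`Unsupported task type: ${taskType}`) }
}
class MaxRetriesExceededError extends Error {
  constructor(taskId: string) { super(`Max retries exceeded for task ${taskId}`) }
}
```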
Error Types:
- ValidationError - Invalid input, don't retry
- RetryableError - Temporary failure, safe to retry
- FatalError - Permanent failure, abort

const retryConfig = {
maxAttempts: 3,
backoff: 'exponential', // or 'linear', 'fixed'
initialDelay: 1000, // ms
maxDelay: 30000
}
async function executeWithRetry(
task: A2ATask,
attempt: number = 1
): Promise<A2AResult> {
try {
return await executeTask(task)
} catch (error) {
if (attempt >= retryConfig.maxAttempts) {
throw new MaxRetriesExceededError(task.id)
}
if (error instanceof RetryableError) {
const delay = calculateBackoff(attempt)
await sleep(delay)
return executeWithRetry(task, attempt + 1)
}
throw error
}
}
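calculateBackoff and sleep are used above but not defined; a minimal sketch based on the retryConfig fields and the strategies listed next:

```typescript
// Sketch only - maps the configured strategy to a delay in milliseconds, capped at maxDelay.
function calculateBackoff(attempt: number): number {
  const { backoff, initialDelay, maxDelay } = retryConfig
  const delay =
    backoff === 'exponential' ? initialDelay * 2 ** attempt :
    backoff === 'linear' ? initialDelay * attempt :
    initialDelay // 'fixed'
  return Math.min(delay, maxDelay)
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))
```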
Retry Strategies:
- Exponential: delay = initialDelay * (2 ^ attempt)
- Linear: delay = initialDelay * attempt
- Fixed: delay = initialDelay

interface TaskState {
id: string
status: 'pending' | 'running' | 'completed' | 'failed'
result?: any
error?: string
startTime: Date
endTime?: Date
attempts: number
}
class TaskStore {
private tasks = new Map<string, TaskState>()
createTask(id: string): TaskState {
const state: TaskState = {
id,
status: 'pending',
startTime: new Date(),
attempts: 0
}
this.tasks.set(id, state)
return state
}
updateTask(id: string, update: Partial<TaskState>): void {
const state = this.tasks.get(id)
if (state) {
Object.assign(state, update)
}
}
getTask(id: string): TaskState | undefined {
return this.tasks.get(id)
}
}
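One way to wire the store into an executor (sketch only; fields match the TaskState interface above):

```typescript
// Sketch only: record lifecycle transitions around a single execution.
const taskStore = new TaskStore()

async function executeTracked(task: A2ATask): Promise<A2AResult> {
  const state = taskStore.createTask(task.id)
  taskStore.updateTask(task.id, { status: 'running', attempts: state.attempts + 1 })
  try {
    const result = await executeTask(task)
    taskStore.updateTask(task.id, { status: 'completed', result, endTime: new Date() })
    return result
  } catch (error) {
    taskStore.updateTask(task.id, { status: 'failed', error: String(error), endTime: new Date() })
    throw error
  }
}
```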
function loggingMiddleware(
executor: Executor
): Executor {
return async (task) => {
logger.info('Task started', { taskId: task.id })
const start = Date.now()
try {
const result = await executor(task)
const duration = Date.now() - start
logger.info('Task completed', { taskId: task.id, duration })
return result
} catch (error) {
const duration = Date.now() - start
logger.error('Task failed', { taskId: task.id, duration, error })
throw error
}
}
}
function metricsMiddleware(
executor: Executor
): Executor {
return async (task) => {
metrics.increment('tasks.started', { type: task.type })
const start = Date.now()
try {
const result = await executor(task)
const duration = Date.now() - start
metrics.timing('tasks.duration', duration, { type: task.type })
metrics.increment('tasks.completed', { type: task.type })
return result
} catch (error) {
metrics.increment('tasks.failed', { type: task.type })
throw error
}
}
}
function rateLimitMiddleware(
executor: Executor,
limit: { requests: number, window: number }
): Executor {
const limiter = new RateLimiter(limit.requests, limit.window)
return async (task) => {
await limiter.acquire()
try {
return await executor(task)
} finally {
limiter.release()
}
}
}
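The Executor type these middleware wrap is not defined above; a minimal assumption, plus one way to compose them (RateLimiter is likewise assumed):

```typescript
// Assumed function type shared by all of the middleware above.
type Executor = (task: A2ATask) => Promise<A2AResult>

// Wrappers compose outside-in: logging sees every call, rate limiting runs closest to the task.
const executor: Executor = loggingMiddleware(
  metricsMiddleware(
    rateLimitMiddleware(executeTask, { requests: 10, window: 1000 })
  )
)
```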
async function executeWithTimeout(
task: A2ATask,
timeoutMs: number
): Promise<A2AResult> {
return Promise.race([
executeTask(task),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new TimeoutError()), timeoutMs)
)
])
}
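One caveat with the race: the losing setTimeout keeps running after the task finishes. A variant that clears it, assuming the same TimeoutError:

```typescript
async function executeWithTimeoutCleared(task: A2ATask, timeoutMs: number): Promise<A2AResult> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new TimeoutError()), timeoutMs)
  })
  try {
    return await Promise.race([executeTask(task), timeout])
  } finally {
    clearTimeout(timer) // release the timer whether the task won or timed out
  }
}
```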
async function executeWithCleanup(task: A2ATask) {
const resources = []
try {
const resource = await allocateResource()
resources.push(resource)
return await executeTask(task, resource)
} finally {
// Always cleanup, even on error
await Promise.all(
resources.map(r => r.cleanup())
)
}
}
class GracefulExecutor {
private activeTasks = new Set<string>()
private shuttingDown = false
async execute(task: A2ATask): Promise<A2AResult> {
if (this.shuttingDown) {
throw new Error('Executor is shutting down')
}
this.activeTasks.add(task.id)
try {
return await executeTask(task)
} finally {
this.activeTasks.delete(task.id)
}
}
async shutdown(): Promise<void> {
this.shuttingDown = true
// Wait for active tasks to complete
while (this.activeTasks.size > 0) {
await sleep(100)
}
}
}
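One way to wire this into a Node.js process (sketch only):

```typescript
// Sketch only: drain in-flight tasks before the process exits.
const gracefulExecutor = new GracefulExecutor()

process.on('SIGTERM', async () => {
  await gracefulExecutor.shutdown() // stop accepting new work, wait for active tasks
  process.exit(0)
})
```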
Example: examples/llm-executor.ts
Executes LLM inference tasks with streaming
Example: examples/function-executor.ts
Calls functions/tools and returns results
Example: examples/workflow-executor.ts
Orchestrates multi-step workflows
Example: examples/validation-executor.ts
Validates data and returns compliance results
Scripts:
- scripts/validate-executor.sh - Validate executor structure
- scripts/test-executor.sh - Test against A2A spec

Run validation:
bash scripts/validate-executor.sh your-executor.ts
Run tests:
bash scripts/test-executor.sh your-executor.ts
TypeScript Templates:
- basic-executor.ts - Simple sync executor
- async-executor.ts - Async with status tracking
- streaming-executor.ts - Streaming results
- batch-executor.ts - Batch processing

Python Templates:
- basic-executor.py - Simple sync executor
- async-executor.py - Async with status tracking
- streaming-executor.py - Streaming results

Scripts:
- validate-executor.sh - Structure validation
- test-executor.sh - A2A spec compliance

Examples:
- llm-executor.ts - LLM inference executor
- function-executor.ts - Function calling executor
- workflow-executor.ts - Multi-step workflows
- validation-executor.ts - Data validation

Protocol Version: A2A Protocol v1.0
Runtime: Node.js 18+, Python 3.9+
Best Practice: Start with the basic executor and add complexity (async, streaming, batching) only as needed