Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Create and run durable workflows with steps, streaming, and agent execution. Covers starting, resuming, and persisting workflow results.
See:
using-workflows in Fullstack RecipesDefine workflows with the "use workflow" directive:
// src/workflows/chat/index.ts
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await agent.run(history, {
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}
Use the start function from workflow/api:
import { start } from "workflow/api";
import { chatWorkflow } from "@/workflows/chat";
const run = await start(chatWorkflow, [{ chatId, userMessage }]);
// run.runId - unique identifier for this run
// run.readable - stream of UI message chunks
Use getRun to reconnect to an in-progress or completed workflow:
import { getRun } from "workflow/api";
const run = await getRun(runId);
const readable = await run.getReadable({ startIndex });
Steps are durable checkpoints that persist their results:
async function getMessageHistory(chatId: string) {
"use step";
return db.query.messages.findMany({
where: eq(messages.chatId, chatId),
});
}
Use getWritable() to stream data to clients:
import { getWritable } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const writable = getWritable();
// Pass to agent for streaming
await agent.run(history, { writable });
}
Access the current run's metadata:
import { getWorkflowMetadata } from "workflow";
export async function chatWorkflow({ chatId }) {
"use workflow";
const { workflowRunId } = getWorkflowMetadata();
// Store runId for resumption
await saveRunId(chatId, workflowRunId);
}
The workflow runtime doesn't support Node.js modules. Wrap logger calls in steps:
// src/workflows/chat/steps/logger.ts
import { logger } from "@/lib/common/logger";
export async function log(
level: "info" | "warn" | "error",
message: string,
data?: Record<string, unknown>,
): Promise<void> {
"use step";
if (data) {
logger[level](data, message);
} else {
logger[level](message);
}
}
Use the custom Agent class for full streaming control:
import { getWritable } from "workflow";
import { researchAgent } from "@/lib/ai/research";
export async function chatWorkflow({ chatId, userMessage }) {
"use workflow";
const history = await getMessageHistory(chatId);
const { parts } = await researchAgent.run(history, {
maxSteps: 10,
writable: getWritable(),
});
await persistMessageParts({ chatId, parts });
}
Save agent output to the database:
async function persistMessageParts({ chatId, parts }) {
"use step";
await db.insert(messages).values({
chatId,
role: "assistant",
parts: JSON.stringify(parts),
});
}