This skill should be used when implementing an ACP agent or extending the lab ACP runtime in Rust — including using Client.builder()/ByteStreams/attach_session to connect to ACP providers, calling session_config_options() to discover models and config options, switching models via SetSessionConfigOptionRequest, implementing the Agent trait for a new ACP provider (stdio agent), handling session/prompt or session/update wire messages, or debugging JSON-RPC 2.0 stdio transport issues. Also applies when working on crates/lab/src/acp/runtime.rs, the codex-acp reference implementation, or authoring bidirectional stdio agents for Zed or VS Code.
Slash command: /acp:rust
ACP is a JSON-RPC 2.0 protocol for bidirectional communication between AI coding agents and editor clients (Zed, VS Code, etc.). Agents run as subprocesses — clients write to stdin, read from stdout. stderr is for logs only, never protocol data.
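The stream split described above can be sketched with the standard library alone. This is a minimal illustration, not SDK code: write_notification, write_log and json_escape are hypothetical helpers, and the one-message-per-line framing is an assumption about the stdio transport. Writers are passed in so that the protocol stream and the log stream can never be confused.

```rust
use std::io::{self, Write};

/// Escape `s` for use inside a JSON string literal (quotes, backslashes, control chars).
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Write one JSON-RPC 2.0 notification as a single line on the protocol stream.
/// `params_json` must already be valid JSON (hypothetical helper, not an SDK call).
fn write_notification<W: Write>(protocol: &mut W, method: &str, params_json: &str) -> io::Result<()> {
    writeln!(
        protocol,
        r#"{{"jsonrpc":"2.0","method":"{}","params":{}}}"#,
        json_escape(method),
        params_json
    )?;
    protocol.flush()
}

/// Write a human-readable log line on the log stream; never on the protocol stream.
fn write_log<W: Write>(log: &mut W, message: &str) -> io::Result<()> {
    writeln!(log, "[agent] {message}")
}
```

In an agent process the protocol writer would be a stdout handle and the log writer a stderr handle. The SDK connections shown later own stdout, so in practice only the logging half is written by hand.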
Lab version pin: agent-client-protocol = { version = "=0.13.1", features = ["unstable"] } in crates/lab/Cargo.toml. When upgrading, pin to an exact version, verify the unstable feature still compiles, and re-check session_config_options() behavior against the new SessionConfigOption / SessionConfigKind::Select API.
Two roles in this codebase:
- Lab as ACP client: uses Client.builder() + ByteStreams + attach_session. See "Lab ACP Runtime" below.
- ACP providers as agents: implement the Agent trait and run on stdio. Lab spawns them as subprocesses. See "Implementing an Agent" below.
SDK source: ~/workspace/acp/rust-sdk/ (canonical trait signatures)
Production reference: ~/workspace/acp/codex-acp/ (Rust agent for OpenAI/Codex)
Schema types: ~/workspace/acp/agent-client-protocol/ — schema crate only (InitializeRequest, AuthMethod, etc.). Does not contain Agent/Client traits or the runtime layer.
Lab binary (pinned, unstable features — use this in crates/lab):
agent-client-protocol = { version = "=0.13.1", features = ["unstable"] }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] } # required: .compat() / .compat_write() bridge
Standalone ACP agent (new provider binary):
agent-client-protocol = "0" # types + transport (AgentSideConnection, Agent trait)
async-trait = "0.1" # required: Agent trait needs #[async_trait(?Send)]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }
futures = "0.3" # AsyncRead/AsyncWrite traits expected by AgentSideConnection
anyhow = "1"
uuid = { version = "1", features = ["v4"] }
dashmap = "5" # preferred over std::sync::Mutex<HashMap> in async contexts
initialize → authenticate → session/new → session/prompt (streaming) → session/cancel
All streaming happens via session/update notifications sent from agent to client during prompt execution. The final PromptResponse matches the original session/prompt request id.
See references/wire-format.md for full JSON examples of every message.
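The correlation rule above (updates arrive as notifications, and the turn ends only when a response carrying the original request id arrives) can be sketched with the standard library alone. Incoming and Turn are hypothetical types for illustration, not SDK types, and the stop reason is kept as a plain string; the lab runtime gets this behavior from the SDK rather than from code like this.

```rust
use std::collections::HashMap;

/// Simplified view of an inbound message on the client side (hypothetical, not an SDK type).
#[derive(Debug)]
enum Incoming {
    /// A session/update notification: carries no request id.
    Update { session_id: String, text: String },
    /// A response to an earlier request: echoes that request's id.
    Response { id: u64, stop_reason: String },
}

/// Tracks in-flight prompts and the text streamed so far.
#[derive(Default)]
struct Turn {
    /// request id -> session id for prompts that have not been answered yet
    pending: HashMap<u64, String>,
    transcript: Vec<String>,
}

impl Turn {
    /// Record that a session/prompt request with this id was sent.
    fn start_prompt(&mut self, id: u64, session_id: &str) {
        self.pending.insert(id, session_id.to_string());
    }

    /// Returns Some(stop_reason) once the response matching a pending prompt arrives.
    fn handle(&mut self, msg: Incoming) -> Option<String> {
        match msg {
            Incoming::Update { session_id, text } => {
                // Keep chunks only for sessions that have a prompt in flight.
                if self.pending.values().any(|s| *s == session_id) {
                    self.transcript.push(text);
                }
                None
            }
            // A response with an unknown id does not end the turn.
            Incoming::Response { id, stop_reason } => self.pending.remove(&id).map(|_| stop_reason),
        }
    }
}
```

Feeding two Update messages and then the matching Response yields None, None, and finally the stop reason, with the transcript holding both chunks in order.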
The lab runtime (crates/lab/src/acp/runtime.rs) uses the builder API. Lab is the client; ACP providers (codex-acp etc.) are agents spawned as subprocesses.
use agent_client_protocol::{Agent, ByteStreams, Client, ConnectionTo, on_receive_request};
use agent_client_protocol::schema::{
InitializeRequest, NewSessionRequest, ProtocolVersion, Implementation,
SetSessionConfigOptionRequest, SessionConfigOption, SessionConfigKind, SessionConfigSelectOptions,
};
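// Wrap subprocess stdio in ByteStreams. The .compat_write() / .compat() bridges come from
// the tokio_util::compat extension traits, which must be in scope.
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
let transport = ByteStreams::new(stdin.compat_write(), stdout.compat());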
// Client.builder() registers inbound request handlers, then connect_with drives the session.
// on_receive_request!() is the required second argument — a macro-generated registration handle.
Client
.builder()
.on_receive_request(
async move |args: RequestPermissionRequest, responder, _cx| {
responder.respond(handle_permission(args).await)
},
on_receive_request!(),
)
.connect_with(transport, move |connection: ConnectionTo<Agent>| async move {
// 1. Initialize — protocol-level, use send_request (NOT send_request_to)
let initialized = connection
.send_request(
InitializeRequest::new(ProtocolVersion::V1)
.client_info(Implementation::new("lab-acp-bridge", env!("CARGO_PKG_VERSION")))
.client_capabilities(lab_client_capabilities()),
)
.block_task()
.await?;
// 2. Create session — use send_request_to(Agent, ...) NOT send_request(...)
// NewSessionRequest::new(&*cwd) — cwd: String, deref to str
let new_session_response = connection
.send_request_to(Agent, NewSessionRequest::new(&*cwd))
.block_task()
.await?;
// 3. Read config options BEFORE attach_session — it consumes the response
let (model_id, models) = session_config_options(
new_session_response.config_options.as_deref().unwrap_or_default(),
);
// 4. Attach session — produces the session handle for read_update() and further requests
let mut session = connection
.attach_session(new_session_response, vec![])
.map_err(|e| acp_internal_error(e.to_string()))?;
// session.session_id() — provider session ID
// session.read_update() — await next SessionMessage from provider
// session.connection() — get connection back for further send_request_to() calls
Ok::<(), agent_client_protocol::Error>(())
})
.await;
GOTCHA: send_request_to(Agent, ...) vs send_request(...). Session-scoped requests (NewSessionRequest, SetSessionConfigOptionRequest, PromptRequest) must use .send_request_to(Agent, req). Plain .send_request(req) is for protocol-level messages only (InitializeRequest).
GOTCHA: read config_options before attach_session. attach_session consumes the NewSessionResponse. Always extract config_options from it first.
fn session_config_options(raw: &[SessionConfigOption]) -> (Option<String>, Vec<ModelOption>) {
let mut model_id = None;
let mut models = Vec::new();
for opt in raw {
let is_model = opt.category.as_ref() == Some(&SessionConfigOptionCategory::Model);
if let SessionConfigKind::Select(select) = &opt.kind {
let current = select.current_value.to_string();
let opts: Vec<ModelOption> = match &select.options {
SessionConfigSelectOptions::Ungrouped(options) => options.iter()
.map(|o| ModelOption { id: o.value.to_string(), name: o.name.clone() })
.collect(),
SessionConfigSelectOptions::Grouped(groups) => groups.iter()
.flat_map(|g| g.options.iter()
.map(|o| ModelOption { id: o.value.to_string(), name: o.name.clone() }))
.collect(),
_ => Vec::new(),
};
if is_model { model_id = Some(current); models = opts; }
}
}
(model_id, models)
}
Send SetSessionConfigOptionRequest before the next prompt turn:
session
.connection()
.send_request_to(
Agent,
SetSessionConfigOptionRequest::new(session.session_id().clone(), "model", model_id),
)
.block_task()
.await?;
// Start prompt asynchronously; StopReason arrives via on_receiving_result callback.
session
.connection()
.send_request_to(Agent, PromptRequest::new(session.session_id().clone(), content_blocks))
.on_receiving_result(async move |result| {
drop(prompt_response_tx.send(result.map(|r| r.stop_reason).map_err(|e| e.to_string())));
Ok(())
})
.map_err(|e| acp_internal_error(e.to_string()))?;
// biased select! ensures StopReason is preferred over a simultaneous idle timeout.
loop {
tokio::select! {
biased;
stop = &mut prompt_response_rx => { /* handle StopReason, break */ }
update = session.read_update() => { /* handle SessionMessage */ }
() = tokio::time::sleep(idle_timeout), if saw_assistant_output => { /* idle_completion, break */ }
}
}
| Variable | Default (ms) | Purpose |
|---|---|---|
| LAB_ACP_PROMPT_IDLE_TIMEOUT_MS | 5000 | Idle timeout after last assistant chunk |
| LAB_ACP_TURN_DRAIN_TIMEOUT_MS | 300000 | Max wait draining late StopReason after idle_completion |
| LAB_ACP_PERMISSION_TIMEOUT_MS | 60000 | Permission request decision window |
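A minimal sketch of how millisecond variables like these could be read, using only the standard library. timeout_from and timeout_from_env are hypothetical helpers; falling back to the default on a missing or unparsable value is an assumption, not confirmed behavior of the lab runtime.

```rust
use std::time::Duration;

/// Parse a millisecond value, falling back to `default_ms` when the value is
/// absent or not a valid unsigned integer (assumed fallback policy).
fn timeout_from(raw: Option<&str>, default_ms: u64) -> Duration {
    let ms = raw
        .and_then(|value| value.trim().parse::<u64>().ok())
        .unwrap_or(default_ms);
    Duration::from_millis(ms)
}

/// Read the named environment variable and convert it with `timeout_from`.
fn timeout_from_env(name: &str, default_ms: u64) -> Duration {
    timeout_from(std::env::var(name).ok().as_deref(), default_ms)
}
```

For example, timeout_from_env("LAB_ACP_PROMPT_IDLE_TIMEOUT_MS", 5000) would produce the idle_timeout used in the select! loop. Keeping the parsing in timeout_from makes it testable without touching the process environment.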
Implement the Agent trait to build an ACP provider that runs on stdio:
// The Agent trait uses ?Send bounds. In the =0.13.x SDK, implement with
// #[async_trait::async_trait(?Send)]; native async fn in trait does NOT work here.
// `acp` is assumed to be an alias for the SDK crate. Bodies are todo!() placeholders
// so the signatures are valid inside an impl block.
use agent_client_protocol as acp;
#[async_trait::async_trait(?Send)]
impl Agent for MyAgent {
    async fn initialize(&self, req: InitializeRequest) -> acp::Result<InitializeResponse> { todo!() }
    async fn authenticate(&self, req: AuthenticateRequest) -> acp::Result<AuthenticateResponse> { todo!() }
    async fn new_session(&self, req: NewSessionRequest) -> acp::Result<NewSessionResponse> { todo!() }
    // prompt() takes ONLY PromptRequest; there is NO SessionNotifier parameter.
    // Streaming updates are sent via conn.session_notification() from a background task.
    // See "Streaming Notifications" section below for the required mpsc channel pattern.
    async fn prompt(&self, req: PromptRequest) -> acp::Result<PromptResponse> { todo!() }
    // Method name is cancel (NOT on_cancel). Returns Result<()>.
    async fn cancel(&self, notification: CancelNotification) -> acp::Result<()> { todo!() }
    // Optional methods (default: Err(Error::method_not_found())):
    // load_session, set_session_mode, set_session_config_option, list_sessions
    // UNSTABLE (behind feature flags): close_session, fork_session, resume_session, set_session_model
}
// Entry point — use current_thread flavor (?Send trait requires LocalSet).
// MUST use .compat() / .compat_write() — AgentSideConnection expects futures::AsyncRead/AsyncWrite,
// NOT tokio::io traits. These are different trait families.
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
let (notif_tx, mut notif_rx) = tokio::sync::mpsc::unbounded_channel::<NotifMsg>();
let agent = Arc::new(MyAgent { notif_tx, sessions: Arc::new(DashMap::new()) });
tokio::task::LocalSet::new().run_until(async move {
// conn implements Client — use it to call session_notification, request_permission, etc.
// io_task drives the stdio read/write loop.
let (conn, io_task) = AgentSideConnection::new(
agent,
tokio::io::stdout().compat_write(), // outgoing
tokio::io::stdin().compat(), // incoming
|fut| { tokio::task::spawn_local(fut); },
);
// Background task: receive (notification, done_tx) from agent, send via conn.
tokio::task::spawn_local(async move {
while let Some((notif, done_tx)) = notif_rx.recv().await {
if conn.session_notification(notif).await.is_err() { break; }
let _ = done_tx.send(());
}
});
io_task.await
}).await
}
GOTCHA: no SessionNotifier in prompt(). SessionNotifier does not exist in the SDK. prompt() receives only PromptRequest. Send streaming updates via conn.session_notification() called from a background task. The agent communicates with the background task via an mpsc channel stored in self.
GOTCHA: will not compile without compat. tokio::io::stdin() does NOT implement futures::AsyncRead. Always use .compat() (read) and .compat_write() (write) from tokio_util::compat. Separately, the ?Send futures must run inside a LocalSet: tokio::task::spawn_local panics when called outside one.
For a complete working skeleton see examples/agent-impl.rs.
Key points:
- Advertise only the capabilities the agent actually supports in InitializeResponse
- Use ProtocolVersion::V1 (not LATEST) in InitializeResponse::new()
- Return Err(acp::Error::auth_required()) explicitly on auth failure (maps to JSON-RPC -32000)
- Use tokio::io::stdin/stdout() with .compat(); never std::io in an async context (blocks executor)
- Use DashMap for session state, not std::sync::Mutex<HashMap> (deadlock risk under Tokio)
- Add #![deny(clippy::print_stdout, clippy::print_stderr)]: one stray println! corrupts the protocol stream
The prompt() method has no access to the connection. To stream updates during a prompt turn, use an mpsc channel:
// In the agent struct:
type NotifMsg = (SessionNotification, tokio::sync::oneshot::Sender<()>);
struct MyAgent {
notif_tx: tokio::sync::mpsc::UnboundedSender<NotifMsg>,
sessions: Arc<DashMap<String, SessionState>>,
}
// Helper method for sending updates from prompt():
async fn send_update(&self, session_id: &str, update: SessionUpdate) -> acp::Result<()> {
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
let notif = SessionNotification::new(session_id.to_string(), update);
self.notif_tx.send((notif, done_tx)).map_err(|_| acp::Error::internal_error())?;
done_rx.await.map_err(|_| acp::Error::internal_error())
}
// In prompt() — use self.send_update() to stream:
async fn prompt(&self, req: PromptRequest) -> acp::Result<PromptResponse> {
self.send_update(&req.session_id, SessionUpdate::AgentMessageChunk(
ContentChunk::new("Thinking...".into()) // .into() converts &str → ContentBlock
)).await?;
Ok(PromptResponse::new(StopReason::EndTurn))
}
// In main() — background task owns conn, drains the channel:
tokio::task::spawn_local(async move {
while let Some((notif, done_tx)) = notif_rx.recv().await {
if conn.session_notification(notif).await.is_err() { break; }
let _ = done_tx.send(());
}
});
GOTCHA: ContentChunk::new takes ContentBlock. The signature is ContentChunk::new(content: ContentBlock), NOT a bare &str. Use ContentChunk::new("text".into()), which works because From<T: Into<String>> for ContentBlock is implemented: "text".into() becomes ContentBlock::Text(TextContent::new("text")). ContentChunk::new("text") is a compile error.
For generic client implementations not using the lab runtime's Client.builder() pattern:
// The Client trait also requires #[async_trait::async_trait(?Send)] in this SDK version.
// Bodies are todo!() placeholders so the signatures are valid inside an impl block.
#[async_trait::async_trait(?Send)]
impl Client for MyClient {
    // REQUIRED: receives session/update notifications (streaming chunks, tool calls, etc.).
    async fn session_notification(&self, args: SessionNotification) -> acp::Result<()> { todo!() }
    // REQUIRED: agent calls this before any destructive operation.
    // Returns RequestPermissionResponse (wraps outcome), NOT RequestPermissionOutcome directly.
    // Outcome: Cancelled | Selected(SelectedPermissionOutcome::new(option_id))
    async fn request_permission(&self, args: RequestPermissionRequest) -> acp::Result<RequestPermissionResponse> { todo!() }
    // Optional (default: Err(method_not_found)); only needed if you advertise fs capability:
    async fn read_text_file(&self, args: ReadTextFileRequest) -> acp::Result<ReadTextFileResponse> { todo!() }
    async fn write_text_file(&self, args: WriteTextFileRequest) -> acp::Result<WriteTextFileResponse> { todo!() }
    // Optional terminal methods: create_terminal, terminal_output, release_terminal,
    // wait_for_terminal_exit, kill_terminal
}
// Spawn agent subprocess and connect.
// Arg order: (client_handler, outgoing→agent_stdin, incoming←agent_stdout, spawner)
// conn implements Agent — call conn.initialize(), conn.prompt(), etc. to drive the session.
let (conn, io_task) = ClientSideConnection::new(
MyClient,
agent_stdin.compat_write(), // outgoing
agent_stdout.compat(), // incoming
|fut| { tokio::task::spawn_local(fut); },
);
// Drive session in a spawned task; await io_task to run until connection closes.
For a complete working skeleton see examples/client-impl.rs.
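When the client spawns the agent subprocess, the guidance in this skill is to clear the environment, forward only an explicit allowlist, and put the child in its own process group on Unix. A minimal sketch of that setup follows. It uses std::process::Command rather than tokio::process::Command to stay self-contained, and it assumes the tokio builder mirrors these calls. ENV_ALLOWLIST and agent_command are hypothetical names, and the allowlist contents are placeholders.

```rust
use std::process::{Command, Stdio};

/// Hypothetical allowlist; choose the variables each provider actually needs.
const ENV_ALLOWLIST: &[&str] = &["PATH", "HOME", "LANG"];

/// Build (but do not spawn) a command for an agent subprocess.
/// `parent_env` is passed in explicitly so the filtering is easy to test.
fn agent_command(program: &str, parent_env: &[(String, String)]) -> Command {
    let mut cmd = Command::new(program);
    // Start from an empty environment, then forward only allowlisted variables.
    cmd.env_clear();
    for (key, value) in parent_env {
        if ENV_ALLOWLIST.contains(&key.as_str()) {
            cmd.env(key, value);
        }
    }
    // stdin/stdout carry the protocol; stderr is left attached for logs.
    cmd.stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit());
    // On Unix, make the child the leader of a new process group so a signal
    // can be delivered to the whole group on shutdown.
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;
        cmd.process_group(0);
    }
    cmd
}
```

A caller would typically pass std::env::vars().collect::<Vec<_>>() as parent_env and then call spawn() on the result.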
Send ToolCall before executing a tool, then ToolCallUpdate with the result. Use self.send_update() from the streaming pattern above.
// Before tool execution — builder pattern, no Default impl
self.send_update(&req.session_id, SessionUpdate::ToolCall(
ToolCall::new("tc-1", "Read src/main.rs")
.kind(ToolKind::Read)
.status(ToolCallStatus::InProgress)
.locations(vec![ToolCallLocation::new("src/main.rs")]),
)).await?;
// After tool execution — ToolCallUpdateFields builder, #[serde(flatten)] in wire format
self.send_update(&req.session_id, SessionUpdate::ToolCallUpdate(ToolCallUpdate::new(
"tc-1",
ToolCallUpdateFields::new()
.status(ToolCallStatus::Completed)
.content(vec![ToolCallContent::Content(Content::new(
// ContentBlock::Text is a tuple variant wrapping TextContent, not a struct variant.
ContentBlock::Text(TextContent::new(result)),
))]),
))).await?;
GOTCHA: no struct literals. ToolCall and ToolCallUpdate have no Default impl. Use the builder pattern: ToolCall::new(id, title).kind(...).status(...). ToolCallStatus::Started does not exist; use InProgress. The enum is ToolKind (not ToolCallKind).
For all 10 ToolKind variants, JSON wire format, streaming deduplication, and _meta extensibility see references/tool-calls.md.
Reference files:
- references/wire-format.md: Full JSON-RPC examples for every message type. Reach for this when debugging wire format mismatches or building a client from scratch.
- references/message-reference.md: Complete table of all 24 ACP methods, all 11 SessionUpdate variants, session modes, and error codes.
- references/tool-calls.md: Tool call kinds table, full JSON wire examples, streaming deduplication pattern, _meta extensibility, terminal tool lifecycle.
- references/codex-patterns.md: Production patterns extracted from codex-acp (OnceLock global client, SessionClient error-tolerant notification wrapper, DashMap session state, LocalSet + compat wiring, filesystem sandboxing, auth guard, graceful cancellation).
- references/unstable-features.md: All 9 unstable feature flags with Cargo.toml activation syntax and stability tracking.
- examples/agent-impl.rs: Complete Agent trait implementation skeleton with DashMap session state, mpsc notification channel, tool call notifications, and correct tokio::io usage.
- examples/client-impl.rs: Complete Client trait implementation skeleton with subprocess spawning, session_notification handler, file I/O handlers, and permission handling.
Lab runtime checklist:
- Use Client.builder().on_receive_request(..., on_receive_request!()).connect_with(transport, ...), not ClientSideConnection
- Build the transport with ByteStreams::new(stdin.compat_write(), stdout.compat())
- Create the session with send_request_to(Agent, NewSessionRequest::new(&*cwd)), not send_request()
- Read config_options from NewSessionResponse before calling attach_session (which consumes it)
- Use session_config_options() to parse SessionConfigKind::Select into current model + available models
- Switch models with send_request_to(Agent, SetSessionConfigOptionRequest::new(session_id, "model", model_id))
- Use biased select! in the prompt loop: it prevents the idle timeout from winning over a simultaneous StopReason
- Spawn providers with env_clear() + an explicit allowlist; never forward the full environment
- Set process_group(0) on Unix: it enables SIGTERM to the entire process group on shutdown
Agent checklist:
- Add #![deny(clippy::print_stdout, clippy::print_stderr)] in the crate root: one stray println! corrupts the protocol stream
- Add async-trait = "0.1" to Cargo.toml: the Agent trait requires #[async_trait::async_trait(?Send)] in this SDK version
- Run AgentSideConnection inside tokio::task::LocalSet (required for !Send types)
- Use #[tokio::main(flavor = "current_thread")], which matches the ?Send trait requirement
- Import tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt} and call .compat() / .compat_write() on tokio IO types (they do NOT implement futures::AsyncRead/Write natively)
- AgentSideConnection::new returns (conn, io_task): use conn for session_notification; don't discard it
- Store an mpsc::UnboundedSender<NotifMsg> in the agent: this is how prompt() sends streaming updates
- Spawn a background task that drains the channel and calls conn.session_notification()
- initialize: advertise only capabilities the agent actually supports; use ProtocolVersion::V1
- authenticate: validate credentials; return Err(acp::Error::auth_required()) on failure
- new_session: generate a UUID, store state in DashMap; req.cwd is PathBuf (not Option<PathBuf>)
- prompt: only takes PromptRequest (no SessionNotifier!); use the send_update() helper for streaming
- cancel (not on_cancel): store a watch::Sender<bool> in session state, signal it; race with biased tokio::select! in the prompt loop
- Validate cwd: make it absolute with std::path::absolute() and reject ../ escapes (on Unix, absolute() keeps .. components, so they must be checked explicitly)
Client checklist:
- Add async-trait = "0.1": the Client trait requires #[async_trait::async_trait(?Send)]
- Spawn the agent with tokio::process::Command, pipe stdio
- ClientSideConnection::new arg order: (client, outgoing→agent_stdin, incoming←agent_stdout, spawner)
- ClientSideConnection::new returns (conn, io_task): use conn.initialize() etc. to drive the session
- session_notification: required; route SessionUpdate variants to render in UI
- request_permission: required; return Cancelled or Selected(SelectedPermissionOutcome::new(option_id))
- Implement read_text_file/write_text_file only if you advertise fs capability in InitializeRequest
- Handle SessionUpdate variants (chunk, tool_call, tool_call_update, thought)
- Send session/cancel via conn.cancel(CancelNotification::new(session_id)) on user interrupt
- Use the tool call kind to pick appropriate UI (diff, file path, terminal)
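The cwd validation item for agents can be sketched as follows, using only the standard library. resolve_within and normalize_lexically are hypothetical helpers. One caveat the sketch works around: on Unix, std::path::absolute() makes a path absolute but keeps its .. components, so they are folded lexically before the containment check. Symlinks are not resolved here, so treat this as a first filter rather than a complete sandbox.

```rust
use std::io;
use std::path::{Component, Path, PathBuf};

/// Fold `.` and `..` components without touching the filesystem.
fn normalize_lexically(path: &Path) -> PathBuf {
    let mut out = PathBuf::new();
    for component in path.components() {
        match component {
            Component::CurDir => {}
            // pop() is a no-op at the root, so `..` can never climb above it.
            Component::ParentDir => {
                out.pop();
            }
            other => out.push(other.as_os_str()),
        }
    }
    out
}

/// Resolve `requested` against `root` and return it only if it stays inside `root`.
/// Absolute requests replace the root during join and are then checked like any other path.
fn resolve_within(root: &Path, requested: &Path) -> io::Result<Option<PathBuf>> {
    let root = normalize_lexically(&std::path::absolute(root)?);
    let candidate = normalize_lexically(&std::path::absolute(root.join(requested))?);
    Ok(candidate.starts_with(&root).then_some(candidate))
}
```

A new_session handler could call resolve_within(allowed_root, &req.cwd) and return an error when the result is None; allowed_root is an assumed configuration value.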
npx claudepluginhub jmagar/dendrite --plugin acp