Use when implementing concurrent programming in Crystal using fibers, channels, and parallel execution patterns for high-performance, non-blocking applications.
Limited to specific tools — this skill is restricted to using the tools listed in its configuration.
Additional assets for this skill are bundled alongside this document.
You are Claude Code, an expert in Crystal's concurrency model. You specialize in building high-performance, concurrent applications using fibers, channels, and Crystal's lightweight concurrency primitives.
Your core responsibilities:
Crystal uses fibers (also known as green threads or coroutines) for concurrency. Fibers are cooperatively scheduled by the Crystal runtime and are much lighter weight than OS threads.
# Simple fiber spawning
# `spawn` schedules the block on a new fiber and returns immediately;
# the block runs the next time the scheduler yields control (e.g. on sleep/IO).
spawn do
puts "Running in a fiber"
sleep 1
puts "Fiber completed"
end
# Fiber with arguments
def process_data(id : Int32, data : String)
puts "Processing #{data} with id #{id}"
sleep 0.5
puts "Completed #{id}"
end
# `spawn` may also wrap a call expression; each call runs in its own fiber.
spawn process_data(1, "task A")
spawn process_data(2, "task B")
# Wait for fibers to complete
# NOTE(review): sleeping is a crude join used only for demonstration;
# real code should coordinate completion with a Channel instead.
sleep 1
# Fibers don't return values directly, use channels instead
result_channel = Channel(Int32).new
spawn do
result = expensive_computation(42)
result_channel.send(result)
end
# Do other work...
puts "Doing other work"
# Wait for result
# `receive` blocks this fiber until the spawned fiber sends, acting as a join.
result = result_channel.receive
puts "Got result: #{result}"
# Defining the method after its use is fine: Crystal resolves top-level
# methods at compile time, not in source order.
def expensive_computation(n : Int32) : Int32
sleep 1
n * 2
end
# Give fibers descriptive names for debugging
# NOTE(review): process_large_dataset and update_cache_periodically are
# assumed to be defined elsewhere; this snippet does not compile standalone.
spawn(name: "data-processor") do
process_large_dataset
end
spawn(name: "cache-updater") do
update_cache_periodically
end
# Fiber names appear in exception backtraces
# An unhandled exception terminates only the raising fiber; the named
# backtrace makes it clear which worker failed.
spawn(name: "failing-worker") do
raise "Something went wrong"
end
Channels are the primary mechanism for communication between fibers. They provide thread-safe message passing with optional buffering.
# Unbuffered channel - blocks until both sender and receiver are ready
# (a rendezvous: send and receive are synchronization points).
channel = Channel(String).new
spawn do
puts "Sending message"
channel.send("Hello")
puts "Message sent"
end
spawn do
sleep 0.1 # Small delay
puts "Receiving message"
msg = channel.receive
puts "Received: #{msg}"
end
# Crude join so both fibers finish before the program exits.
sleep 1
# Buffered channel - allows sending without blocking up to buffer size
channel = Channel(Int32).new(capacity: 3)
# These sends won't block
channel.send(1)
channel.send(2)
channel.send(3)
# This would block until someone receives
# channel.send(4)
# Receive values
# Values come out in FIFO order.
puts channel.receive # 1
puts channel.receive # 2
puts channel.receive # 3
# Producer-consumer with channel closing
channel = Channel(Int32).new
# Producer
spawn do
5.times do |i|
channel.send(i)
sleep 0.1
end
channel.close # Signal no more values
end
# Consumer - iterate until channel is closed
spawn do
# Channel#each receives values until the channel is closed, then returns.
channel.each do |value|
puts "Received: #{value}"
end
puts "Channel closed, consumer exiting"
end
sleep 1
channel = Channel(String).new
spawn do
channel.send("message 1")
channel.send("message 2")
channel.close
end
sleep 0.1
# Check before receiving
# NOTE(review): this check is inherently racy when multiple fibers use the
# channel — it can close between the check and the receive; the rescue
# pattern below is the robust alternative.
unless channel.closed?
puts channel.receive
end
# Or handle the exception
begin
puts channel.receive
puts channel.receive
puts channel.receive # Will raise Channel::ClosedError
rescue Channel::ClosedError
puts "Channel is closed"
end
The select statement allows waiting on multiple channel operations simultaneously,
similar to Go's select statement.
ch1 = Channel(String).new
ch2 = Channel(Int32).new
spawn do
sleep 0.2
ch1.send("from channel 1")
end
spawn do
sleep 0.1
ch2.send(42)
end
# Wait for whichever channel is ready first
# Exactly one branch runs; here ch2 wins because its sender sleeps less.
select
when msg = ch1.receive
puts "Got string: #{msg}"
when num = ch2.receive
puts "Got number: #{num}"
end
sleep 1
channel = Channel(String).new
spawn do
sleep 2 # Takes too long
channel.send("delayed message")
end
# Wait with timeout
# The timeout branch fires if no channel operation completes within the
# given Time::Span, preventing an indefinite block.
select
when msg = channel.receive
puts "Received: #{msg}"
when timeout(1.second)
puts "Timed out waiting for message"
end
channel = Channel(Int32).new
# Non-blocking receive
# An `else` branch makes select return immediately when no operation is
# ready — a polling pattern.
select
when value = channel.receive
puts "Got value: #{value}"
else
puts "No value available, continuing immediately"
end
results = Channel(String).new
done = Channel(Nil).new
output = [] of String
# Multiple workers sending results
3.times do |i|
spawn do
sleep rand(0.5..1.5)
results.send("Worker #{i} done")
end
end
# Collector fiber
# Fans in exactly 3 results, then signals completion on `done`.
spawn do
3.times do
output << results.receive
end
done.send(nil)
end
# Wait for completion with timeout
select
when done.receive
puts "All workers completed"
output.each { |msg| puts msg }
when timeout(5.seconds)
puts "Timeout - not all workers completed"
end
Worker pools distribute tasks across a fixed number of concurrent workers.
# Generic fixed-size worker pool: `T` is the task type, `R` the result type.
# Tasks are fanned out to `size` worker fibers over a buffered channel and
# results are collected over another buffered channel.
#
# The channels must be buffered: with the original unbuffered channels,
# submitting more tasks than there are workers before collecting any result
# deadlocks — every worker blocks on `@results.send` while the submitter
# blocks on `@tasks.send`.
class WorkerPool(T, R)
  # Starts `size` worker fibers immediately. `capacity` sizes both internal
  # channels (new optional parameter; defaults keep the interface compatible).
  def initialize(@size : Int32, capacity : Int32 = 64)
    @tasks = Channel(T).new(capacity)
    @results = Channel(R).new(capacity)
    @workers = [] of Fiber
    @size.times do |i|
      # `spawn` returns the Fiber, which we retain for debugging/inspection.
      @workers << spawn(name: "worker-#{i}") do
        worker_loop
      end
    end
  end

  # Consumes tasks until the task channel is closed, sending one result
  # per task.
  private def worker_loop
    @tasks.each do |task|
      result = process(task)
      @results.send(result)
    end
  end

  # Override in a subclass to perform the actual work for one task.
  def process(task : T) : R
    # Override in subclass or pass block
    raise "Not implemented"
  end

  # Enqueues one task; blocks only when the internal buffer is full.
  def submit(task : T)
    @tasks.send(task)
  end

  # Blocks until one result is available and returns it (arrival order,
  # not submission order).
  def get_result : R
    @results.receive
  end

  # Closes the task channel; workers exit after draining remaining tasks.
  def shutdown
    @tasks.close
  end
end
# Usage example
# Concrete pool: tasks and results are both Int32; each task is squared.
class IntSquarePool < WorkerPool(Int32, Int32)
# Overrides the base hook; called once per submitted task on a worker fiber.
def process(task : Int32) : Int32
sleep 0.1 # Simulate work
task * task
end
end
pool = IntSquarePool.new(size: 3)
# Submit tasks
# NOTE(review): submitting all 10 tasks before collecting any result only
# works if the pool's channels are buffered; with unbuffered channels this
# pattern deadlocks once the workers are saturated — confirm the pool's
# channel capacity before copying this pattern.
10.times { |i| pool.submit(i) }
# Collect results
# Results arrive in completion order, hence the sort below.
results = [] of Int32
10.times { results << pool.get_result }
pool.shutdown
puts results.sort
# A unit of work: an id plus a payload string.
# NOTE: structs are value types in Crystal — copies are independent, so
# mutating a copy via `property` setters does not affect the original.
struct Task
property id : Int32
property data : String
def initialize(@id, @data)
end
end
# Outcome of processing one Task: exactly one of `value` (on success) or
# `error` (on failure) is expected to be set; both default to nil.
struct Result
property task_id : Int32
property success : Bool
property value : String?
property error : String?
def initialize(@task_id, @success, @value = nil, @error = nil)
end
end
# Worker pool that converts per-task exceptions into failed Result values
# instead of letting them kill the worker fiber. Buffered channels
# (capacity 100) let callers queue up work without immediate deadlock.
class RobustWorkerPool
# Starts `worker_count` worker fibers immediately.
def initialize(@worker_count : Int32)
@tasks = Channel(Task).new(capacity: 100)
@results = Channel(Result).new(capacity: 100)
@worker_count.times do |i|
spawn(name: "worker-#{i}") do
process_tasks
end
end
end
# Consumes tasks until the channel is closed; every task yields exactly
# one Result, successful or not, so collectors can count on task parity.
private def process_tasks
@tasks.each do |task|
begin
result_value = process_task(task)
@results.send(Result.new(
task_id: task.id,
success: true,
value: result_value
))
rescue ex
# The exception is captured as data; the worker keeps running.
@results.send(Result.new(
task_id: task.id,
success: false,
error: ex.message
))
end
end
end
# Does the actual work for one task; raises on invalid input.
private def process_task(task : Task) : String
# Simulate processing that might fail
raise "Invalid data" if task.data.empty?
sleep 0.1
"Processed: #{task.data}"
end
# Enqueues a task; blocks only when the 100-slot buffer is full.
def submit(task : Task)
@tasks.send(task)
end
# Blocks until one Result is available (arrival order).
def get_result : Result
@results.receive
end
# Closes the task channel; workers exit after draining remaining tasks.
def shutdown
@tasks.close
end
end
Implement parallel processing of collections.
# Maps `block` over `collection` using `workers` concurrent fibers,
# preserving the original element order in the returned array.
#
# The block is captured (`&block : T -> R`) and invoked via `block.call`:
# `yield` cannot be used inside the proc that `spawn` captures, so the
# original `yield item` inside the worker fiber would not compile.
def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R
  tasks = Channel(Tuple(Int32, T)).new
  results = Channel(Tuple(Int32, R)).new

  # Spawn workers; each receives (index, item) pairs until `tasks` closes.
  workers.times do
    spawn do
      tasks.each do |index, item|
        results.send({index, block.call(item)})
      end
    end
  end

  # Feed tasks from a separate fiber so the unbuffered sends can rendezvous
  # with the workers while this fiber collects results below.
  spawn do
    collection.each_with_index do |item, index|
      tasks.send({index, item})
    end
    tasks.close
  end

  # Collect exactly one result per element, then reassemble in input order.
  # (Note: if the block raises inside a worker, that result never arrives.)
  result_map = {} of Int32 => R
  collection.size.times do
    index, result = results.receive
    result_map[index] = result
  end
  (0...collection.size).map { |i| result_map[i] }
end
# Usage
numbers = (1..100).to_a
# 8 fibers each sleep ~10ms per element, so the whole map takes roughly
# 100/8 * 10ms instead of 100 * 10ms sequentially.
squares = parallel_map(numbers, workers: 8) do |n|
sleep 0.01 # Simulate work
n * n
end
puts squares.first(10)
# Reduces `collection` in parallel: splits it into one chunk per worker,
# reduces each chunk on its own fiber, then folds the partial results.
#
# Correctness requirements on `block`:
#   * it must be associative, and
#   * `initial` must be its identity element — the seed is applied once per
#     chunk AND once in the final fold, so a non-identity seed is counted
#     multiple times;
#   * partial results (type R) are fed back through the same block in the
#     final fold, so in practice R and T must be compatible (both Int32 in
#     the usage below).
#
# The block is captured and invoked via `block.call` because `yield` cannot
# be used inside the proc that `spawn` captures (the original would not
# compile). Also guards the empty collection, which previously crashed on
# `each_slice(0)`.
def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R
  return initial if collection.empty?

  chunk_size = (collection.size / workers.to_f).ceil.to_i
  chunks = collection.each_slice(chunk_size).to_a
  results = Channel(R).new

  # One fiber per chunk; each sends its partial reduction.
  chunks.each do |chunk|
    spawn do
      chunk_result = chunk.reduce(initial) { |acc, item| block.call(acc, item) }
      results.send(chunk_result)
    end
  end

  # Fold the partial results on the calling fiber.
  final_result = initial
  chunks.size.times do
    final_result = block.call(final_result, results.receive)
  end
  final_result
end
# Usage - sum of squares
# Works because + is associative and 0 is its identity element.
numbers = (1..1000).to_a
sum = parallel_reduce(numbers, initial: 0) do |acc, n|
acc + n * n
end
puts "Sum of squares: #{sum}"
When fibers need to share mutable state, use mutexes to prevent race conditions.
require "mutex"
# Mutex-protected counter demonstrating safe shared mutable state
# between fibers.
class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  # Read-modify-write under the lock. The artificial sleep widens the
  # window in which an unsynchronized version would lose updates, making
  # the race visible if the mutex were removed.
  def increment
    @mutex.synchronize do
      snapshot = @count
      sleep 0.001 # Simulate some work
      @count = snapshot + 1
    end
  end

  # Reads take the lock too, so a reader never observes a half-finished
  # update.
  def value : Int32
    @mutex.synchronize { @count }
  end
end
counter = Counter.new
# Spawn 100 fibers that each increment 10 times
100.times do
spawn do
10.times { counter.increment }
end
end
# Crude join: 100 fibers * 10 increments * ~1ms each ≈ 1s of serialized
# work, so 2s is a safety margin, not a guarantee.
sleep 2
puts "Final count: #{counter.value}" # Should be 1000
require "mutex"
# Mutex-guarded string key/value store with a version number that is
# bumped once per write or per batch of writes.
class CachedData
  def initialize
    @data = {} of String => String
    @mutex = Mutex.new
    @version = 0
  end

  # Returns the value for `key`, or nil when the key is absent.
  def read(key : String) : String?
    @mutex.synchronize { @data[key]? }
  end

  # Stores a single pair and bumps the version.
  def write(key : String, value : String)
    @mutex.synchronize do
      @data[key] = value
      @version += 1
    end
  end

  # Applies every pair in `updates` atomically; readers never observe a
  # partially applied batch, and the version is bumped once for the whole
  # batch rather than once per key.
  def batch_update(updates : Hash(String, String))
    @mutex.synchronize do
      updates.each { |key, value| @data[key] = value }
      @version += 1
    end
  end

  # Returns a shallow copy so callers can iterate without holding the lock.
  def snapshot : Hash(String, String)
    @mutex.synchronize { @data.dup }
  end
end
For simple counters and flags, atomic operations are more efficient than mutexes.
require "atomic"
# Lock-free counter backed by Atomic(Int32).
class AtomicCounter
  def initialize(initial : Int32 = 0)
    @count = Atomic(Int32).new(initial)
  end

  # Atomically adds 1 and returns the NEW value.
  # Atomic#add returns the value *before* the addition (fetch-add), so the
  # original bare `@count.add(1)` returned the pre-increment count despite
  # this method's name.
  def increment : Int32
    @count.add(1) + 1
  end

  # Atomically subtracts 1 and returns the NEW value (see note above:
  # Atomic#sub also returns the old value).
  def decrement : Int32
    @count.sub(1) - 1
  end

  # Current value.
  def value : Int32
    @count.get
  end

  # Atomically sets the counter to `new_value` iff it currently equals
  # `expected`; returns whether the swap happened.
  # Atomic#compare_and_set returns a {old_value, success} tuple, so the
  # original body contradicted the declared `: Bool` return type; we
  # extract the Bool here.
  def compare_and_set(expected : Int32, new_value : Int32) : Bool
    @count.compare_and_set(expected, new_value)[1]
  end
end
counter = AtomicCounter.new
# Safe concurrent increments without mutex
1000.times do
spawn { counter.increment }
end
# Crude join for demonstration; real code would coordinate completion
# with a Channel or WaitGroup-style primitive.
sleep 1
puts "Count: #{counter.value}"
require "atomic"
# Coordinates graceful shutdown across fibers via an atomic flag
# (0 = running, 1 = shutting down). Atomic(Int32) stands in for a
# boolean because Atomic wraps integer/enum types.
class ShutdownCoordinator
  def initialize
    @shutdown_flag = Atomic(Int32).new(0)
  end

  # Requests shutdown; safe to call from any fiber, and idempotent.
  def shutdown!
    @shutdown_flag.set(1)
  end

  # True once shutdown has been requested.
  def shutdown? : Bool
    @shutdown_flag.get == 1
  end

  # Repeatedly yields until shutdown is requested, pausing 100ms between
  # iterations. Returns without yielding if shutdown was already requested.
  def run_until_shutdown(&block)
    loop do
      break if shutdown?
      yield
      sleep 0.1
    end
  end
end
coordinator = ShutdownCoordinator.new
# Worker that checks shutdown flag
spawn(name: "worker") do
coordinator.run_until_shutdown do
puts "Working..."
end
puts "Worker shutdown gracefully"
end
# Let the worker run ~10 iterations, then request shutdown.
sleep 1
coordinator.shutdown!
# Give the worker time to notice the flag and exit its loop.
sleep 0.5
Use the crystal-concurrency skill when you need to:
- Handle Channel::ClosedError (or check closed? before operations) when a channel may be closed.
- Use select with timeout() to prevent infinite blocking.
- Use Mutex or atomics when sharing mutable state between fibers.
- Use ensure blocks to guarantee resource cleanup even on errors.
- Use select ... else for non-blocking polling patterns.
- Common pitfall: forgetting to wrap mutex usage in a synchronize block causes race conditions.