name: effect-concurrency
description: Use when working with Effect concurrency patterns, including fibers, fork/join, parallel execution, and racing. Use for concurrent operations in Effect applications.
allowed-tools:
Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.
Fibers are lightweight virtual threads that execute effects concurrently:
import { Effect, Fiber } from "effect"
// Every effect runs on a fiber
const effect = Effect.succeed(42)
// When run, this executes on a fiber
// Effects are descriptions - fibers are executions
// Effect: lazy, immutable description
// Fiber: running execution with state
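To make the distinction concrete, here is a minimal sketch using only core runtime APIs: Effect.runFork starts executing an effect and returns the fiber running it.
import { Effect, Fiber } from "effect"
// runFork starts execution and immediately returns the running fiber
const runningFiber = Effect.runFork(
  Effect.sleep("1 second").pipe(Effect.as("done"))
)
// Await the fiber's Exit value from outside the Effect world
Effect.runPromise(Fiber.await(runningFiber)).then((exit) => console.log(exit))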
Create independent concurrent fibers:
import { Effect, Fiber } from "effect"
const task = Effect.gen(function* () {
  yield* Effect.sleep("1 second")
  yield* Effect.log("Task completed")
  return 42
})
const program = Effect.gen(function* () {
  // Fork creates a new fiber
  const fiber = yield* Effect.fork(task)
  // fiber: RuntimeFiber<number, never>
  yield* Effect.log("Main fiber continues")
  // Join waits for the fiber to complete
  const result = yield* Fiber.join(fiber)
  yield* Effect.log(`Result: ${result}`)
  return result
})
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(longRunningTask)
  // Join - wait for the result
  const result = yield* Fiber.join(fiber)
  // Await - get the Exit value (success/failure/interruption)
  const exit = yield* Fiber.await(fiber)
  // Interrupt - cancel execution
  yield* Fiber.interrupt(fiber)
  // Poll - non-blocking check; returns Option<Exit> (None while still running)
  const status = yield* Fiber.poll(fiber)
})
import { Effect } from "effect"
// Concurrent execution (opt-in: Effect.all runs sequentially by default)
const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ], { concurrency: "unbounded" })
  // All requests run concurrently
  return results
})
// Sequential execution (the default; { concurrency: 1 } is equivalent)
const sequential = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ])
  return results
})
// Limited concurrency
const limited = Effect.gen(function* () {
  const results = yield* Effect.all(
    Array.from({ length: 100 }, (_, i) => fetchUser(`${i}`)),
    { concurrency: 10 } // Max 10 concurrent
  )
  return results
})
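When effects run concurrently, Effect.all fails fast by default: the first failure interrupts the still-running effects. If you need every outcome, the mode option collects results instead. A sketch, reusing the assumed fetchUser helper from above:
import { Effect } from "effect"
// mode: "either" wraps each result, so one failure does not abort the rest
const collected = Effect.all(
  [fetchUser("1"), fetchUser("2"), fetchUser("3")],
  { concurrency: "unbounded", mode: "either" }
)
// collected succeeds with an array of Either values: Right for successes, Left for failures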
import { Effect } from "effect"
// Batching for efficiency
const batchFetch = Effect.gen(function* () {
  const userIds = Array.from({ length: 1000 }, (_, i) => `${i}`)
  const results = yield* Effect.all(
    userIds.map(id => fetchUser(id)),
    {
      concurrency: 50, // 50 concurrent requests
      batching: true // Batch Effect.request-based effects (see request resolvers below)
    }
  )
  return results
})
import { Effect } from "effect"
const processUsers = (userIds: string[]) =>
  Effect.forEach(
    userIds,
    (id) => Effect.gen(function* () {
      const user = yield* fetchUser(id)
      const processed = yield* processUser(user)
      return processed
    }),
    { concurrency: "unbounded" } // No limit
  )
// With a concurrency limit
const processUsersLimited = (userIds: string[]) =>
  Effect.forEach(
    userIds,
    (id) => fetchUser(id).pipe(Effect.flatMap(processUser)),
    { concurrency: 10 }
  )
import { Effect } from "effect"
const fetchWithFallback = (id: string) =>
  Effect.race(
    fetchFromPrimaryDb(id),
    fetchFromSecondaryDb(id)
  )
// Returns whichever succeeds first; the loser is interrupted
// Effect.race takes exactly two effects - for more, use Effect.raceAll
const fastestSource = Effect.raceAll([
  fetchFromSource1(),
  fetchFromSource2(),
  fetchFromSource3()
])
import { Effect } from "effect"
const sources = [
  fetchFromSource1(),
  fetchFromSource2(),
  fetchFromSource3()
]
// First to succeed wins
const fastest = Effect.raceAll(sources)
import { Duration, Effect } from "effect"
const withTimeout = <A, E, R>(
  effect: Effect.Effect<A, E, R>,
  duration: Duration.Duration
) =>
  Effect.race(
    effect,
    Effect.sleep(duration).pipe(
      Effect.andThen(Effect.fail({ _tag: "Timeout" as const }))
    )
  )
const program = Effect.gen(function* () {
  const result = yield* withTimeout(
    slowOperation(),
    Duration.seconds(5)
  )
  return result
})
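Note that Effect ships built-in timeout combinators, so the manual race above is rarely necessary in practice. A short sketch, reusing the assumed slowOperation helper:
import { Effect } from "effect"
// Built-in: fails with a TimeoutException when the duration elapses
const withBuiltInTimeout = slowOperation().pipe(Effect.timeout("5 seconds"))
// Variant: succeeds with Option.none() on timeout instead of failing
const optionalResult = slowOperation().pipe(Effect.timeoutOption("5 seconds"))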
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(longRunningTask)
  // Cancel after 1 second
  yield* Effect.sleep("1 second")
  yield* Fiber.interrupt(fiber)
  yield* Effect.log("Task cancelled")
})
// Automatic interruption on parent exit
const autoInterrupt = Effect.gen(function* () {
  const fiber = yield* Effect.fork(infiniteLoop)
  // fiber will be interrupted when this effect completes
})
import { Effect } from "effect"
const criticalSection = Effect.gen(function* () {
  // This region cannot be interrupted
  yield* Effect.uninterruptible(
    Effect.gen(function* () {
      yield* beginTransaction()
      yield* updateDatabase()
      yield* commitTransaction()
    })
  )
})
// Interruptible regions within uninterruptible
const mixed = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* criticalOperation1()
    // Allow interruption here
    yield* Effect.interruptible(
      nonCriticalOperation()
    )
    yield* criticalOperation2()
  })
)
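To make sure cleanup still runs when a fiber is interrupted, attach finalizers with Effect.ensuring or Effect.onInterrupt. A minimal sketch; acquireLock, releaseLock, and doWork are assumed helpers:
import { Effect } from "effect"
// ensuring: the finalizer runs on success, failure, or interruption
const guarded = acquireLock().pipe(
  Effect.andThen(doWork()),
  Effect.ensuring(releaseLock())
)
// onInterrupt: the handler runs only if the effect is interrupted
const observed = doWork().pipe(
  Effect.onInterrupt(() => Effect.log("work was interrupted"))
)
For true acquire/use/release pairs, Effect.acquireUseRelease also makes the acquisition itself uninterruptible.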
import { Effect } from "effect"
const program = Effect.gen(function* () {
  // Regular fork - interrupted when parent exits
  const regularFiber = yield* Effect.fork(task)
  // Daemon fork - survives parent exit
  const daemonFiber = yield* Effect.forkDaemon(backgroundTask)
  // Parent exits: regularFiber is interrupted, daemonFiber continues
})
// Background worker example
const startBackgroundWorker = Effect.gen(function* () {
  yield* Effect.forkDaemon(
    Effect.forever(
      Effect.gen(function* () {
        yield* processQueue()
        yield* Effect.sleep("1 second")
      })
    )
  )
})
import { Effect } from "effect"
const program = Effect.gen(function* () {
  yield* Effect.scoped(
    Effect.gen(function* () {
      // Fibers are tied to the enclosing scope
      const fiber1 = yield* Effect.forkScoped(task1)
      const fiber2 = yield* Effect.forkScoped(task2)
      // Do work
      yield* doWork()
      // Scope exit automatically interrupts the fibers
    })
  )
  // fiber1 and fiber2 are interrupted here
})
import { Effect, Exit, Scope } from "effect"
const managedConcurrency = Effect.gen(function* () {
  const scope = yield* Scope.make()
  // Fork in a specific scope
  const fiber = yield* Effect.forkIn(task, scope)
  // Work continues
  yield* doWork()
  // Close the scope, interrupting the fiber
  yield* Scope.close(scope, Exit.succeed(undefined))
})
import { Effect, Fiber, Queue } from "effect"
interface Task {
  id: string
  data: unknown
}
const createWorkerPool = (workers: number) =>
  Effect.gen(function* () {
    const queue = yield* Queue.bounded<Task>(100)
    // Start workers
    const workerFibers = yield* Effect.all(
      Array.from({ length: workers }, () =>
        Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const task = yield* Queue.take(queue)
              yield* processTask(task)
            })
          )
        )
      )
    )
    return {
      submit: (task: Task) => Queue.offer(queue, task),
      shutdown: () =>
        Effect.all(
          workerFibers.map(fiber => Fiber.interrupt(fiber))
        )
    }
  })
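A possible usage sketch for the pool defined above (processTask and the task payloads are assumed; wrapping the pool in Effect.acquireRelease would make shutdown automatic):
const program = Effect.gen(function* () {
  const pool = yield* createWorkerPool(5)
  // Submit 20 tasks to the queue
  yield* Effect.forEach(
    Array.from({ length: 20 }, (_, i) => ({ id: `${i}`, data: null })),
    (task) => pool.submit(task)
  )
  // ...later, stop the workers
  yield* pool.shutdown()
})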
import { Effect } from "effect"
const parallelMapReduce = <A, B, E, R>(
  items: A[],
  map: (item: A) => Effect.Effect<B, E, R>,
  reduce: (acc: B, item: B) => B,
  initial: B,
  concurrency: number
) =>
  Effect.gen(function* () {
    const mapped = yield* Effect.forEach(
      items,
      map,
      { concurrency }
    )
    return mapped.reduce(reduce, initial)
  })
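For example, summing word counts across documents with at most four fetches in flight (fetchDocument is an assumed helper returning file contents):
const totalWords = parallelMapReduce(
  ["a.txt", "b.txt", "c.txt"],
  (path) => fetchDocument(path).pipe(Effect.map((doc) => doc.split(/\s+/).length)),
  (acc, n) => acc + n,
  0,
  4 // at most 4 concurrent fetches
)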
import { Effect, Request, RequestResolver } from "effect"
interface GetUser extends Request.Request<User, UserNotFound> {
  readonly _tag: "GetUser"
  readonly id: string
}
const GetUser = Request.tagged<GetUser>("GetUser")
const GetUserResolver = RequestResolver.makeBatched(
  (requests: ReadonlyArray<GetUser>) =>
    Effect.gen(function* () {
      const ids = requests.map(r => r.id)
      const users = yield* fetchUsersBatch(ids)
      // Resolve every request in the batch
      yield* Effect.forEach(
        requests,
        (request) => {
          const user = users.find(u => u.id === request.id)
          return user
            ? Request.succeed(request, user)
            : Request.fail(request, { _tag: "UserNotFound", id: request.id })
        },
        { discard: true }
      )
    })
)
// Multiple concurrent requests for the same ID are deduplicated
const program = Effect.gen(function* () {
  const results = yield* Effect.all(
    [
      Effect.request(GetUser({ id: "1" }), GetUserResolver),
      Effect.request(GetUser({ id: "1" }), GetUserResolver),
      Effect.request(GetUser({ id: "1" }), GetUserResolver)
    ],
    { batching: true } // enable request batching for this composition
  )
  // Only one actual fetch for ID "1"
  return results
})
Best practices:
Use Effect.all for Parallel Work: Don't fork manually when Effect.all suffices.
Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.
Handle Interruption: Ensure cleanup code runs in uninterruptible regions.
Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.
Avoid Infinite Loops: Use Effect.forever with sleep for background tasks.
Batch Requests: Use request resolvers to batch and deduplicate.
Timeout Long Operations: Add timeouts to prevent hanging.
Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.
Use Daemon Sparingly: Only fork daemons when truly independent.
Test Concurrent Code: Write tests for race conditions and interruption.
Common pitfalls:
Forgetting to Join: Forking without joining loses results.
No Concurrency Limits: Unbounded concurrency can exhaust resources.
Not Handling Interruption: Missing cleanup in interruptible regions.
Race Conditions: Sharing mutable state between fibers; use Ref for safe shared state (see the sketch after this list).
Deadlocks: Circular dependencies between fibers.
Ignoring Failures: Not checking fiber exit status.
Memory Leaks: Daemon fibers that never terminate.
Over-Forking: Creating too many fibers unnecessarily.
Missing Timeouts: Long-running operations without limits.
Wrong Execution Mode: Using sequential when parallel is intended.
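To avoid the shared-mutable-state pitfall, use Ref, Effect's primitive for state shared safely across fibers. A minimal sketch:
import { Effect, Ref } from "effect"
const counter = Effect.gen(function* () {
  const ref = yield* Ref.make(0)
  // 100 fibers increment concurrently; Ref.update is atomic, so no updates are lost
  yield* Effect.all(
    Array.from({ length: 100 }, () => Ref.update(ref, (n) => n + 1)),
    { concurrency: "unbounded" }
  )
  return yield* Ref.get(ref)
})
// Effect.runPromise(counter).then(console.log) // 100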
Use effect-concurrency when you need to: