From cd54fd271b0c6a3f424f0672b23b161f07306172 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 5 Aug 2024 10:31:25 +1200 Subject: [PATCH] add `propagateInterruption` option to Fiber{Handle,Set,Map} (#3407) --- .changeset/short-garlics-vanish.md | 7 ++ packages/effect/src/FiberHandle.ts | 56 ++++++++++++--- packages/effect/src/FiberMap.ts | 61 ++++++++++++---- packages/effect/src/FiberSet.ts | 89 +++++++++++++++++------- packages/effect/test/FiberHandle.test.ts | 28 +++++++- packages/effect/test/FiberMap.test.ts | 28 +++++++- packages/effect/test/FiberSet.test.ts | 32 +++++++-- 7 files changed, 248 insertions(+), 53 deletions(-) create mode 100644 .changeset/short-garlics-vanish.md diff --git a/.changeset/short-garlics-vanish.md b/.changeset/short-garlics-vanish.md new file mode 100644 index 00000000000..fb3a8dfdb86 --- /dev/null +++ b/.changeset/short-garlics-vanish.md @@ -0,0 +1,7 @@ +--- +"effect": minor +--- + +add `propagateInterruption` option to Fiber{Handle,Set,Map} + +This option will send any external interrupts to the .join result. diff --git a/packages/effect/src/FiberHandle.ts b/packages/effect/src/FiberHandle.ts index 174fe90ab9b..4392172a82c 100644 --- a/packages/effect/src/FiberHandle.ts +++ b/packages/effect/src/FiberHandle.ts @@ -10,7 +10,8 @@ import * as Exit from "./Exit.js" import * as Fiber from "./Fiber.js" import * as FiberId from "./FiberId.js" import * as FiberRef from "./FiberRef.js" -import { dual } from "./Function.js" +import { constFalse, dual } from "./Function.js" +import * as HashSet from "./HashSet.js" import * as Inspectable from "./Inspectable.js" import type { FiberRuntime } from "./internal/fiberRuntime.js" import * as Option from "./Option.js" @@ -142,6 +143,17 @@ export const makeRuntime = (): Effect.Effect< (self) => runtime(self)() ) +const internalFiberIdId = -1 +const internalFiberId = FiberId.make(internalFiberIdId, 0) +const isInternalInterruption = Cause.reduceWithContext(undefined, { + emptyCase: constFalse, + failCase: constFalse, + dieCase: constFalse, + interruptCase: (_, fiberId) => HashSet.has(FiberId.ids(fiberId), internalFiberIdId), + sequentialCase: (_, left, right) => left || right, + parallelCase: (_, left, right) => left || right +}) + /** * Set the fiber in a FiberHandle. When the fiber completes, it will be removed from the FiberHandle. * If a fiber is already running, it will be interrupted unless `options.onlyIfMissing` is set. @@ -155,6 +167,7 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } ): (self: FiberHandle) => void ( @@ -163,6 +176,7 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } ): void } = dual((args) => isFiberHandle(args[0]), ( @@ -171,19 +185,20 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } ): void => { if (self.state._tag === "Closed") { - fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) return } else if (self.state.fiber !== undefined) { if (options?.onlyIfMissing === true) { - fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) return } else if (self.state.fiber === fiber) { return } - self.state.fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + self.state.fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) self.state.fiber = undefined } @@ -193,7 +208,14 @@ export const unsafeSet: { if (self.state._tag === "Open" && fiber === self.state.fiber) { self.state.fiber = undefined } - if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { + if ( + Exit.isFailure(exit) && + ( + options?.propagateInterruption === true ? + !isInternalInterruption(exit.cause) : + !Cause.isInterruptedOnly(exit.cause) + ) + ) { Deferred.unsafeDone(self.deferred, exit as any) } }) @@ -211,6 +233,7 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined } ): (self: FiberHandle) => Effect.Effect ( @@ -218,6 +241,7 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined } ): Effect.Effect } = dual((args) => isFiberHandle(args[0]), ( @@ -225,6 +249,7 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined } ): Effect.Effect => Effect.fiberIdWith( @@ -232,7 +257,8 @@ export const set: { Effect.sync(() => unsafeSet(self, fiber, { interruptAs: fiberId, - onlyIfMissing: options?.onlyIfMissing + onlyIfMissing: options?.onlyIfMissing, + propagateInterruption: options?.propagateInterruption }) ) )) @@ -261,12 +287,12 @@ export const get = (self: FiberHandle): Effect.Effect(self: FiberHandle): Effect.Effect => Effect.uninterruptibleMask((restore) => - Effect.suspend(() => { + Effect.withFiberRuntime((fiber) => { if (self.state._tag === "Closed" || self.state.fiber === undefined) { return Effect.void } return Effect.zipRight( - restore(Fiber.interrupt(self.state.fiber)), + restore(Fiber.interruptAs(self.state.fiber, FiberId.combine(fiber.id(), internalFiberId))), Effect.sync(() => { if (self.state._tag === "Open") { self.state.fiber = undefined @@ -298,6 +324,7 @@ export const run: { self: FiberHandle, options?: { readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined } ): ( effect: Effect.Effect @@ -307,13 +334,17 @@ export const run: { effect: Effect.Effect, options?: { readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined } ): Effect.Effect, never, R> } = function() { const self = arguments[0] as FiberHandle if (Effect.isEffect(arguments[1])) { const effect = arguments[1] - const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined + const options = arguments[2] as { + readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined + } | undefined return Effect.suspend(() => { if (self.state._tag === "Closed") { return Effect.interrupt @@ -328,7 +359,10 @@ export const run: { ) }) as any } - const options = arguments[1] as { readonly onlyIfMissing?: boolean } | undefined + const options = arguments[1] as { + readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined + } | undefined return (effect: Effect.Effect) => Effect.suspend(() => { if (self.state._tag === "Closed") { @@ -382,6 +416,7 @@ export const runtime: ( options?: | Runtime.RunForkOptions & { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ) => Fiber.RuntimeFiber, @@ -397,6 +432,7 @@ export const runtime: ( options?: | Runtime.RunForkOptions & { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ) => { diff --git a/packages/effect/src/FiberMap.ts b/packages/effect/src/FiberMap.ts index 769a0f9d1e4..57c0fff3732 100644 --- a/packages/effect/src/FiberMap.ts +++ b/packages/effect/src/FiberMap.ts @@ -10,7 +10,8 @@ import * as Exit from "./Exit.js" import * as Fiber from "./Fiber.js" import * as FiberId from "./FiberId.js" import * as FiberRef from "./FiberRef.js" -import { dual } from "./Function.js" +import { constFalse, dual } from "./Function.js" +import * as HashSet from "./HashSet.js" import * as Inspectable from "./Inspectable.js" import type { FiberRuntime } from "./internal/fiberRuntime.js" import * as Iterable from "./Iterable.js" @@ -157,6 +158,17 @@ export const makeRuntime = (): Effect.Effect< (self) => runtime(self)() ) +const internalFiberIdId = -1 +const internalFiberId = FiberId.make(internalFiberIdId, 0) +const isInternalInterruption = Cause.reduceWithContext(undefined, { + emptyCase: constFalse, + failCase: constFalse, + dieCase: constFalse, + interruptCase: (_, fiberId) => HashSet.has(FiberId.ids(fiberId), internalFiberIdId), + sequentialCase: (_, left, right) => left || right, + parallelCase: (_, left, right) => left || right +}) + /** * Add a fiber to the FiberMap. When the fiber completes, it will be removed from the FiberMap. * If the key already exists in the FiberMap, the previous fiber will be interrupted. @@ -171,6 +183,7 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): (self: FiberMap) => void ( @@ -180,6 +193,7 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): void } = dual((args) => isFiberMap(args[0]), ( @@ -189,22 +203,23 @@ export const unsafeSet: { options?: { readonly interruptAs?: FiberId.FiberId | undefined readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): void => { if (self.state._tag === "Closed") { - fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) return } const previous = MutableHashMap.get(self.state.backing, key) if (previous._tag === "Some") { if (options?.onlyIfMissing === true) { - fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) return } else if (previous.value === fiber) { return } - previous.value.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + previous.value.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) } ;(fiber as FiberRuntime).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none()) @@ -217,7 +232,14 @@ export const unsafeSet: { if (Option.isSome(current) && fiber === current.value) { MutableHashMap.remove(self.state.backing, key) } - if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { + if ( + Exit.isFailure(exit) && + ( + options?.propagateInterruption === true ? + !isInternalInterruption(exit.cause) : + !Cause.isInterruptedOnly(exit.cause) + ) + ) { Deferred.unsafeDone(self.deferred, exit as any) } }) @@ -236,6 +258,7 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): (self: FiberMap) => Effect.Effect ( @@ -244,6 +267,7 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): Effect.Effect } = dual((args) => isFiberMap(args[0]), ( @@ -252,14 +276,15 @@ export const set: { fiber: Fiber.RuntimeFiber, options?: { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): Effect.Effect => Effect.fiberIdWith( (fiberId) => Effect.sync(() => unsafeSet(self, key, fiber, { - interruptAs: fiberId, - onlyIfMissing: options?.onlyIfMissing + ...options, + interruptAs: fiberId }) ) )) @@ -349,7 +374,7 @@ export const remove: { key: K ) => Effect.Effect >(2, (self, key) => - Effect.suspend(() => { + Effect.withFiberRuntime((removeFiber) => { if (self.state._tag === "Closed") { return Effect.void } @@ -358,7 +383,7 @@ export const remove: { return Effect.void } // will be removed by the observer - return Fiber.interrupt(fiber.value) + return Fiber.interruptAs(fiber.value, FiberId.combine(removeFiber.id(), internalFiberId)) })) /** @@ -366,14 +391,14 @@ export const remove: { * @categories combinators */ export const clear = (self: FiberMap): Effect.Effect => - Effect.suspend(() => { + Effect.withFiberRuntime((clearFiber) => { if (self.state._tag === "Closed") { return Effect.void } return Effect.forEach(self.state.backing, ([, fiber]) => // will be removed by the observer - Fiber.interrupt(fiber)) + Fiber.interruptAs(fiber, FiberId.combine(clearFiber.id(), internalFiberId))) }) const constInterruptedFiber = (function() { @@ -399,6 +424,7 @@ export const run: { key: K, options?: { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): ( effect: Effect.Effect @@ -409,6 +435,7 @@ export const run: { effect: Effect.Effect, options?: { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): Effect.Effect, never, R> } = function() { @@ -416,7 +443,10 @@ export const run: { const self = arguments[0] as FiberMap const key = arguments[1] const effect = arguments[2] as Effect.Effect - const options = arguments[3] as { readonly onlyIfMissing?: boolean } | undefined + const options = arguments[3] as { + readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined + } | undefined return Effect.suspend(() => { if (self.state._tag === "Closed") { return Effect.interrupt @@ -433,7 +463,10 @@ export const run: { } const self = arguments[0] as FiberMap const key = arguments[1] - const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined + const options = arguments[2] as { + readonly onlyIfMissing?: boolean + readonly propagateInterruption?: boolean | undefined + } | undefined return (effect: Effect.Effect) => Effect.suspend(() => { if (self.state._tag === "Closed") { @@ -486,6 +519,7 @@ export const runtime: ( options?: | Runtime.RunForkOptions & { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ) => Fiber.RuntimeFiber, @@ -502,6 +536,7 @@ export const runtime: ( options?: | Runtime.RunForkOptions & { readonly onlyIfMissing?: boolean | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ) => { diff --git a/packages/effect/src/FiberSet.ts b/packages/effect/src/FiberSet.ts index 9ef0e705bfd..53e0a50a8cc 100644 --- a/packages/effect/src/FiberSet.ts +++ b/packages/effect/src/FiberSet.ts @@ -9,7 +9,8 @@ import * as Deferred from "./Deferred.js" import * as Exit from "./Exit.js" import * as Fiber from "./Fiber.js" import * as FiberRef from "./FiberRef.js" -import { dual } from "./Function.js" +import { constFalse, dual } from "./Function.js" +import * as HashSet from "./HashSet.js" import * as Inspectable from "./Inspectable.js" import type { FiberRuntime } from "./internal/fiberRuntime.js" import * as Iterable from "./Iterable.js" @@ -146,6 +147,17 @@ export const makeRuntime = (): Effect.Effec (self) => runtime(self)() ) +const internalFiberIdId = -1 +const internalFiberId = FiberId.make(internalFiberIdId, 0) +const isInternalInterruption = Cause.reduceWithContext(undefined, { + emptyCase: constFalse, + failCase: constFalse, + dieCase: constFalse, + interruptCase: (_, fiberId) => HashSet.has(FiberId.ids(fiberId), internalFiberIdId), + sequentialCase: (_, left, right) => left || right, + parallelCase: (_, left, right) => left || right +}) + /** * Add a fiber to the FiberSet. When the fiber completes, it will be removed. * @@ -157,6 +169,7 @@ export const unsafeAdd: { fiber: Fiber.RuntimeFiber, options?: { readonly interruptAs?: FiberId.FiberId | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): (self: FiberSet) => void ( @@ -164,6 +177,7 @@ export const unsafeAdd: { fiber: Fiber.RuntimeFiber, options?: { readonly interruptAs?: FiberId.FiberId | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): void } = dual((args) => isFiberSet(args[0]), ( @@ -171,10 +185,11 @@ export const unsafeAdd: { fiber: Fiber.RuntimeFiber, options?: { readonly interruptAs?: FiberId.FiberId | undefined + readonly propagateInterruption?: boolean | undefined } | undefined ): void => { if (self.state._tag === "Closed") { - fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none) + fiber.unsafeInterruptAsFork(FiberId.combine(options?.interruptAs ?? FiberId.none, internalFiberId)) return } else if (self.state.backing.has(fiber)) { return @@ -186,7 +201,14 @@ export const unsafeAdd: { return } self.state.backing.delete(fiber) - if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) { + if ( + Exit.isFailure(exit) && + ( + options?.propagateInterruption === true ? + !isInternalInterruption(exit.cause) : + !Cause.isInterruptedOnly(exit.cause) + ) + ) { Deferred.unsafeDone(self.deferred, exit as any) } }) @@ -200,26 +222,31 @@ export const unsafeAdd: { */ export const add: { ( - fiber: Fiber.RuntimeFiber + fiber: Fiber.RuntimeFiber, + options?: { + readonly propagateInterruption?: boolean | undefined + } | undefined ): (self: FiberSet) => Effect.Effect ( self: FiberSet, - fiber: Fiber.RuntimeFiber + fiber: Fiber.RuntimeFiber, + options?: { + readonly propagateInterruption?: boolean | undefined + } | undefined ): Effect.Effect -} = dual< - ( - fiber: Fiber.RuntimeFiber - ) => (self: FiberSet) => Effect.Effect, +} = dual( + (args) => isFiberSet(args[0]), ( self: FiberSet, - fiber: Fiber.RuntimeFiber - ) => Effect.Effect ->( - 2, - (self, fiber) => + fiber: Fiber.RuntimeFiber, + options?: { + readonly propagateInterruption?: boolean | undefined + } | undefined + ): Effect.Effect => Effect.fiberIdWith((fiberId) => Effect.sync(() => unsafeAdd(self, fiber, { + ...options, interruptAs: fiberId }) ) @@ -231,13 +258,13 @@ export const add: { * @categories combinators */ export const clear = (self: FiberSet): Effect.Effect => - Effect.suspend(() => { + Effect.withFiberRuntime((clearFiber) => { if (self.state._tag === "Closed") { return Effect.void } return Effect.forEach(self.state.backing, (fiber) => // will be removed by the observer - Fiber.interrupt(fiber)) + Fiber.interruptAs(fiber, FiberId.combine(clearFiber.id(), internalFiberId))) }) /** @@ -248,16 +275,25 @@ export const clear = (self: FiberSet): Effect.Effect => * @categories combinators */ export const run: { - (self: FiberSet): ( + ( + self: FiberSet, + options?: { + readonly propagateInterruption?: boolean | undefined + } | undefined + ): ( effect: Effect.Effect ) => Effect.Effect, never, R> ( self: FiberSet, - effect: Effect.Effect + effect: Effect.Effect, + options?: { + readonly propagateInterruption?: boolean | undefined + } | undefined ): Effect.Effect, never, R> } = function() { const self = arguments[0] as FiberSet - if (arguments.length === 1) { + if (!Effect.isEffect(arguments[1])) { + const options = arguments[1] as { readonly propagateInterruption?: boolean | undefined } | undefined return (effect: Effect.Effect) => Effect.suspend(() => { if (self.state._tag === "Closed") { @@ -266,12 +302,13 @@ export const run: { return Effect.uninterruptibleMask((restore) => Effect.tap( restore(Effect.forkDaemon(effect)), - (fiber) => add(self, fiber) + (fiber) => add(self, fiber, options) ) ) }) } - const effect = arguments[1] as Effect.Effect + const effect = arguments[1] + const options = arguments[2] as { readonly propagateInterruption?: boolean | undefined } | undefined return Effect.suspend(() => { if (self.state._tag === "Closed") { return Effect.interrupt @@ -279,7 +316,7 @@ export const run: { return Effect.uninterruptibleMask((restore) => Effect.tap( restore(Effect.forkDaemon(effect)), - (fiber) => add(self, fiber) + (fiber) => add(self, fiber, options) ) ) }) as any @@ -316,7 +353,9 @@ export const runtime: ( ) => () => Effect.Effect< ( effect: Effect.Effect, - options?: Runtime.RunForkOptions | undefined + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined ) => Fiber.RuntimeFiber, never, R @@ -327,7 +366,9 @@ export const runtime: ( const runFork = Runtime.runFork(runtime) return ( effect: Effect.Effect, - options?: Runtime.RunForkOptions | undefined + options?: + | Runtime.RunForkOptions & { readonly propagateInterruption?: boolean | undefined } + | undefined ) => { const fiber = runFork(effect, options) unsafeAdd(self, fiber) diff --git a/packages/effect/test/FiberHandle.test.ts b/packages/effect/test/FiberHandle.test.ts index 43e49f8ac34..3efc116cab4 100644 --- a/packages/effect/test/FiberHandle.test.ts +++ b/packages/effect/test/FiberHandle.test.ts @@ -1,4 +1,4 @@ -import { Effect, Exit, Ref } from "effect" +import { Deferred, Effect, Exit, Fiber, Ref } from "effect" import * as FiberHandle from "effect/FiberHandle" import * as it from "effect/test/utils/extend" import { assert, describe } from "vitest" @@ -74,4 +74,30 @@ describe("FiberHandle", () => { assert.isTrue(Exit.isInterrupted(yield* _(fiberC.await))) assert.strictEqual(fiberA.unsafePoll(), null) })) + + it.scoped("propagateInterruption: false", () => + Effect.gen(function*() { + const handle = yield* FiberHandle.make() + const fiber = yield* FiberHandle.run(handle, Effect.never, { + propagateInterruption: false + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isFalse(yield* Deferred.isDone(handle.deferred)) + })) + + it.scoped("propagateInterruption: true", () => + Effect.gen(function*() { + const handle = yield* FiberHandle.make() + const fiber = yield* FiberHandle.run(handle, Effect.never, { + propagateInterruption: true + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isTrue(Exit.isInterrupted( + yield* FiberHandle.join(handle).pipe( + Effect.exit + ) + )) + })) }) diff --git a/packages/effect/test/FiberMap.test.ts b/packages/effect/test/FiberMap.test.ts index 3ff78f2e3d1..c0453ef452e 100644 --- a/packages/effect/test/FiberMap.test.ts +++ b/packages/effect/test/FiberMap.test.ts @@ -1,4 +1,4 @@ -import { Array, Effect, Exit, Ref, Scope } from "effect" +import { Array, Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect" import * as FiberMap from "effect/FiberMap" import * as it from "effect/test/utils/extend" import { assert, describe } from "vitest" @@ -96,4 +96,30 @@ describe("FiberMap", () => { assert.isTrue(Exit.isInterrupted(yield* _(fiberC.await))) assert.strictEqual(fiberA.unsafePoll(), null) })) + + it.scoped("propagateInterruption false", () => + Effect.gen(function*() { + const map = yield* FiberMap.make() + const fiber = yield* FiberMap.run(map, "a", Effect.never, { + propagateInterruption: false + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isFalse(yield* Deferred.isDone(map.deferred)) + })) + + it.scoped("propagateInterruption true", () => + Effect.gen(function*() { + const map = yield* FiberMap.make() + const fiber = yield* FiberMap.run(map, "a", Effect.never, { + propagateInterruption: true + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isTrue(Exit.isInterrupted( + yield* FiberMap.join(map).pipe( + Effect.exit + ) + )) + })) }) diff --git a/packages/effect/test/FiberSet.test.ts b/packages/effect/test/FiberSet.test.ts index 80cd0781867..84ead23be50 100644 --- a/packages/effect/test/FiberSet.test.ts +++ b/packages/effect/test/FiberSet.test.ts @@ -1,4 +1,4 @@ -import { Array, Effect, Exit, Ref, Scope } from "effect" +import { Array, Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect" import * as FiberSet from "effect/FiberSet" import * as it from "effect/test/utils/extend" import { assert, describe } from "vitest" @@ -14,9 +14,7 @@ describe("FiberSet", () => { Effect.onInterrupt( Effect.never, () => Ref.update(ref, (n) => n + 1) - ).pipe( - FiberSet.run(set) - ), + ).pipe(FiberSet.run(set)), Effect.replicateEffect(10) ) yield* _(Effect.yieldNow()) @@ -70,4 +68,30 @@ describe("FiberSet", () => { yield* _(Scope.close(scope, Exit.void)) assert.strictEqual(yield* _(FiberSet.size(set)), 0) })) + + it.scoped("propagateInterruption false", () => + Effect.gen(function*() { + const set = yield* FiberSet.make() + const fiber = yield* FiberSet.run(set, Effect.never, { + propagateInterruption: false + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isFalse(yield* Deferred.isDone(set.deferred)) + })) + + it.scoped("propagateInterruption true", () => + Effect.gen(function*() { + const set = yield* FiberSet.make() + const fiber = yield* FiberSet.run(set, Effect.never, { + propagateInterruption: true + }) + yield* Effect.yieldNow() + yield* Fiber.interrupt(fiber) + assert.isTrue(Exit.isInterrupted( + yield* FiberSet.join(set).pipe( + Effect.exit + ) + )) + })) })