Skip to content

Commit

Permalink
prevent reclaim of manually invalidated pool items (#3210)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jul 10, 2024
1 parent 366f2ee commit 7af137c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
5 changes: 5 additions & 0 deletions .changeset/honest-hats-cheat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

prevent reclaim of manually invalidated pool items
22 changes: 15 additions & 7 deletions packages/effect/src/internal/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ interface PoolItem<A, E> {
readonly exit: Exit<A, E>
finalizer: Effect<void>
refCount: number
disableReclaim: boolean
}

interface Strategy<A, E> {
Expand Down Expand Up @@ -153,7 +154,8 @@ class PoolImpl<A, E> implements Pool<A, E> {
const item: PoolItem<A, E> = {
exit,
finalizer: core.catchAllCause(scope.close(exit), reportUnhandledError),
refCount: 0
refCount: 0,
disableReclaim: false
}
this.items.add(item)
this.available.add(item)
Expand Down Expand Up @@ -261,6 +263,7 @@ class PoolImpl<A, E> implements Pool<A, E> {
if (this.isShuttingDown) return core.void
for (const poolItem of this.items) {
if (poolItem.exit._tag === "Success" && poolItem.exit.value === item) {
poolItem.disableReclaim = true
return core.uninterruptible(this.invalidatePoolItem(poolItem))
}
}
Expand Down Expand Up @@ -371,16 +374,21 @@ const strategyUsageTTL = <A, E>(ttl: Duration.DurationInput) =>
},
onAcquire: (item) => queue.offer(item),
reclaim(pool) {
return core.suspend(() => {
return core.suspend((): Effect<Option.Option<PoolItem<A, E>>> => {
if (pool.invalidated.size === 0) {
return coreEffect.succeedNone
}
const item = Iterable.unsafeHead(pool.invalidated)
pool.invalidated.delete(item)
if (item.refCount < pool.concurrency) {
pool.available.add(item)
const item = Iterable.head(
Iterable.filter(pool.invalidated, (item) => !item.disableReclaim)
)
if (item._tag === "None") {
return coreEffect.succeedNone
}
pool.invalidated.delete(item.value)
if (item.value.refCount < pool.concurrency) {
pool.available.add(item.value)
}
return core.as(queue.offer(item), Option.some(item))
return core.as(queue.offer(item.value), item)
})
}
})
Expand Down

0 comments on commit 7af137c

Please sign in to comment.