Skip to content

Commit 4b1e559

Browse files
committedMar 10, 2025
JS-2170 FIX Memory leak when no events are emitted while clients await them
1 parent 4115c8d commit 4b1e559

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed
 

‎js7-base/jvm/src/main/scala/js7/base/stream/IncreasingNumberSync.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import js7.base.monixutils.MonixDeadline
77
import js7.base.stream.IncreasingNumberSync.*
88
import js7.base.time.ScalaTime.*
99
import monix.eval.Task
10+
import monix.execution.CancelablePromise
1011
import org.jetbrains.annotations.TestOnly
1112
import scala.collection.mutable
12-
import scala.concurrent.Promise
1313
import scala.concurrent.duration.*
1414

1515
/**
@@ -18,7 +18,7 @@ import scala.concurrent.duration.*
1818
final class IncreasingNumberSync(initial: Long, valueToString: Long => String)
1919
{
2020
// TODO Watch size of valueToPromise (for example via an inspection web service)
21-
private val valueToPromise = mutable.TreeMap[Long, Promise[Unit]]()
21+
private val valueToPromise = mutable.TreeMap[Long, CancelablePromise[Unit]]()
2222
@volatile private var _last = initial
2323

2424
def onAdded(a: Long): Unit = {
@@ -66,7 +66,7 @@ final class IncreasingNumberSync(initial: Long, valueToString: Long => String)
6666
if (after < _last)
6767
RightTrue
6868
else {
69-
val promise = valueToPromise.getOrElseUpdate(after, Promise())
69+
val promise = valueToPromise.getOrElseUpdate(after, CancelablePromise())
7070
Task.fromFuture(promise.future)
7171
.as(Left(())) // Check again
7272
}

‎js7-base/jvm/src/test/scala/js7/base/stream/IncreasingNumberSyncTest.scala

+22-1
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import js7.base.thread.Futures.implicits.*
66
import js7.base.time.ScalaTime.*
77
import js7.base.time.Stopwatch
88
import monix.eval.Task
9-
import monix.execution.Scheduler
109
import monix.execution.schedulers.TestScheduler
10+
import monix.execution.{CancelableFuture, Scheduler}
1111
import scala.concurrent.Future
1212
import scala.concurrent.duration.*
1313

@@ -104,4 +104,25 @@ final class IncreasingNumberSyncTest extends OurTestSuite
104104
assert(sync.waitingCount == 0)
105105
}
106106
}
107+
108+
if (sys.runtime.maxMemory < 60_000_000)
109+
"OutOfMemoryError" in {
110+
import Scheduler.Implicits.global
111+
112+
val sync = new IncreasingNumberSync(initial = 0L, _.toString)
113+
sync.onAdded(1)
114+
val future: CancelableFuture[Boolean] = {
115+
var future: CancelableFuture[Boolean] = CancelableFuture.never
116+
for (i <- 1 to 20_000_000) {
117+
future.cancel()
118+
future = Task.defer(sync.whenAvailable(after = 1, until = Some(now + 99.seconds)))
119+
.runToFuture
120+
sleep(10.µs)
121+
}
122+
future
123+
}
124+
sync.onAdded(2)
125+
future await 99.s
126+
assert(sync.waitingCount == 0)
127+
}
107128
}

0 commit comments

Comments
 (0)