-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
When recovering from failure scopes are not closed and resources not released #1022
Comments
We're talking about the current version of scope right? (before the interruption feature) |
yes |
Yeah, openScope.flatMap { newScope =>
FreeC.Bind(self.get[F,O2], (e: Either[Throwable,R]) => e match {
case Left(e) => closeScope(newScope) flatMap { _ => h(e).get }
case Right(r) => closeScope(newScope) map { _ => r }
})
} |
@mpilquist yeah perhaps, however unlike now, this piece of code would somehow never hit the Left(err) branch when eva fails l in flatMap. I am sort of completely happy how |
@mpilquist Just to clarify |
@pchlupacek I don't follow -- can you restate? |
@mpilquist we have that already in definition of Algebra#scope. However currently, the Left() branch of that code is never evaluated // consulted in case the eval() in the scope will fail its execution. To make that change at |
If I understand correctly, there seems to be a dead branch in the interpreter code ( As a separate point, I'm not sure |
this is a small printout of the program evaluation. As you may see no CloseScope ever interpretted. and finalizer is interpreted as very last just before result is output |
@pchlupacek Try with the version I posted -- the current definition of |
@SystemFw yes exactly |
@mpilquist will try |
So to make this simple: CLOSE PATH: So where the heck is 78dd667e is closed? Assuming 6b419da is root terminated when the stream will terminate.... |
I'm still having a little trouble following the results. The change I suggested to |
@mpilquist yes, thats what worries me. You see, the solution with interruption, relies on fact that we could define somehow the scope boundaries and then when scope is Now it seems that we could easily build stream structures, that when failed will never visit relevant close of the scope, and will get released at earliest opportunity (that may be very well in most cases the end of the stream). This effectively prohibits any interruption that will be resumed by Also I am still trying to grasp whats impact of this, but seems w/o modified What I would like to be able to do is to have scope, that is always guaranteed to always close all its child scopes when fails. That will enable code like this to work
where if Assuming |
@pchlupacek We've always been able to build such structures -- that's why closing a scope will always close the entire subtree of scopes. One of the pathological cases is where 2 sibling scopes can toggle back and forth as the active scope. |
Hm, I really want to avoid to control interruption scope validity explicitly. I just want |
@mpilquist promised example where the resources will leak now : Stream(1, 2, 3).covary[IO].onFinalize(IO { println("DONE123") }).flatMap {
x => Stream.raiseError(new Throwable("BOOM"))
}
.handleErrorWith { err =>
println(s"HANDLING: $err"); Stream(3, 4).onFinalize(IO { println("DONE RECOVER")})
}.run.unsafeRunSync() results in
expected
|
@pchlupacek That's not a leak unless I'm missing something? That's an expectation mismatch on where scopes are inserted. For example, inserting a manual scope before calling @ Stream(1, 2, 3).covary[IO].onFinalize(IO { println("DONE123") }).flatMap {
x => Stream.raiseError(new Throwable("BOOM"))
}.scope.handleErrorWith { err =>
println(s"HANDLING: $err"); Stream(3, 4).onFinalize(IO { println("DONE RECOVER")})
}.run.unsafeRunSync() outputs:
|
Fwiw, I wouldn't consider either behaviour faulty. They both seem like perfectly valid semantics, and I can't make my mind as to which one is more desirable. |
@mpilquist @SystemFw I am not doing good job to explain whats going on there and how bad that is. So allow me to try to do my job a little bit better: Given the stream Stream.bracket(IO{()})(use = _ => Stream(1,0,2), release = _ => IO { println("release") })
.flatMap { i => Stream.eval(IO { 1/ i }) } What you would perhaps sort of think would be sort of equivalent to fromFreeC(Algebra.acquire[F,O,R](r, release) flatMap { case (r, token) =>
(Stream.eval(IO{1/1}) ++ Stream.eval(IO{1/0}) ++ Stream.eval(IO {1/2}))
.onComplete { fromFreeC(Algebra.release(token)) }
} But in reality, current flatMap implementation translates this to sort of equivalent of (note braces) fromFreeC(Algebra.acquire[F,O,R](r, release) flatMap { case (r, token) =>
(Stream.eval(IO{1/1}) ++ Stream.eval(IO{1/0})) ++
Stream.eval(IO {1/2}).onComplete { fromFreeC(Algebra.release(token)) }
} The situation where this has really catastrophic impact is for example this program: val semaphore = async.semaphore[IO](1).unsafeRunSync()
def acquireExclusiveLock = semaphore.decrement flatMap { _ => IO { println ("acquired") } }
def releaseExclusiveLock = semaphore.increment flatMap { _ => IO { println ("released") } }
def tryReadFromFile: Stream[IO, Int] = {
def fromFileExclusively =
Stream.bracket(acquireExclusiveLock)(
use = _ => Stream(1, 0, 2) // in real case this will read from file data
, release = _ => releaseExclusiveLock
)
fromFileExclusively
.flatMap { i => Stream.eval(IO { 1/i }) }
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL") }) ++ tryReadFromFile }
}
tryReadFromFile.run.unsafeRunSync()
What you would as user expect here, is that when the In other worlds you would expect something like:
but in reality this program behaves like
and then deadlocks. Note that this is very common pattern when you define an API as library, for example reading from kafka, databases et all, and this must work w/o user knowing the structure of the stream that library delivers to him neither requiring him to inject scopes at specific places. This has nothing to do with interruption work, its simply unsound implementation of |
@pchlupacek Your example doesn't show any issue with def tryReadFromFile: Stream[IO, Int] = {
def fromFileExclusively =
Stream.bracket(acquireExclusiveLock)(
use = _ => Stream(1, 0, 2) // in real case this will read from file data
, release = _ => releaseExclusiveLock
)
fromFileExclusively
.flatMap { i => Stream.eval(IO { 1/i }) }
.scope // <--- Imagine this scope was automatically inserted by the handleErrorWith method
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL") }) ++ tryReadFromFile }
} Here I inserted a call to
|
@mpilquist this is not solving the broken flatMap (I really ensist on fact that implementation of flatMap IS broken), but going around the problem it introduces. I am not saying however that this is not viable approach. I'll do some more testing on this and see where it goes. I just remember that last time I tried it unfortunately did not solve all the cases, but let me recheck that. |
OK give it a shot and let me know. I really believe |
@mpilquist the issue in flatMap is unrelated to scopes. I see it as flatMap is ignoring any finalisers (essentially left branch of evaluation) when stream produced by def p1: Stream[IO, Int] =
Stream(1,0,2)
.flatMap(i => Stream.eval(IO { 1 /i }))
.onFinalize(IO { println("CLN")})
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL") }) ++ p1 }
// --- AND
def p2: Stream[IO, Int] =
Stream(1,0,2)
.onFinalize(IO { println("CLN")})
.flatMap(i => Stream.eval(IO { 1 /i }))
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL") }) ++ p2 }
are completely different programs and they SHOULD not be. the first one works as expected, while second one needs scope to be inserted. |
You can't talk about finalizers without talking about scopes -- scopes are how we implement finalizers and as a result, the two concepts are inextricably linked. Trying to special purpose As for the It's not necessary for With that said, if we modify |
@mpilquist agree with what you say, however please note that resources lifetime may NOT be exactly the same as the scope's lifetime. For example resource may be closed before scope is closed, as a result of When we add scope to As a side note, I just used |
Well I still object to using the term
Yes, but this is the role of all scopes -- it's literally the only reason they exist.
Note this behavior is exactly why |
@mpilquist ah, ok no problem with that terminology, I agree this is perhaps too strong. Lets agree to keep that design if possible. Do you have any other places where this may be a problem? I would like to test them for interruption too... |
resolved #1019 |
Given following program:
The program albeit returning the correct result (Vector(5,6,7)) does not
close scopes when recovering with
handleErrorWith
. in fact, most active scope is used and kept asparent scope for evaluation of handleErrorWith. As a result the resources acquired before
handleErrorWith
are not released until complete stream will finish.Not only that present the resource and memory leak, but also makes safe interruption impossible.
The reason is, that when interpreting Algebra.eval, the next f(Left(err)) evaluated will somehow skip all other algebras that has to follow it in case of failure and immediatelly skip to handleErrorWith case
As a result, closeScope that shall normally close scope and release resource is never invoked.
In fact, I was not able to construct stream, where
Left(err)
handler ofAlgebra.scope
is evaluated ever at all.@mpilquist @SystemFw any ideas?
Note that if you remove the
scope
mark, the stream behaves as expected.The text was updated successfully, but these errors were encountered: