When recovering from failure scopes are not closed and resources not released #1022

Closed
pchlupacek opened this issue Dec 18, 2017 · 30 comments

@pchlupacek
Contributor

pchlupacek commented Dec 18, 2017

Given the following program:

Stream(1,2,3).covary[IO].scope
.flatMap { i =>
    Stream.eval(IO.raiseError(new Throwable("BOOM"))) ++ Stream(10,11,12)
}
.onFinalize(IO{ println("DONE FINALIZER")})
.handleErrorWith { err => println(s"XXXR : $err"); Stream(5,6,7) }
.runLog.unsafeRunSync()

The program, albeit returning the correct result (Vector(5,6,7)), does not close scopes when recovering with handleErrorWith. In fact, the most recently active scope is kept and used as the parent scope for the evaluation of handleErrorWith. As a result, resources acquired before handleErrorWith are not released until the complete stream finishes.

Not only does that present a resource and memory leak, it also makes safe interruption impossible.

The reason is that, when interpreting Algebra.eval, the next f(Left(err)) evaluated somehow skips all the other algebras that have to follow it in case of failure and immediately jumps to the handleErrorWith case.

As a result, closeScope, which should normally close the scope and release the resource, is never invoked.

In fact, I was not able to construct a stream where the Left(err) handler of Algebra.scope is ever evaluated at all.

@mpilquist @SystemFw any ideas?

Note that if you remove the scope mark, the stream behaves as expected.

@SystemFw
Collaborator

SystemFw commented Dec 18, 2017

We're talking about the current version of scope right? (before the interruption feature)

@pchlupacek
Contributor Author

yes

@mpilquist
Member

mpilquist commented Dec 18, 2017

Yeah, handleErrorWith doesn't say anything about scope management. If we wanted to change that, I think we could redefine Stream#handleErrorWith to something like:

openScope.flatMap { newScope =>
  FreeC.Bind(self.get[F,O2], (e: Either[Throwable,R]) => e match {
    case Left(e) => closeScope(newScope) flatMap { _ => h(e).get }
    case Right(r) => closeScope(newScope) map { _ => r }
  })
}

@pchlupacek
Contributor Author

pchlupacek commented Dec 18, 2017

@mpilquist yeah, perhaps. However, unlike now, this piece of code would somehow never hit the Left(err) branch when eval fails in flatMap. I am completely happy with how handleErrorWith works now; the problem is that the scope's closeScope algebra is never invoked when eval fails in flatMap.

@pchlupacek
Contributor Author

@mpilquist Just to clarify: unlike now => it will never now

@mpilquist
Member

@pchlupacek I don't follow -- can you restate?

@pchlupacek
Contributor Author

@mpilquist we have that already in the definition of Algebra#scope. However, currently the Left() branch of that code is never evaluated or consulted when the eval() inside the scope fails.

If we make that change in handleErrorWith, I am not sure it will behave any differently from Algebra#scope.

@SystemFw
Collaborator

If I understand correctly, there seems to be a dead branch in the interpreter code (Left(err) on Algebra.scope).

As a separate point, I'm not sure handleErrorWith should close a scope, since after all the error was resolved

@pchlupacek
Contributor Author

RUNFOLD(Token(5906ebcb)) : fs2.internal.FreeC$ViewL@1ba5fd17
RUNFOLD(Token(5906ebcb)) GET : Bind(Eval(Acquire(IO(()),fs2.Stream$InvariantOps$$$Lambda$29/1757676444@7c417213)),fs2.internal.Algebra$$$Lambda$23/1442045361@15761df8)
RUNFOLD(Token(5906ebcb)) : fs2.internal.FreeC$ViewL@7436a2d4
RUNFOLD(Token(5906ebcb)) GET : Bind(Eval(OpenScope(None)),fs2.internal.Algebra$$$Lambda$23/1442045361@70be0a2b)
RUNFOLD(Token(12b0404f)) : fs2.internal.FreeC$ViewL@99f673e
RUNFOLD(Token(12b0404f)) GET : Bind(Eval(Eval(IO(throw java.lang.Throwable: BOOM))),fs2.internal.Algebra$$$Lambda$23/1442045361@5276e6b0)
RUNFOLD(Token(12b0404f)) : Left(java.lang.Throwable: BOOM) : Bind(Eval(Release(Token(159f197))),fs2.internal.Algebra$$$Lambda$23/1442045361@78aab498) : Bind(Eval(Release(Token(159f197))),fs2.internal.Algebra$$$Lambda$23/1442045361@78aab498)
RUNFOLD(Token(12b0404f)) : fs2.internal.FreeC$ViewL@bc6fe04
RUNFOLD(Token(12b0404f)) GET : Bind(Eval(Release(Token(159f197))),fs2.internal.Algebra$$$Lambda$23/1442045361@78aab498)
XXXR : java.lang.Throwable: BOOM
RUNFOLD(Token(12b0404f)) : fs2.internal.FreeC$ViewL@77ff14b0
RUNFOLD(Token(12b0404f)) GET : Pure(Some((Chunk(5, 6, 7),Pure(()))))
RUNFOLD(Token(12b0404f)) : fs2.internal.FreeC$ViewL@6d3d5f69
RUNFOLD(Token(12b0404f)) GET : Pure(None)
DONE FINALIZER
DONE: Vector(5, 6, 7)

This is a small printout of the program evaluation. As you can see, no CloseScope is ever interpreted, and the finalizer is interpreted last, just before the result is output.

@mpilquist
Member

@pchlupacek Try with the version I posted -- the current definition of handleErrorWith will in fact short circuit any scopes but the version I posted shouldn't. (Haven't tested though)

@pchlupacek
Contributor Author

@SystemFw yes exactly

@pchlupacek
Contributor Author

@mpilquist will try

@pchlupacek
Contributor Author

@mpilquist @SystemFw :

RUNFOLD(Token(6b419da)) GET : Bind(Eval(OpenScope(None)),fs2.internal.Algebra$$$Lambda$23/1442045361@6646153)
RUNFOLD(Token(6b419da)): OPEN SCOPE RunFoldScope(id=Token(15761df8),interruptible=false)
RUNFOLD(Token(15761df8)) GET : Bind(Eval(Acquire(IO(()),fs2.Stream$InvariantOps$$$Lambda$29/1757676444@1a0dcaa)),fs2.internal.Algebra$$$Lambda$23/1442045361@3bd40a57)
RUNFOLD(Token(15761df8)) GET : Bind(Eval(OpenScope(None)),fs2.internal.Algebra$$$Lambda$23/1442045361@1d76aeea)
RUNFOLD(Token(15761df8)): OPEN SCOPE RunFoldScope(id=Token(78dd667e),interruptible=false)
RUNFOLD(Token(78dd667e)) GET : Bind(Eval(OpenScope(None)),fs2.internal.Algebra$$$Lambda$23/1442045361@10db82ae)
RUNFOLD(Token(78dd667e)): OPEN SCOPE RunFoldScope(id=Token(501edcf1),interruptible=false)
RUNFOLD(Token(501edcf1)) GET : Bind(Eval(Eval(IO(throw java.lang.Throwable: BOOM))),fs2.internal.Algebra$$$Lambda$23/1442045361@18ce0030)
RUNFOLD(Token(501edcf1)) : Left(java.lang.Throwable: BOOM) : Bind(Eval(CloseScope(RunFoldScope(id=Token(78dd667e),interruptible=false))),fs2.internal.Algebra$$$Lambda$23/1442045361@25d250c6) : Bind(Eval(CloseScope(RunFoldScope(id=Token(78dd667e),interruptible=false))),fs2.internal.Algebra$$$Lambda$23/1442045361@25d250c6)
RUNFOLD(Token(501edcf1)) GET : Bind(Eval(CloseScope(RunFoldScope(id=Token(78dd667e),interruptible=false))),fs2.internal.Algebra$$$Lambda$23/1442045361@25d250c6)
RUNFOLD(Token(501edcf1)): CLOSE SCOPE Right(())
RUNFOLD(Token(501edcf1)): CLOSE SCOPE AFTER RunFoldScope(id=Token(15761df8),interruptible=false)
RUNFOLD(Token(15761df8)) GET : Bind(Eval(Release(Token(6f96c77))),fs2.internal.Algebra$$$Lambda$23/1442045361@be64738)
DONE FINALIZER
RUNFOLD(Token(15761df8)) GET : Bind(Eval(CloseScope(RunFoldScope(id=Token(15761df8),interruptible=false))),fs2.internal.Algebra$$$Lambda$23/1442045361@3c130745)
RUNFOLD(Token(15761df8)): CLOSE SCOPE Right(())
RUNFOLD(Token(15761df8)): CLOSE SCOPE AFTER RunFoldScope(id=Token(6b419da),interruptible=false)
XXXR : java.lang.Throwable: BOOM
RUNFOLD(Token(6b419da)) GET : Pure(Some((Chunk(5, 6, 7),Pure(()))))
RUNFOLD(Token(6b419da)) GET : Pure(None)
DONE: Vector(5, 6, 7)

So to make this simple:
OPEN PATH:
6b419da -> 15761df8 -> 78dd667e -> 501edcf1

CLOSE PATH:
501edcf1 -> 15761df8

So where the heck is 78dd667e closed? Assuming 6b419da is the root scope, which is closed when the stream terminates...

@mpilquist
Member

I'm still having a little trouble following the results. The change I suggested to handleErrorWith opens a new scope and then closes that scope upon success or failure from the original stream. If the original stream fails, after scope closure, the failure is passed to the function supplied to handleErrorWith. So it's definitely possible to create traces where some inner scopes are never explicitly closed via a CloseScope but end up closed as a consequence of the handleErrorWith's scope closure.

@pchlupacek
Contributor Author

@mpilquist yes, that's what worries me. You see, the solution for interruption relies on the fact that we can define the scope boundaries, and once a scope is closed we stop propagating the Interrupt any further.

Now it seems that we can easily build stream structures that, when they fail, never visit the relevant scope close, so their resources are released only at the earliest later opportunity (which in most cases may well be the end of the stream). This effectively prohibits any interruption that is to be resumed by ++ code.

Also, I am still trying to grasp what the impact of this is, but it seems that without the modified handleErrorWith code we may leak resources (which is not related to interruption whatsoever).

What I would like to have is a scope that is always guaranteed to close all its child scopes when it fails.

That would enable code like this to work:

s.handleErrorWith(handler)
.flatMap { x => doSomethingWithX }

where, if s fails, this will interrupt doSomethingWithX, and then when handler resumes, this will continue normally, including the code in flatMap.

Assuming s is asynchronous in nature (e.g. merge).

@mpilquist
Member

@pchlupacek We've always been able to build such structures -- that's why closing a scope will always close the entire subtree of scopes. One of the pathological cases is where 2 sibling scopes can toggle back and forth as the active scope.
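
Editor's sketch: the subtree-closing behaviour described above can be illustrated with a small plain-Scala model. This is not the fs2 interpreter; `Scope`, `ScopeTreeSketch`, and the names `root`, `a`, `b`, `c`, and `lock` are hypothetical, and the only assumption carried over from the thread is that closing a scope also closes every scope opened beneath it.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model, not fs2 internals: a scope owns finalizers and child scopes.
final class Scope(val name: String, log: ListBuffer[String]) {
  private val finalizers = ListBuffer.empty[() => Unit]
  private val children   = ListBuffer.empty[Scope]
  private var closed     = false

  // Open a child scope underneath this one.
  def open(childName: String): Scope = {
    val child = new Scope(childName, log)
    children += child
    child
  }

  // Register a finalizer that records its release in the shared log.
  def register(label: String): Unit =
    finalizers += (() => { log += s"release $label"; () })

  // Closing a scope first closes its whole subtree (most recently opened
  // child first), then runs its own finalizers in reverse registration
  // order. Closing an already closed scope is a no-op.
  def close(): Unit =
    if (!closed) {
      closed = true
      children.reverseIterator.foreach(_.close())
      finalizers.reverseIterator.foreach(f => f())
      log += s"closed $name"
    }
}

object ScopeTreeSketch {
  def run(): List[String] = {
    val log  = ListBuffer.empty[String]
    val root = new Scope("root", log)
    val a    = root.open("a")
    val b    = a.open("b")
    val c    = b.open("c")
    b.register("lock")
    c.close() // the failure path closes c explicitly, b is skipped
    a.close() // closing a still closes b and releases what b owns
    log.toList
  }
}
```

In this model `ScopeTreeSketch.run()` records that `b` is never closed directly, yet its finalizer still runs once its ancestor `a` closes, which is the kind of trace discussed above where an inner scope has no explicit close of its own.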

@pchlupacek
Contributor Author

Hm, I really want to avoid controlling interruption scope validity explicitly. I just want ++ to end the interruption, or in this particular case handleErrorWith. I am really trying to avoid an onInterrupt or similar that would work like ++ but discard the Interrupted exception.

@pchlupacek
Contributor Author

pchlupacek commented Dec 19, 2017

@mpilquist the promised example where resources leak now:

 Stream(1, 2, 3).covary[IO].onFinalize(IO { println("DONE123") }).flatMap {
    x => Stream.raiseError(new Throwable("BOOM"))
  }
  .handleErrorWith { err =>
    println(s"HANDLING: $err"); Stream(3, 4).onFinalize(IO { println("DONE RECOVER")})
  }.run.unsafeRunSync()

results in

HANDLING: java.lang.Throwable: BOOM
DONE RECOVER
DONE123 

expected

DONE123
HANDLING: java.lang.Throwable: BOOM
DONE RECOVER 

@mpilquist
Member

mpilquist commented Dec 19, 2017

@pchlupacek That's not a leak unless I'm missing something? That's an expectation mismatch on where scopes are inserted. For example, inserting a manual scope before calling handleErrorWith (which is effectively what I suggested in #1021) gives the result you expected:

@ Stream(1, 2, 3).covary[IO].onFinalize(IO { println("DONE123") }).flatMap {
      x => Stream.raiseError(new Throwable("BOOM"))
    }.scope.handleErrorWith { err =>
      println(s"HANDLING: $err"); Stream(3, 4).onFinalize(IO { println("DONE RECOVER")})
    }.run.unsafeRunSync()

outputs:

DONE123
HANDLING: java.lang.Throwable: BOOM
DONE RECOVER

@SystemFw
Collaborator

SystemFw commented Dec 19, 2017

Fwiw, I wouldn't consider either behaviour faulty. They both seem like perfectly valid semantics, and I can't make up my mind as to which one is more desirable.

@pchlupacek
Contributor Author

@mpilquist @SystemFw I am not doing a good job of explaining what's going on there and how bad it is. So allow me to try to do a little bit better:

Given the stream

Stream.bracket(IO{()})(use = _ => Stream(1,0,2), release = _ => IO { println("release") })
.flatMap { i => Stream.eval(IO { 1/ i }) }

You would perhaps think this is roughly equivalent to

fromFreeC(Algebra.acquire[F,O,R](r, release) flatMap { case (r, token) => 
   (Stream.eval(IO{1/1}) ++ Stream.eval(IO{1/0}) ++ Stream.eval(IO {1/2})) 
   .onComplete { fromFreeC(Algebra.release(token)) }
})

But in reality, the current flatMap implementation translates this to roughly the equivalent of (note the parentheses)

fromFreeC(Algebra.acquire[F,O,R](r, release) flatMap { case (r, token) => 
   (Stream.eval(IO{1/1}) ++ Stream.eval(IO{1/0})) ++ 
   Stream.eval(IO {1/2}).onComplete { fromFreeC(Algebra.release(token)) }
})

The situation where this has a really catastrophic impact is, for example, this program:

val semaphore = async.semaphore[IO](1).unsafeRunSync()

def acquireExclusiveLock = semaphore.decrement flatMap { _ => IO { println ("acquired") } }
def releaseExclusiveLock = semaphore.increment flatMap { _ => IO { println ("released") } }

 def tryReadFromFile: Stream[IO, Int] = {
    def fromFileExclusively =
      Stream.bracket(acquireExclusiveLock)(
        use = _ => Stream(1, 0, 2) // in real case this will read from file data
        , release = _ => releaseExclusiveLock
      )

    fromFileExclusively
    .flatMap { i => Stream.eval(IO { 1/i }) }
    .handleErrorWith { _ => Stream.eval_(IO { println("FAIL")  }) ++ tryReadFromFile }
  }

 tryReadFromFile.run.unsafeRunSync()

What you would expect here as a user is that when the fromFileExclusively.flatMap stream fails, handleErrorWith retries until it succeeds (in this particular case, use always fails), so use is evaluated repeatedly, each time after releasing and re-acquiring the lock.

In other words, you would expect something like:

acquired
released
FAIL
acquired
released
FAIL
.... and so on ... 

but in reality this program behaves like

acquired 
FAIL

and then deadlocks.

Note that this is a very common pattern when you define an API as a library, for example for reading from Kafka, databases, et al. It must work without the user knowing the structure of the stream the library delivers, and without requiring them to inject scopes at specific places.

This has nothing to do with the interruption work; it's simply an unsound implementation of flatMap.

@mpilquist
Member

mpilquist commented Dec 20, 2017

@pchlupacek Your example doesn't show any issue with flatMap but rather argues for a scope to be inserted before handleErrorWith. Doing so manually gives the behavior you want:

 def tryReadFromFile: Stream[IO, Int] = {
    def fromFileExclusively =
      Stream.bracket(acquireExclusiveLock)(
        use = _ => Stream(1, 0, 2) // in real case this will read from file data
        , release = _ => releaseExclusiveLock
      )

    fromFileExclusively
    .flatMap { i => Stream.eval(IO { 1/i }) }
    .scope // <--- Imagine this scope was automatically inserted by the handleErrorWith method
    .handleErrorWith { _ => Stream.eval_(IO { println("FAIL")  }) ++ tryReadFromFile }
  }

Here I inserted a call to .scope manually but imagine that was inside the handleErrorWith definition.

acquired
released
FAIL
acquired
released
FAIL
acquired
released
FAIL
...
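
Editor's sketch: the difference between the two traces in this exchange (deadlock versus acquire/release/FAIL repeating) can be reproduced with a plain-Scala simulation that uses no fs2 at all. Everything here is hypothetical: `RetryLockSketch`, `simulate`, and the flag `closeScopeBeforeHandler` are illustration names, the retry loop stands in for the recursive `handleErrorWith`, and a non-blocking `tryAcquire` is used so that the would-be deadlock is reported instead of hanging.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ListBuffer

object RetryLockSketch {
  // closeScopeBeforeHandler = true models a scope that is closed before the
  // error handler runs; false models the release being parked on an outer
  // scope that only closes when the whole stream ends.
  def simulate(closeScopeBeforeHandler: Boolean, maxAttempts: Int): List[String] = {
    val lock = new Semaphore(1)
    val log  = ListBuffer.empty[String]
    var pendingReleases = 0 // releases still waiting on the outer scope

    def releaseAll(): Unit =
      while (pendingReleases > 0) {
        lock.release()
        pendingReleases -= 1
        log += "released"
      }

    var attempt = 0
    var stuck   = false
    while (attempt < maxAttempts && !stuck) {
      attempt += 1
      if (lock.tryAcquire()) {
        log += "acquired"
        pendingReleases += 1
        // The body always fails here (1 / 0 in the thread's example).
        if (closeScopeBeforeHandler) releaseAll()
        log += "FAIL" // the handler runs and triggers the next attempt
      } else {
        // A blocking acquire would wait forever at this point.
        log += "would block"
        stuck = true
      }
    }
    log.toList
  }
}
```

With `closeScopeBeforeHandler = false` the second attempt finds the lock still held, which corresponds to the `acquired`, `FAIL`, deadlock trace; with `true` each attempt releases the lock before the handler retries.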

@pchlupacek
Contributor Author

@mpilquist this does not solve the broken flatMap (I really insist that the implementation of flatMap IS broken); it works around the problem flatMap introduces. I am not saying, however, that this is not a viable approach. I'll do some more testing on this and see where it goes. I recall that the last time I tried it, it unfortunately did not solve all the cases, but let me recheck that.

@mpilquist
Member

OK give it a shot and let me know. I really believe flatMap is correct as defined and that if you want scopes, you need to introduce them either implicitly or explicitly. Hence, my proposed change to handleErrorWith is not working around an issue with flatMap but rather is a principled approach to addressing the issue.

@pchlupacek
Contributor Author

pchlupacek commented Dec 20, 2017

@mpilquist the issue in flatMap is unrelated to scopes. As I see it, flatMap ignores any finalisers (essentially the left branch of evaluation) when the stream produced by f fails. Scopes only help to run the finalisers, addressing the leak in flatMap. Whether such an approach is required for performance reasons is another question. For example:

def p1: Stream[IO, Int] = 
Stream(1,0,2)
.flatMap(i => Stream.eval(IO { 1 /i }))
.onFinalize(IO { println("CLN")})
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL")  }) ++ p1 }

// --- AND 

def p2: Stream[IO, Int] = 
Stream(1,0,2)
.onFinalize(IO { println("CLN")})
.flatMap(i => Stream.eval(IO { 1 /i }))
.handleErrorWith { _ => Stream.eval_(IO { println("FAIL")  }) ++ p2 }

are completely different programs, and they SHOULD not be. The first one works as expected, while the second one needs a scope to be inserted.

@mpilquist
Member

You can't talk about finalizers without talking about scopes -- scopes are how we implement finalizers and as a result, the two concepts are inextricably linked. Trying to special purpose flatMap to handle finalization or scope closure differently (i.e. without relying on scope unwinding) is my main objection to the interrupt PR.

As for the p1/p2 example, note that besides the root scope, the only scope introduction in both p1 and p2 is from Stream.eval_, and specifically, the call to .drain inside the implementation of eval_. eval_ calls drain which calls mapSegments which opens a pull and closes it via .stream which opens the scope.

It's not necessary for p1 and p2 to be the same program. You can argue that you expect p1 and p2 to be the same but if we really want them to be the same, then there should be a program transformation based on algebraic reasoning & stream laws that show their equivalence. You can't do that right now because we don't have substitution laws defined for Stream besides a few in the ScalaDoc.

With that said, if we modify handleErrorWith to insert a scope automatically, p1 and p2 behave the same.

@pchlupacek
Contributor Author

pchlupacek commented Dec 20, 2017

@mpilquist I agree with what you say; however, please note that a resource's lifetime may NOT be exactly the same as the scope's lifetime. For example, a resource may be closed before the scope is closed, as a result of the Stream.bracket construct (which does not introduce any scopes), and that's exactly what is NOT going to happen in p2.

When we add scope to handleErrorWith you essentially create the scope that's solely function is to close any not yet finalised resources, that leaked due flatMap's implementation to ignore left branch of evaluation. This is why I call this a workaround, and I am fine with it, if it will prove to work with all cases necessary, as this is simplest solution.

As a side note, I just used eval_ here to make things compile; I didn't want to bring it into the game at all :-)

@mpilquist
Member

Well I still object to using the term leak here as leaks are bugs that need to be fixed and this is rather an intentional design element, where scopes are used as a safeguard for abnormal sub-stream exit. :trollface:

When we add scope to handleErrorWith you essentially create the scope that's solely function is to close any not yet finalised resources

Yes, but this is the role of all scopes -- it's literally the only reason they exist.

, that leaked due flatMap's implementation to ignore left branch of evaluation

Note this behavior is exactly why onFinalize has stronger guarantees than onComplete. In fact, onFinalize uses onComplete to implement timely release, falling back to registering a scope finalizer for these early exit cases. Further, I don't think flatMap is the only way to create this scenario.
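
Editor's sketch: the "timely release, with a scope finalizer as fallback" idea can be modelled in plain Scala as a release action that runs at most once and is reachable from two places. This is only an illustration of the idea as described in the comment above, not the fs2 implementation; `RunOnceFinalizerSketch`, `Scope`, `runOnce`, and `simulate` are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer

object RunOnceFinalizerSketch {
  // Hypothetical stand-in for a scope: a list of finalizers run on close.
  final class Scope {
    private val finalizers = ListBuffer.empty[() => Unit]
    def register(f: () => Unit): Unit = finalizers += f
    def close(): Unit = finalizers.reverseIterator.foreach(f => f())
  }

  // Wrap an action so that only the first invocation has any effect.
  def runOnce(action: () => Unit): () => Unit = {
    var done = false
    () => if (!done) { done = true; action() }
  }

  def simulate(bodyFails: Boolean): List[String] = {
    val log     = ListBuffer.empty[String]
    val scope   = new Scope
    val release = runOnce(() => { log += "release"; () })
    scope.register(release) // fallback: runs when the scope closes
    try {
      if (bodyFails) throw new RuntimeException("BOOM")
      log += "body done"
      release() // timely path: runs right after normal completion
    } catch {
      case e: RuntimeException => log += s"handled ${e.getMessage}"
    }
    scope.close() // no effect if the timely path already released
    log += "scope closed"
    log.toList
  }
}
```

On normal completion the release happens immediately after the body; on failure it is deferred until the enclosing scope closes, which is why the position of the scope relative to the error handler decides whether the finalizer output appears before or after the handler's output.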

@pchlupacek
Contributor Author

@mpilquist ah, ok, no problem with that terminology; I agree this is perhaps too strong. Let's agree to keep that design if possible. Do you know of any other places where this may be a problem? I would like to test them for interruption too...

@pchlupacek
Contributor Author

resolved #1019

@mpilquist mpilquist added this to the 0.10 milestone Jan 6, 2018
3 participants