Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nonemiting Pure Stream cannot be interrupted (stay to run in background) #1010

Closed
pchlupacek opened this issue Dec 3, 2017 · 14 comments
Closed
Milestone

Comments

@pchlupacek
Copy link
Contributor

pchlupacek commented Dec 3, 2017

    "interrupt (4)" in  {
      val s1 = Stream.constant(false).dropWhile(! _).take(1)
      val interrupt = mkScheduler.flatMap { _.sleep[IO](20.millis) map( _ => true) }
      // tests that interruption is successful even for infinite streams that never emits
      s1.covary[IO].interruptWhen(interrupt).run.unsafeRunTimed(1.second) shouldBe Some(())
      Thread.sleep(20000)
    }

The stream terminates, however s1 still runs in background unitl JVM finishes.
The reason is that s1 is pure Segment than never emits value.

@mpilquist any idea how to shortcut from segment in this case? I just need to exit the segment not only when the values are ready but also each n steps, and seems the maxSteps in uncons are not enough.

@pchlupacek
Copy link
Contributor Author

Btw, I suspect that any drained streams are subject of this issue, and perhaps you recall my poor experince with drained streams recently, I believe this is the root cause of it.

@mpilquist
Copy link
Member

You could check an interrupt flag of some form in the case where the splitAt in Algebra.uncons returns a Right.

@pchlupacek
Copy link
Contributor Author

Yeah was my thought initially, however there is no penalty-free solution to insert interrupt flag to uncons, the only place I can think of is from scope, and that have drastic performance impact.
Essentially we need here after certain steps always to exit uncons does not matter if the elements were / were not emitted.

@pchlupacek
Copy link
Contributor Author

@mpilquist also please note in this particular case the split seems to never terminate, just again and again runs the constant segment w/o emitting single value.

@mpilquist
Copy link
Member

Yeah, Stream#dropWhile is implemented in terms of Segment#dropWhile, which is strict -- it evaluates until the predicate fails. That explains why streams with dropWhile are uninterruptible.

@pchlupacek
Copy link
Contributor Author

pchlupacek commented Dec 5, 2017

@mpilquist I have investigated it further (as the last work on Segment didn't help here) and seems this is what is going on:

  1. Algebra.uncons
  2. viewL
  3. ToPull.dropWhile
  4. Algebra.uncons
    .... etc

Essentially no chance to exit this loop

@pchlupacek
Copy link
Contributor Author

ok @mpilquist this seems to do trick. Not sure, tho if this is most generic way, as this could hit any combinator now ?

   private def dropWhile_(p: O => Boolean, dropFailure: Boolean): Pull[F,Nothing,Option[Stream[F,O]]] =
      uncons.map {
        case None => None
        case Some((hd, tl)) =>
          hd.force.dropWhile(p, dropFailure) match {
            case Left(_) => println("DROP_ L"); Some(tl.pull.dropWhile_(p, dropFailure).streamNoScope)
            case Right(tl2) => println("DROP_ R"); Some(tl.cons(tl2))
          }
      }

@pchlupacek
Copy link
Contributor Author

nvm, obviously that doesn't seem to help. Any ideas how to solve this ?

@mpilquist
Copy link
Member

@pchlupacek We could add a checkInterrupt yield on each loop through the recursive function perhaps. Not sure if that's a good idea or not but worth trying if you haven't already. Something like checkInterrupt.flatMap(interrupted => if (interrupted) Pull.raiseError(Interrupted) else uncons.map { ... })

@pchlupacek
Copy link
Contributor Author

Ok, that would sort of require to bring Scope in play, which I was so hardly trying to avoid. Scope may also bring an non-negligible overhead (at least with my experiments with append and flatMap) . I was sort of thinking in lines if we cannot quit pull once a while to give chance for any interruption ?

@pchlupacek
Copy link
Contributor Author

Or lets say I don't know how to build Pull.checkInterrupt w/o consulting Scope for example.

@mpilquist
Copy link
Member

Without knowing more of the technique you are using for interruption, I can't say. I was thinking that adding a new primitive would yield control to the runFoldScope loop, where you'd know whether the stream should be interrupted.

@pchlupacek
Copy link
Contributor Author

I'll push that to branch(topic/interrupt). @mpilquist it is very rough, but look on Algebra.interrupt. The problem is that any way to lift to runFoldLoop has a great performance penalty. This is in fact behind the Scope issue.

@pchlupacek
Copy link
Contributor Author

resolved by #1019

@mpilquist mpilquist added this to the 0.10 milestone Jan 6, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants