Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InterruptGuard: remove value parameter and type parameter. #2480

Merged
merged 1 commit into from
Jul 12, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 38 additions & 46 deletions core/shared/src/main/scala/fs2/Pull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -861,17 +861,11 @@ object Pull extends PullLowPriority {
scope: Scope[F],
extendedTopLevelScope: Option[Scope[F]],
translation: G ~> F,
endRunner: Run[G, X, F[End]],
runner: Run[G, X, F[End]],
stream: Pull[G, X, Unit]
): F[End] = {

def interruptGuard[Mid](
scope: Scope[F],
view: Cont[Nothing, G, X],
runner: Run[G, X, F[Mid]]
)(
next: => F[Mid]
): F[Mid] =
def interruptGuard(scope: Scope[F], view: Cont[Nothing, G, X])(next: => F[End]): F[End] =
scope.isInterrupted.flatMap {
case None => next
case Some(outcome) =>
Expand Down Expand Up @@ -912,13 +906,13 @@ object Pull extends PullLowPriority {
}

def goErr(err: Throwable, view: Cont[Nothing, G, X]): F[End] =
go(scope, extendedTopLevelScope, translation, endRunner, view(Fail(err)))
go(scope, extendedTopLevelScope, translation, runner, view(Fail(err)))

class ViewRunner(val view: Cont[Unit, G, X]) extends Run[G, X, F[End]] {
private val prevRunner = endRunner
private val prevRunner = runner

def done(doneScope: Scope[F]): F[End] =
go(doneScope, extendedTopLevelScope, translation, endRunner, view(unit))
go(doneScope, extendedTopLevelScope, translation, prevRunner, view(unit))

def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[End] = {
@tailrec
Expand All @@ -933,7 +927,7 @@ object Pull extends PullLowPriority {

def interrupted(tok: Unique.Token, err: Option[Throwable]): F[End] = {
val next = view(Interrupted(tok, err))
go(scope, extendedTopLevelScope, translation, endRunner, next)
go(scope, extendedTopLevelScope, translation, prevRunner, next)
}

def fail(e: Throwable): F[End] = goErr(e, view)
Expand All @@ -946,13 +940,13 @@ object Pull extends PullLowPriority {

abstract class StepRunR[Y, S](view: Cont[Option[S], G, X]) extends Run[G, Y, F[End]] {
def done(scope: Scope[F]): F[End] =
interruptGuard(scope, view, endRunner) {
go(scope, extendedTopLevelScope, translation, endRunner, view(Succeeded(None)))
interruptGuard(scope, view) {
go(scope, extendedTopLevelScope, translation, runner, view(Succeeded(None)))
}

def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] = {
val next = view(Interrupted(scopeId, err))
go(scope, extendedTopLevelScope, translation, endRunner, next)
go(scope, extendedTopLevelScope, translation, runner, next)
}

def fail(e: Throwable): F[End] = goErr(e, view)
Expand All @@ -963,9 +957,9 @@ object Pull extends PullLowPriority {

def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
// For a Uncons, we continue in same Scope at which we ended compilation of inner stream
interruptGuard(outScope, view, endRunner) {
interruptGuard(outScope, view) {
val result = Succeeded(Some((head, tail)))
go(outScope, extendedTopLevelScope, translation, endRunner, view(result))
go(outScope, extendedTopLevelScope, translation, runner, view(result))
}
}

Expand All @@ -975,9 +969,9 @@ object Pull extends PullLowPriority {
def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] =
// StepLeg: we shift back to the scope at which we were
// before we started to interpret the Leg's inner stream.
interruptGuard(scope, view, endRunner) {
interruptGuard(scope, view) {
val result = Succeeded(Some(new Stream.StepLeg(head, outScope.id, tail)))
go(scope, extendedTopLevelScope, translation, endRunner, view(result))
go(scope, extendedTopLevelScope, translation, runner, view(result))
}
}

Expand Down Expand Up @@ -1014,18 +1008,18 @@ object Pull extends PullLowPriority {
}

def done(scope: Scope[F]): F[End] =
interruptGuard(scope, outView, endRunner) {
go(scope, extendedTopLevelScope, translation, endRunner, outView(unit))
interruptGuard(scope, outView) {
go(scope, extendedTopLevelScope, translation, runner, outView(unit))
}

def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[End] = {
val next = bindView(unconsed(head, tail), outView)
go(outScope, extendedTopLevelScope, translation, endRunner, next)
go(outScope, extendedTopLevelScope, translation, runner, next)
}

def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] = {
val next = outView(Interrupted(scopeId, err))
go(scope, extendedTopLevelScope, translation, endRunner, next)
go(scope, extendedTopLevelScope, translation, runner, next)
}

def fail(e: Throwable): F[End] = goErr(e, outView)
Expand All @@ -1039,7 +1033,7 @@ object Pull extends PullLowPriority {
case Left(Outcome.Canceled()) => Interrupted(scope.id, None)
case Left(Outcome.Succeeded(token)) => Interrupted(token, None)
}
go(scope, extendedTopLevelScope, translation, endRunner, view(result))
go(scope, extendedTopLevelScope, translation, runner, view(result))
}

def goAcquire[R](acquire: Acquire[G, R], view: Cont[R, G, X]): F[End] = {
Expand All @@ -1049,17 +1043,16 @@ object Pull extends PullLowPriority {
else translation(acquire.resource),
(resource, exit) => translation(acquire.release(resource, exit))
)

val cont = onScope.flatMap { outcome =>
val result = outcome match {
case Outcome.Succeeded(Right(r)) => Succeeded(r)
case Outcome.Succeeded(Left(scopeId)) => Interrupted(scopeId, None)
case Outcome.Canceled() => Interrupted(scope.id, None)
case Outcome.Errored(err) => Fail(err)
}
go(scope, extendedTopLevelScope, translation, endRunner, view(result))
go(scope, extendedTopLevelScope, translation, runner, view(result))
}
interruptGuard(scope, view, endRunner)(cont)
interruptGuard(scope, view)(cont)
}

def goInterruptWhen(
Expand All @@ -1077,9 +1070,9 @@ object Pull extends PullLowPriority {
case Outcome.Canceled() => Interrupted(scope.id, None)
case Outcome.Errored(err) => Fail(err)
}
go(scope, extendedTopLevelScope, translation, endRunner, view(result))
go(scope, extendedTopLevelScope, translation, runner, view(result))
}
interruptGuard(scope, view, endRunner)(cont)
interruptGuard(scope, view)(cont)
}

def goInScope(
Expand All @@ -1102,16 +1095,15 @@ object Pull extends PullLowPriority {
else
F.pure(extendedTopLevelScope)

val vrun = new ViewRunner(view)
val tail = maybeCloseExtendedScope.flatMap { newExtendedScope =>
scope.open(useInterruption).rethrow.flatMap { childScope =>
val bb = new Bind[G, X, Unit, Unit](stream) {
def cont(r: Terminal[Unit]): Pull[G, X, Unit] = endScope(childScope.id, r)
}
go(childScope, newExtendedScope, translation, vrun, bb)
go(childScope, newExtendedScope, translation, new ViewRunner(view), bb)
}
}
interruptGuard(scope, view, vrun)(tail)
interruptGuard(scope, view)(tail)
}

def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[End] = {
Expand Down Expand Up @@ -1147,41 +1139,41 @@ object Pull extends PullLowPriority {
scope.findInLineage(close.scopeId).flatMap {
case Some(toClose) if toClose.isRoot =>
// Impossible - don't close root scope as a result of a `CloseScope` call
go(scope, extendedTopLevelScope, translation, endRunner, viewCont(unit))
go(scope, extendedTopLevelScope, translation, runner, viewCont(unit))

case Some(toClose) if extendLastTopLevelScope && toClose.level == 1 =>
// Request to close the current top-level scope - if we're supposed to extend
// it instead, leave the scope open and pass it to the continuation
extendedTopLevelScope.traverse_(_.close(ExitCase.Succeeded).rethrow) *>
toClose.openAncestor.flatMap { ancestor =>
go(ancestor, Some(toClose), translation, endRunner, viewCont(unit))
go(ancestor, Some(toClose), translation, runner, viewCont(unit))
}

case Some(toClose) =>
toClose.close(close.exitCase).flatMap { r =>
toClose.openAncestor.flatMap { ancestor =>
val res = closeTerminal(r, ancestor)
go(ancestor, extendedTopLevelScope, translation, endRunner, viewCont(res))
go(ancestor, extendedTopLevelScope, translation, runner, viewCont(res))
}
}

case None =>
// scope already closed, continue with current scope
val result = close.interruption.getOrElse(unit)
go(scope, extendedTopLevelScope, translation, endRunner, viewCont(result))
go(scope, extendedTopLevelScope, translation, runner, viewCont(result))
}
}

viewL(stream) match {
case _: Succeeded[_] => endRunner.done(scope)
case failed: Fail => endRunner.fail(failed.error)
case int: Interrupted => endRunner.interrupted(int.context, int.deferredError)
case _: Succeeded[_] => runner.done(scope)
case failed: Fail => runner.fail(failed.error)
case int: Interrupted => runner.interrupted(int.context, int.deferredError)

case view: View[G, X, y] =>
view.step match {
case output: Output[_] =>
interruptGuard(scope, view, endRunner)(
endRunner.out(output.values, scope, view(unit))
interruptGuard(scope, view)(
runner.out(output.values, scope, view(unit))
)

case fmout: FlatMapOutput[g, z, _] => // y = Unit
Expand All @@ -1191,13 +1183,13 @@ object Pull extends PullLowPriority {
val composed: h ~> F = translation.asInstanceOf[g ~> F].compose[h](tst.fk)

val translateRunner: Run[h, X, F[End]] = new Run[h, X, F[End]] {
def done(scope: Scope[F]): F[End] = endRunner.done(scope)
def done(scope: Scope[F]): F[End] = runner.done(scope)
def out(head: Chunk[X], scope: Scope[F], tail: Pull[h, X, Unit]): F[End] =
endRunner.out(head, scope, Translate(tail, tst.fk))
runner.out(head, scope, Translate(tail, tst.fk))
def interrupted(scopeId: Unique.Token, err: Option[Throwable]): F[End] =
endRunner.interrupted(scopeId, err)
runner.interrupted(scopeId, err)
def fail(e: Throwable): F[End] =
endRunner.fail(e)
runner.fail(e)
}
go[h, X, End](scope, extendedTopLevelScope, composed, translateRunner, tst.stream)

Expand All @@ -1216,7 +1208,7 @@ object Pull extends PullLowPriority {

case _: GetScope[_] =>
val result = Succeeded(scope.asInstanceOf[y])
go(scope, extendedTopLevelScope, translation, endRunner, view(result))
go(scope, extendedTopLevelScope, translation, runner, view(result))

case mout: MapOutput[g, z, _] => goMapOutput[z](mout, view)
case eval: Eval[G, r] => goEval[r](eval, view)
Expand Down