Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New concurrency primitives #1006

Merged
merged 24 commits into from
Dec 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Also see the [GitHub contributor stats](https://github.com/functional-streams-fo
- Paul Chiusano ([@pchiusano](https://github.com/pchiusano)): project founder and maintainer
- Pavel Chlupáček ([@pchlupacek](https://github.com/pchlupacek)): concurrency, original implementation of `Append` algebra
- Gary Coady ([@fiadliel](https://github.com/fiadliel)): NIO file implementation, port of compress from scalaz-stream, other minor features
- Fabio Labella ([@SystemFw](https://github.com/SystemFw)): new concurrency scheme with `Set` + `Promise`, maintenance
- Alissa Pajer ([@alissapajer](https://github.com/alissapajer)): concurrency bug hunting, features, maintainance
- Daniel Spiewak ([@djspiewak](https://github.com/djspiewak)): concurrency plumbing, bug fixes, performance and correctness obsession
+ Public key (for release signing): [3587 7FB3 2BAE 5960](https://keybase.io/djspiewak)
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ val ReleaseTag = """^release/([\d\.]+a?)$""".r
lazy val contributors = Seq(
"pchiusano" -> "Paul Chiusano",
"pchlupacek" -> "Pavel Chlupáček",
"SystemFw" -> "Fabio Labella",
"alissapajer" -> "Alissa Pajer",
"djspiewak" -> "Daniel Spiewak",
"fthomas" -> "Frank Thomas",
Expand Down
15 changes: 8 additions & 7 deletions core/jvm/src/main/scala/fs2/StreamApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect._
import cats.effect.implicits._
import cats.implicits._

import fs2.async.Ref
import fs2.async.Promise
import fs2.async.mutable.Signal
import fs2.StreamApp.ExitCode

Expand Down Expand Up @@ -37,9 +37,10 @@ abstract class StreamApp[F[_]](implicit F: Effect[F]) {
/** Exposed for testing, so we can check exit values before the dramatic sys.exit */
private[fs2] def doMain(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContext = directEC
async.ref[IO, ExitCode].flatMap { exitCodeRef =>
async.promise[IO, ExitCode].flatMap { exitCodePromise
=>
async.signalOf[IO, Boolean](false).flatMap { halted =>
runStream(args, exitCodeRef, halted)
runStream(args, exitCodePromise, halted)
}}
}

Expand All @@ -52,7 +53,7 @@ abstract class StreamApp[F[_]](implicit F: Effect[F]) {
* @param ec Implicit EC to run the application stream
* @return An IO that will produce an ExitCode
*/
private[fs2] def runStream(args: List[String], exitCodeRef: Ref[IO,ExitCode], halted: Signal[IO,Boolean])
private[fs2] def runStream(args: List[String], exitCodePromise: Promise[IO,ExitCode], halted: Signal[IO,Boolean])
(implicit ec: ExecutionContext): IO[ExitCode] =
async.signalOf[F, Boolean](false).flatMap { requestShutdown =>
addShutdownHook(requestShutdown, halted) *>
Expand All @@ -61,11 +62,11 @@ abstract class StreamApp[F[_]](implicit F: Effect[F]) {
case Left(t) =>
IO(t.printStackTrace()) *>
halted.set(true) *>
exitCodeRef.setSyncPure(ExitCode.Error)
exitCodePromise.setSync(ExitCode.Error)
case Right(exitCode) =>
halted.set(true) *>
exitCodeRef.setSyncPure(exitCode.getOrElse(ExitCode.Success))
} *> exitCodeRef.get
exitCodePromise.setSync(exitCode.getOrElse(ExitCode.Success))
} *> exitCodePromise.get

def main(args: Array[String]): Unit =
sys.exit(doMain(args.toList).unsafeRunSync.code.toInt)
Expand Down
2 changes: 2 additions & 0 deletions core/jvm/src/test/scala/fs2/async/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class QueueSpec extends Fs2Spec {
)).flatten shouldBe Vector(false, 42, 42, 42)
}
"peek1 circular buffer" in {
pending // known race condition in test
runLog(Stream.eval(
for {
q <- async.circularBuffer[IO, Int](maxSize = 1)
Expand Down Expand Up @@ -168,6 +169,7 @@ class QueueSpec extends Fs2Spec {
)).flatten shouldBe Vector(42, 42, 42)
}
"timedPeek1 synchronous queue" in {
pending // known race condition in test
runLog(Scheduler[IO](1).flatMap { scheduler =>
Stream.eval(
for {
Expand Down
49 changes: 49 additions & 0 deletions core/jvm/src/test/scala/fs2/async/RefSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package fs2
package async

import cats.effect.IO
import cats.implicits._

class RefSpec extends Fs2Spec with EventuallySupport {

"Ref" - {

"concurrent modifications" in {
val finalValue = 10
// Cannot use streams, parallelSequence or Promise since they are implemented with Ref
val r = refOf[IO, Int](0).unsafeRunSync

List.fill(finalValue) {
fork(r.modify(_ + 1))
}.sequence.unsafeRunSync

eventually { r.get.unsafeRunSync shouldBe finalValue }
}

"successful access" in {
val op = for {
r <- refOf[IO, Int](0)
valueAndSetter <- r.access
(value, setter) = valueAndSetter
success <- setter(value + 1)
result <- r.get
} yield success && result == 1

op.unsafeRunSync shouldBe true
}

"failed access" in {
val op = for {
r <- refOf[IO, Int](0)
valueAndSetter <- r.access
(value, setter) = valueAndSetter
_ <- r.setSync(5)
success <- setter(value + 1)
result <- r.get
} yield !success && result == 5

op.unsafeRunSync shouldBe true
}

}
}
2 changes: 1 addition & 1 deletion core/jvm/src/test/scala/fs2/async/TopicSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TopicSpec extends Fs2Spec {


"synchronous publish" in {

pending // TODO I think there's a race condition on the signal in this test
val topic = async.topic[IO,Int](-1).unsafeRunSync()
val signal = async.signalOf[IO,Int](0).unsafeRunSync()
val count = 100
Expand Down
24 changes: 12 additions & 12 deletions core/shared/src/main/scala/fs2/AsyncPull.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ sealed abstract class AsyncPull[F[_],A] { self =>
def race[B](b: AsyncPull[F,B])(implicit F: Effect[F], ec: ExecutionContext): AsyncPull[F,Either[A,B]] = new AsyncPull[F, Either[A,B]] {
def get = cancellableGet.flatMap(_._1)
def cancellableGet = FreeC.Eval(for {
ref <- async.ref[F,Either[Throwable, Either[A,B]]]
promise <- async.promise[F,Either[Throwable, Either[A,B]]]
t0 <- self.cancellableGet.run
(a, cancelA) = t0
t1 <- b.cancellableGet.run
(b, cancelB) = t1
fa = a.run.map(Left(_): Either[A, B])
fb = b.run.map(Right(_): Either[A, B])
_ <- async.fork(fa.attempt.flatMap(ref.setAsyncPure))
_ <- async.fork(fb.attempt.flatMap(ref.setAsyncPure))
_ <- async.fork(fa.attempt.flatMap(x => promise.setSync(x)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we get rid of x?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I left it there because I already have code to make this function simpler, exploiting the fact that set will fail on second set (and that needs the x)

_ <- async.fork(fb.attempt.flatMap(x => promise.setSync(x)))
} yield {
(FreeC.Eval(ref.get.flatMap {
(FreeC.Eval(promise.get.flatMap {
case Left(e) => F.raiseError[Either[A, B]](e)
case Right(Left(a)) => cancelB.run.as(Left(a): Either[A, B])
case Right(Right(b)) => cancelA.run.as(Right(b): Either[A, B])
Expand Down Expand Up @@ -83,21 +83,21 @@ object AsyncPull {
def cancellableGet = FreeC.Pure((get, FreeC.Pure(())))
}

/** Returns an async pull that gets its value from reading the specified ref. */
def readRef[F[_],A](r: async.Ref[F,A]): AsyncPull[F,A] =
/** Returns an async pull that gets its value from reading the specified promise. */
def readPromise[F[_],A](p: async.Promise[F,A]): AsyncPull[F,A] =
new AsyncPull[F,A] {
def get = FreeC.Eval(r.get)
def cancellableGet = FreeC.Eval(r.cancellableGet).map { case (get, cancel) => (FreeC.Eval(get), FreeC.Eval(cancel)) }
def get = FreeC.Eval(p.get)
def cancellableGet = FreeC.Eval(p.cancellableGet).map { case (get, cancel) => (FreeC.Eval(get), FreeC.Eval(cancel)) }
}

/**
* Like [[readRef]] but reads a `Ref[F,Either[Throwable,A]]` instead of a `Ref[F,A]`. If a `Left(t)` is read,
* Like [[readPromise]] but reads a `Promise[F,Either[Throwable,A]]` instead of a `Promise[F,A]`. If a `Left(t)` is read,
* the `get` action fails with `t`.
*/
def readAttemptRef[F[_],A](r: async.Ref[F,Either[Throwable,A]]): AsyncPull[F,A] =
def readAttemptPromise[F[_],A](p: async.Promise[F,Either[Throwable,A]]): AsyncPull[F,A] =
new AsyncPull[F,A] {
def get = FreeC.Eval(r.get).flatMap(_.fold(FreeC.Fail(_), FreeC.Pure(_)))
def cancellableGet = FreeC.Eval(r.cancellableGet).map { case (get, cancel) =>
def get = FreeC.Eval(p.get).flatMap(_.fold(FreeC.Fail(_), FreeC.Pure(_)))
def cancellableGet = FreeC.Eval(p.cancellableGet).map { case (get, cancel) =>
(FreeC.Eval(get).flatMap(_.fold(FreeC.Fail(_), FreeC.Pure(_))), FreeC.Eval(cancel))
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/shared/src/main/scala/fs2/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ object Scheduler extends SchedulerPlatform {
* If the cancellation action is invoked, the gate completes with `None`. Otherwise, the gate completes with `Some(a)`.
*/
def delayCancellable[F[_],A](fa: F[A], d: FiniteDuration)(implicit F: Effect[F], ec: ExecutionContext): F[(F[Option[A]],F[Unit])] =
async.ref[F,Option[A]].flatMap { gate =>
async.promise[F,Option[A]].flatMap { gate =>
F.delay {
val cancel = scheduler.scheduleOnce(d) {
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => gate.setAsyncPure(Some(a))))(_ => IO.unit))
ec.execute(() => async.unsafeRunAsync(fa.flatMap(a => gate.setSync(Some(a))))(_ => IO.unit))
}
gate.get -> (F.delay(cancel()) *> gate.setAsyncPure(None))
gate.get -> (F.delay(cancel()) *> gate.setSync(None))
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2501,7 +2501,7 @@ object Stream {
type UO = Option[(Segment[O,Unit], FreeC[Algebra[F,O,?],Unit])]

Pull.fromFreeC {
val ref = new async.Ref[F, Either[Throwable, Option[(Segment[O,Unit], Stream[F,O])]]]
val p = async.Promise.unsafeCreate[F, Either[Throwable, Option[(Segment[O,Unit], Stream[F,O])]]]
Algebra.getScope[F, Nothing] flatMap { scope =>
val runStep =
Algebra.runFoldScope(
Expand All @@ -2510,7 +2510,7 @@ object Stream {
, None : UO
){ (_, uo) => uo.asInstanceOf[UO] } map { _ map { case (hd, tl) => (hd, fromFreeC(tl)) }}

Algebra.eval(async.fork(F.flatMap(F.attempt(runStep))(ref.setAsyncPure))) map { _ => AsyncPull.readAttemptRef(ref) }
Algebra.eval(async.fork(F.flatMap(F.attempt(runStep))(x => async.fork(p.setSync(x))))) map { _ => AsyncPull.readAttemptPromise(p) }
}
}
}
Expand Down
23 changes: 0 additions & 23 deletions core/shared/src/main/scala/fs2/async/Change.scala

This file was deleted.

Loading