Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SuspendedStream, MicrotaskExecutor hacks in Node.js Process #3366

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 0 additions & 41 deletions io/js/src/main/scala/fs2/io/internal/MicrotaskExecutor.scala

This file was deleted.

77 changes: 39 additions & 38 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,36 @@ private[fs2] trait ioplatform {
def suspendReadableAndRead[F[_], R <: Readable](
destroyIfNotEnded: Boolean = true,
destroyIfCanceled: Boolean = true
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] = {
)(thunk: => R)(implicit F: Async[F]): Resource[F, (R, Stream[F, Byte])] =
Resource
.makeCase {
F.delay {
val readable = thunk
(readable, unsafeReadReadable(readable))
}
} {
case ((readable, _), Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case ((readable, _), Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case ((readable, _), Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
.adaptError { case IOException(ex) => ex }

/** Unsafely creates a `Stream` that reads all bytes from a `Readable`.
*/
private[io] def unsafeReadReadable[F[_]](
readable: Readable
)(implicit F: Async[F]): Stream[F, Byte] = {

final class Listener {
private[this] var readable = false
Expand Down Expand Up @@ -118,45 +147,17 @@ private[fs2] trait ioplatform {

go.streamNoScope
}

}

Resource
.eval(F.delay(new Listener))
.flatMap { listener =>
Resource
.makeCase {
F.delay {
val readable = thunk
readable.on("readable", () => listener.handleReadable())
readable.once("error", listener.handleError(_))
readable.once("end", () => listener.handleEnd())
readable
}
} {
case (readable, Resource.ExitCase.Succeeded) =>
F.delay {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
else
F.unit
}
.fproduct { readable =>
listener.readableEvents.adaptError { case IOException(ex) => ex } >>
Stream.evalUnChunk(
F.delay(Option(readable.read()).fold(Chunk.empty[Byte])(Chunk.uint8Array(_)))
)
}
}
.adaptError { case IOException(ex) => ex }
val listener = new Listener
readable.on("readable", () => listener.handleReadable())
readable.once("error", listener.handleError(_))
readable.once("end", () => listener.handleEnd())

listener.readableEvents.adaptError { case IOException(ex) => ex } >>
Stream.evalUnChunk(
F.delay(Option(readable.read()).fold(Chunk.empty[Byte])(Chunk.uint8Array(_)))
)
}

/** `Pipe` that converts a stream of bytes to a stream that will emit a single `Readable`,
Expand Down
105 changes: 44 additions & 61 deletions io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ package process
import cats.effect.kernel.Resource
import cats.effect.kernel.Async
import cats.syntax.all._
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.SuspendedStream
import fs2.io.internal.facade

import scala.scalajs.js
Expand All @@ -36,27 +34,50 @@ import scala.scalajs.js.JSConverters._
private[process] trait ProcessesCompanionPlatform {
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource
.make {
F.async_[facade.child_process.ChildProcess] { cb =>
val subprocess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
Resource {
F.async_[(Process[F], F[Unit])] { cb =>
val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
)

val fs2Process = new UnsealedProcess[F] {

def isAlive: F[Boolean] = F.delay {
(childProcess.exitCode eq null) && (childProcess.signalCode eq null)
}

def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb =>
F.delay {
(childProcess.exitCode: Any) match {
case i: Int => Right(i)
case _ =>
val f: js.Function1[Any, Unit] = {
case i: Int => cb(Right(i))
case _ => // do nothing
}
childProcess.once("exit", f)
Left(Some(F.delay(childProcess.removeListener("exit", f))))
}
}
)
}

def stdin = writeWritable(F.delay(childProcess.stdin))

def stdout = unsafeReadReadable(childProcess.stdout)

subprocess.once("spawn", () => cb(Right(subprocess)))
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
def stderr = unsafeReadReadable(childProcess.stderr)
}
} { childProcess =>
F.asyncCheckAttempt[Unit] { cb =>

val finalize = F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) {
Either.unit
Expand All @@ -67,48 +88,10 @@ private[process] trait ProcessesCompanionPlatform {
}
}
}
}
.flatMap { childProcess =>
def suspend(readable: => fs2.io.Readable) =
SuspendedStream(
Stream.resource(fs2.io.suspendReadableAndRead()(readable)).flatMap(_._2)
)

(suspend(childProcess.stdout), suspend(childProcess.stderr)).mapN {
(suspendedOut, suspendedErr) =>
new UnsealedProcess[F] {

def isAlive: F[Boolean] = F.delay {
(childProcess.exitCode eq null) && (childProcess.signalCode eq null)
}

def exitValue: F[Int] = F.asyncCheckAttempt[Int] { cb =>
F.delay {
(childProcess.exitCode: Any) match {
case i: Int => Right(i)
case _ =>
val f: js.Function1[Any, Unit] = {
case i: Int => cb(Right(i))
case _ => // do nothing
}
childProcess.once("exit", f)
Left(Some(F.delay(childProcess.removeListener("exit", f))))
}
}
}

def stdin: Pipe[F, Byte, Nothing] =
fs2.io.writeWritable(F.delay(childProcess.stdin))

def stdout: Stream[F, Byte] = suspendedOut.stream

def stderr: Stream[F, Byte] = suspendedErr.stream

}

}
childProcess.once("spawn", () => cb(Right(fs2Process -> finalize)))
childProcess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
}
.evalOn(MicrotaskExecutor) // guarantee that callbacks are setup before yielding to I/O

}
}
}