-
Notifications
You must be signed in to change notification settings - Fork 607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce Process
API
#3149
Introduce Process
API
#3149
Conversation
|
||
import cats.effect.kernel.Resource | ||
|
||
sealed trait ProcessSpawn[F[_]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FTR I don't think this is a particularly meaningful constraint. I think it's safe to assume that if there is the ProcessSpawn
constraint, rockets may very well be launched 😁
I almost ripped it out. But it does do something important: it allows us to abstract over alternate implementations. For example via NuProcess on the JVM or io_uring/libuv on Native. So that's actually pretty useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a good idea to make it typeclass though ? I though retrospectively, Network
being a typeclass was not so great (because there can be several implementations of it). Is there some consensus that it's okay to have a "canonical" implementation that is provided implicitly then ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent point. I'm not sure there is any consensus on this topic yet 😅
The current design choices mirror the rest of the library, but indeed that might not be what we want to do in the long-term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with that in mind, and if it was me, I'd change the ProcessBuilder
to have the following overrides :
def spawn[F[_]: Async] : Resource[F, Process[F]] = spawn(ProcessSpawn.forAsync[F])
def spawn[F[_]](processSpawn: ProcessSpawn[F]) : Resource[F, Process[F]]
def spawnStream[F[_]: Async] : Stream[F, Process[F]] = spawnStream(ProcessSpawn.forAsync[F])
def spawnStream[F[_]](processSpawn: ProcessSpawn[F]) : Stream[F, Process[F]]
This would reduce the ceremonies around starting subprocesses (which is important for scripting DX), whilst providing the flexibility to switch implementations (in particular for test purposes). As you mentioned, it's reasonably safe to assume that missiles will be launched, and therefore Async
is a safe constraint to have on the spawn
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have several implementations, it should probably not be sealed
? NuProcess and alike sound cool, but I would rather see them as standalone libs implementing this interface than blessed integrations living inside the fs2 repo (dependencies!). So more like http4s.Client
, just with a reasonable default implementation for free.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it should probably not be
sealed
Well, it's an interesting question how to do this :) for example in fs2.io.uring
I've implemented a bunch of sealed
stuff and I think it works fine. It discourages casual extension, which theoretically leaves the door open for us to add new methods to the interface.
Of course, if we did add new methods, that would break external implementations. But those external dependencies should never be used in libraries anyway, just in applications.
sealed trait Process[F[_]] { | ||
|
||
def isAlive: F[Boolean] | ||
|
||
def exitValue: F[Int] | ||
|
||
def stdin: Pipe[F, Byte, Nothing] | ||
|
||
def stdout: Stream[F, Byte] | ||
|
||
def stderr: Stream[F, Byte] | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API definitely needs some thinking about. Particularly about semantics of stdin
, stdout
, and stderr
. Can you compile these streams at most once, or many times? Do they close the underlying streams when complete? Along these lines, do we want APIs to read/write a Chunk
directly?
This is all possible I believe but trickier to support. Easiest of course is compile-at-most-once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Easiest of course is compile-at-most-once.
This matches my personal intuition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also curious about how piping processes looks like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also curious about how piping processes looks like
Not a lot of fun right now. I wonder if we added a Monoid[Process]
for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Monoid[Process]
Interesting idea, though what'd happen to the stderr ? :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it would be similar to piping in the shell, only stdout. Also as suggested in #3149 (comment) we could add a feature to redirect stderr to stdout.
On second thought, less sure about Monoid
(what's the identity process 🤔) but Semigroup[Process]
seems plausible.
// this monstrosity ensures that we can cancel on-going reads from stdout/stderr without hanging | ||
// to do so, it must kill the process | ||
private[this] def readInputStreamCancelably(is: F[InputStream]) = | ||
readInputStreamGeneric(is, F.delay(new Array[Byte](8192)), true) { (is, buf) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is fun should this chunk size here be configurable? Where should that be exposed?
|
||
object ProcessBuilder { | ||
|
||
def apply(command: String, args: List[String]): ProcessBuilder = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity : why not def apply(command: String, args: String*)
? Is there some bin-compat related reason that make it bad ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason, added 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probably use this api in scripts most often, and I'd like a more high-level api that abstracts some common patterns, e.g. passing optional arguments, or arguments k-v pairs, or options of those, etc.
Unfortunately the way you do that is with a bunch of implicit conversions 😬
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would probably use this api
I'm not sure if this is the API you want :) It's kind of like the JDK Process
stuff vs Scalalib process DSL. The goal here is to provide the API and do the cross-platform stuff.
Another lib makes sense for your high-level API. If it's targeting scripts it can break bincompat more freely than FS2 can.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately the way you do that is with a bunch of implicit conversions
I think this would be a nice client-side addition to decline, or some higher-level library of the sort. To elaborate, Smithy4s provides an integration with decline OOTB : https://disneystreaming.github.io/smithy4s/docs/protocols/deriving-cli
With this PR being merged to fs2, I'd be inclined to add the client-side of things to go from high-level method calls to low level args that could be used in the ProcessBuilder, in a way that would match what is done in the decline integration.
Not trying to advertise, just saying that seems like a job for higher-level libraries. In particular because there is no actual "standard" for things like arguments key-value pairs, etc .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any additional lib is a pain for scripts
//> using toolkit "typelevel"
🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any additional lib is a pain for scripts, i.e. you have to remember/google id and version before you can hit the ground running.
@SystemFw can I interest you in a modest tool I've written for that very reason ? IIRC you're an emacs user so here's the CLI version : https://github.com/neandertech/dexsearch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//> using toolkit "typelevel"
Is this a thing?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be soon, hopefully :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new UnsealedProcess[F] { | ||
def isAlive = F.delay(process.isAlive()) | ||
def exitValue = F.interruptible(process.waitFor()) | ||
def stdin = writeOutputStream(F.delay(process.getOutputStream())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A problem I've faced several times with the current implementation of fs2.io.writeOutputStream
is the fact that chunks aren't flushed automatically: see https://github.com/neandertech/jsonrpclib/pull/58/files#r1114125452
I feel terrible for not having raised an issue yet 😞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed :)
Not content with it living in a library? https://github.com/davenverse/process |
@ChristopherDavenport my personal take is that subprocesses are a fundamental enough building block of applications that having it upstream alongside files / networking makes sense. |
Continuing from #3149 (comment).
Thanks for this. This is seems good—I'm almost convinced. However I worry that library authors will just use def spawn : Resource[IO, Process[IO]] = spawn(ProcessSpawn.forAsync[IO])
def spawn[F[_]](processSpawn: ProcessSpawn[F]) : Resource[F, Process[F]] By hard-coding to Also random bikeshed: rename |
private[io] def blockingCancelable[F[_], A]( | ||
cancel: F[Unit] | ||
)(thunk: => A)(implicit F: Async[F]): F[A] = | ||
(F.deferred[Unit], F.delay(new AtomicBoolean(false))).flatMapN { (gate, inProgress) => | ||
F.race( | ||
gate.get *> F.blocking { | ||
inProgress.set(true) | ||
try thunk | ||
finally inProgress.set(false) | ||
}, | ||
(gate.complete(()) *> F.never[A]).onCancel(F.delay(inProgress.get()).ifM(cancel, F.unit)) | ||
).map(_.merge) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks interesting, thanks for working on it @armanbilge! Maybe I can use this in Scala Steward instead of our homegrown code for reading the output of processes. One requirement we have is that processes never wait for input. We ensure this by closing stdin immediately after process creation. A possible test for this is running Another requirement we have is that processes must not inherit the environment variables of Scala Steward itself. This is done by calling It would be nice if |
@fthomas thanks for the helpful feedback!
In the current design you could write an empty
Ha, darn, was hoping I could sweep that under the rug 😂 ok thanks, I'll add support for that 😇 |
Co-authored-by: Frank Thomas <frank@timepit.eu>
def withInheritEnv(inherit: Boolean): ProcessBuilder | ||
def withExtraEnv(env: Map[String, String]): ProcessBuilder |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bikeshed? :) Probably should add docs, they need more exposition now...
Yes, that works for me. Thank you for adding a test for that and the option to disable env inheritance! Btw, Java's |
Yes, I chased it down in OpenJDK and it seems to be using The annoying thing is that Node.js doesn't support this. As a workaround we can just do the stream merge ourselves. More generally, both the JDK and Node.js support various options for these streams. For example, inheriting stdin/stdout/stderr from the parent process, or redirecting to a file (or even a socket). So maybe this setting should be modeled with an ADT. I'd also need to check how those options are supposed to interact with Tangential, but FS2's |
@@ -26,7 +26,7 @@ package internal | |||
import cats.effect.kernel.Concurrent | |||
import cats.effect.kernel.Resource | |||
import cats.effect.std.Queue | |||
import cats.effect.std.Semaphore | |||
import cats.effect.std.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This switch from Semaphore
to Mutex
seems unrelated to main PR. Perhaps separate PR?
Continuing from #3149 (comment)
From my perspective, it does make sense. I think I'd call the monomorphic version |
Makes sense, especially since this is a bigger topic. Thanks for digging into it! |
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] { | ||
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = | ||
Resource | ||
.make { | ||
F.async_[facade.child_process.ChildProcess] { cb => | ||
val subprocess = facade.child_process.spawn( | ||
process.command, | ||
process.args.toJSArray, | ||
new facade.child_process.SpawnOptions { | ||
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) | ||
env = | ||
if (process.inheritEnv) | ||
(facade.process.env ++ process.extraEnv).toJSDictionary | ||
else | ||
process.extraEnv.toJSDictionary | ||
} | ||
) | ||
|
||
subprocess.once("spawn", () => cb(Right(subprocess))) | ||
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e)))) | ||
} | ||
} { childProcess => | ||
F.asyncCheckAttempt[Unit] { cb => | ||
F.delay { | ||
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) { | ||
Either.unit | ||
} else { | ||
childProcess.kill() | ||
childProcess.once("exit", () => cb(Either.unit)) | ||
Left(None) | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] { | |
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = | |
Resource | |
.make { | |
F.async_[facade.child_process.ChildProcess] { cb => | |
val subprocess = facade.child_process.spawn( | |
process.command, | |
process.args.toJSArray, | |
new facade.child_process.SpawnOptions { | |
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) | |
env = | |
if (process.inheritEnv) | |
(facade.process.env ++ process.extraEnv).toJSDictionary | |
else | |
process.extraEnv.toJSDictionary | |
} | |
) | |
subprocess.once("spawn", () => cb(Right(subprocess))) | |
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e)))) | |
} | |
} { childProcess => | |
F.asyncCheckAttempt[Unit] { cb => | |
F.delay { | |
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) { | |
Either.unit | |
} else { | |
childProcess.kill() | |
childProcess.once("exit", () => cb(Either.unit)) | |
Left(None) | |
} | |
} | |
} | |
} | |
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] { | |
import facade.child_process.ChildProcess | |
private def spawnChild(process: ProcessBuilder): F[ChildProcess] = | |
F.async_[facade.child_process.ChildProcess] { cb => | |
val subprocess = facade.child_process.spawn( | |
process.command, | |
process.args.toJSArray, | |
new facade.child_process.SpawnOptions { | |
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString) | |
env = | |
if (process.inheritEnv) | |
(facade.process.env ++ process.extraEnv).toJSDictionary | |
else | |
process.extraEnv.toJSDictionary | |
} | |
) | |
subprocess.once("spawn", () => cb(Right(subprocess))) | |
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e)))) | |
} | |
private val cleanup: ChildProcess => F[Unit] = | |
(child: ChildProcess) => F.asyncCheckAttempt[Unit] { cb => | |
F.delay { | |
if ((child.exitCode ne null) || (child.signalCode ne null)) { | |
Either.unit | |
} else { | |
child.kill() | |
child.once("exit", () => cb(Either.unit)) | |
Left(None) | |
} | |
} | |
} | |
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = | |
Resource.make(spawnChild(process))(cleanup) |
.make { | ||
F.blocking { | ||
val builder = new lang.ProcessBuilder((process.command :: process.args).asJava) | ||
|
||
process.workingDirectory.foreach { path => | ||
builder.directory(path.toNioPath.toFile) | ||
} | ||
|
||
val env = builder.environment() | ||
if (!process.inheritEnv) env.clear() | ||
process.extraEnv.foreach { case (k, v) => | ||
env.put(k, v) | ||
} | ||
|
||
builder.start() | ||
} | ||
} { process => | ||
F.delay(process.isAlive()) | ||
.ifM( | ||
F.blocking { | ||
process.destroy() | ||
process.waitFor() | ||
() | ||
}, | ||
F.unit | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you extract both initializer and finalizer as separate methods, and thus reduce this to Resource.make(initialise(process))(finaliser).map { process =>
?
new UnsealedProcess[F] { | ||
def isAlive = F.delay(process.isAlive()) | ||
|
||
def exitValue = isAlive.ifM( | ||
F.interruptible(process.waitFor()), | ||
F.delay(process.exitValue()) | ||
) | ||
|
||
def stdin = writeOutputStreamCancelable( | ||
F.delay(process.getOutputStream()), | ||
F.blocking(process.destroy()) | ||
) | ||
|
||
def stdout = readInputStreamCancelable( | ||
F.delay(process.getInputStream()), | ||
F.blocking(process.destroy()), | ||
8192 | ||
) | ||
|
||
def stderr = readInputStreamCancelable( | ||
F.delay(process.getErrorStream()), | ||
F.blocking(process.destroy()), | ||
8192 | ||
) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extract this as a subclass of UnsealedProcess
, one which is private to this package? It is often easier to have named classes over anonymous ones.
Introduces an API for spawning arbitrary subprocesses and piping stdin/stdout/stderr, and implements it on JVM/Native (via
java.lang.Process
) and on Node.js (vianode:child_process
).fs2.io.process.ProcessBuilder
fs2.io.process.Process[F]
fs2.io.process.ProcessSpawn[F]
APIs still need some polish, bikeshed, docs, etc., please chime in! I will make a pass of inline comments.
Also published as
3.7-0ec6699-SNAPSHOT
.