Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Process API #3149

Merged
merged 35 commits into from
Mar 21, 2023
Merged

Introduce Process API #3149

merged 35 commits into from
Mar 21, 2023

Conversation

armanbilge
Copy link
Member

Introduces an API for spawning arbitrary subprocesses and piping stdin/stdout/stderr, and implements it on JVM/Native (via java.lang.Process) and on Node.js (via node:child_process).

  • fs2.io.process.ProcessBuilder
  • fs2.io.process.Process[F]
  • fs2.io.process.ProcessSpawn[F]

APIs still need some polish, bikeshed, docs, etc., please chime in! I will make a pass of inline comments.

Also published as 3.7-0ec6699-SNAPSHOT.


import cats.effect.kernel.Resource

sealed trait ProcessSpawn[F[_]] {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FTR I don't think this is a particularly meaningful constraint. I think it's safe to assume that if there is the ProcessSpawn constraint, rockets may very well be launched 😁

I almost ripped it out. But it does do something important: it allows us to abstract over alternate implementations. For example via NuProcess on the JVM or io_uring/libuv on Native. So that's actually pretty useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a good idea to make it typeclass though ? I though retrospectively, Network being a typeclass was not so great (because there can be several implementations of it). Is there some consensus that it's okay to have a "canonical" implementation that is provided implicitly then ?

Copy link
Member Author

@armanbilge armanbilge Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent point. I'm not sure there is any consensus on this topic yet 😅

The current design choices mirror the rest of the library, but indeed that might not be what we want to do in the long-term.

Copy link
Contributor

@Baccata Baccata Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with that in mind, and if it was me, I'd change the ProcessBuilder to have the following overrides :

def spawn[F[_]: Async] : Resource[F, Process[F]]  = spawn(ProcessSpawn.forAsync[F])
def spawn[F[_]](processSpawn: ProcessSpawn[F]) : Resource[F, Process[F]] 
def spawnStream[F[_]: Async] : Stream[F, Process[F]]  = spawnStream(ProcessSpawn.forAsync[F])
def spawnStream[F[_]](processSpawn: ProcessSpawn[F]) : Stream[F, Process[F]]

This would reduce the ceremonies around starting subprocesses (which is important for scripting DX), whilst providing the flexibility to switch implementations (in particular for test purposes). As you mentioned, it's reasonably safe to assume that missiles will be launched, and therefore Async is a safe constraint to have on the spawn method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have several implementations, it should probably not be sealed? NuProcess and alike sound cool, but I would rather see them as standalone libs implementing this interface than blessed integrations living inside the fs2 repo (dependencies!). So more like http4s.Client, just with a reasonable default implementation for free.

Copy link
Member Author

@armanbilge armanbilge Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should probably not be sealed

Well, it's an interesting question how to do this :) for example in fs2.io.uring I've implemented a bunch of sealed stuff and I think it works fine. It discourages casual extension, which theoretically leaves the door open for us to add new methods to the interface.

Of course, if we did add new methods, that would break external implementations. But those external dependencies should never be used in libraries anyway, just in applications.

Comment on lines 26 to 38
sealed trait Process[F[_]] {

def isAlive: F[Boolean]

def exitValue: F[Int]

def stdin: Pipe[F, Byte, Nothing]

def stdout: Stream[F, Byte]

def stderr: Stream[F, Byte]

}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API definitely needs some thinking about. Particularly about semantics of stdin, stdout, and stderr. Can you compile these streams at most once, or many times? Do they close the underlying streams when complete? Along these lines, do we want APIs to read/write a Chunk directly?

This is all possible I believe but trickier to support. Easiest of course is compile-at-most-once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easiest of course is compile-at-most-once.

This matches my personal intuition

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious about how piping processes looks like

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious about how piping processes looks like

Not a lot of fun right now. I wonder if we added a Monoid[Process] for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Monoid[Process]

Interesting idea, though what'd happen to the stderr ? :/

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it would be similar to piping in the shell, only stdout. Also as suggested in #3149 (comment) we could add a feature to redirect stderr to stdout.

On second thought, less sure about Monoid (what's the identity process 🤔) but Semigroup[Process] seems plausible.

Comment on lines 76 to 79
// this monstrosity ensures that we can cancel on-going reads from stdout/stderr without hanging
// to do so, it must kill the process
private[this] def readInputStreamCancelably(is: F[InputStream]) =
readInputStreamGeneric(is, F.delay(new Array[Byte](8192)), true) { (is, buf) =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is fun :trollface: should this chunk size here be configurable? Where should that be exposed?


object ProcessBuilder {

def apply(command: String, args: List[String]): ProcessBuilder =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity : why not def apply(command: String, args: String*) ? Is there some bin-compat related reason that make it bad ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason, added 👍

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably use this api in scripts most often, and I'd like a more high-level api that abstracts some common patterns, e.g. passing optional arguments, or arguments k-v pairs, or options of those, etc.
Unfortunately the way you do that is with a bunch of implicit conversions 😬

Copy link
Member Author

@armanbilge armanbilge Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably use this api

I'm not sure if this is the API you want :) It's kind of like the JDK Process stuff vs Scalalib process DSL. The goal here is to provide the API and do the cross-platform stuff.

Another lib makes sense for your high-level API. If it's targeting scripts it can break bincompat more freely than FS2 can.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately the way you do that is with a bunch of implicit conversions

I think this would be a nice client-side addition to decline, or some higher-level library of the sort. To elaborate, Smithy4s provides an integration with decline OOTB : https://disneystreaming.github.io/smithy4s/docs/protocols/deriving-cli

With this PR being merged to fs2, I'd be inclined to add the client-side of things to go from high-level method calls to low level args that could be used in the ProcessBuilder, in a way that would match what is done in the decline integration.

Not trying to advertise, just saying that seems like a job for higher-level libraries. In particular because there is no actual "standard" for things like arguments key-value pairs, etc .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any additional lib is a pain for scripts

//> using toolkit "typelevel"

🙂

Copy link
Contributor

@Baccata Baccata Feb 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any additional lib is a pain for scripts, i.e. you have to remember/google id and version before you can hit the ground running.

@SystemFw can I interest you in a modest tool I've written for that very reason ? IIRC you're an emacs user so here's the CLI version : https://github.com/neandertech/dexsearch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//> using toolkit "typelevel"

Is this a thing?!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be soon, hopefully :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new UnsealedProcess[F] {
def isAlive = F.delay(process.isAlive())
def exitValue = F.interruptible(process.waitFor())
def stdin = writeOutputStream(F.delay(process.getOutputStream()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A problem I've faced several times with the current implementation of fs2.io.writeOutputStream is the fact that chunks aren't flushed automatically: see https://github.com/neandertech/jsonrpclib/pull/58/files#r1114125452

I feel terrible for not having raised an issue yet 😞

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed :)

@ChristopherDavenport
Copy link
Member

Not content with it living in a library? https://github.com/davenverse/process

@Baccata
Copy link
Contributor

Baccata commented Feb 24, 2023

@ChristopherDavenport my personal take is that subprocesses are a fundamental enough building block of applications that having it upstream alongside files / networking makes sense.

@armanbilge
Copy link
Member Author

armanbilge commented Feb 25, 2023

Continuing from #3149 (comment).

So with that in mind, and if it was me, I'd change the ProcessBuilder to have the following overrides :

def spawn[F[_]: Async] : Resource[F, Process[F]]  = spawn(ProcessSpawn.forAsync[F])
def spawn[F[_]](processSpawn: ProcessSpawn[F]) : Resource[F, Process[F]] 
def spawnStream[F[_]: Async] : Stream[F, Process[F]]  = spawnStream(ProcessSpawn.forAsync[F])
def spawnStream[F[_]](processSpawn: ProcessSpawn[F]) : Stream[F, Process[F]]

This would reduce the ceremonies around starting subprocesses (which is important for scripting DX), whilst providing the flexibility to switch implementations (in particular for test purposes). As you mentioned, it's reasonably safe to assume that missiles will be launched, and therefore Async is a safe constraint to have on the spawn method.

Thanks for this. This is seems good—I'm almost convinced. However I worry that library authors will just use Async instead of asking for ProcessSpawn[F] like they are supposed to. I wonder if instead we should have:

def spawn : Resource[IO, Process[IO]]  = spawn(ProcessSpawn.forAsync[IO])
def spawn[F[_]](processSpawn: ProcessSpawn[F]) : Resource[F, Process[F]] 

By hard-coding to IO it will prevent tagless final library authors from cheating. IMHO we should be doing this for most things actually: provide builders in terms of Async, but only an implicit for IO. This will force code written in tagless final style to correctly declare all of its constraints, instead of cheatily deriving them from Async.


Also random bikeshed: rename ProcessSpawn[F] -> Processes[F]? Like Files[F] and Network[F]

Comment on lines 124 to 136
private[io] def blockingCancelable[F[_], A](
cancel: F[Unit]
)(thunk: => A)(implicit F: Async[F]): F[A] =
(F.deferred[Unit], F.delay(new AtomicBoolean(false))).flatMapN { (gate, inProgress) =>
F.race(
gate.get *> F.blocking {
inProgress.set(true)
try thunk
finally inProgress.set(false)
},
(gate.complete(()) *> F.never[A]).onCancel(F.delay(inProgress.get()).ifM(cancel, F.unit))
).map(_.merge)
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fthomas
Copy link
Member

fthomas commented Feb 25, 2023

This looks interesting, thanks for working on it @armanbilge! Maybe I can use this in Scala Steward instead of our homegrown code for reading the output of processes.

One requirement we have is that processes never wait for input. We ensure this by closing stdin immediately after process creation. A possible test for this is running dd count=1. Without closing stdin, this would wait for input. With closing stdin, it would return immediately.

Another requirement we have is that processes must not inherit the environment variables of Scala Steward itself. This is done by calling clear() on java.lang.ProcessBuilder#environment

It would be nice if fs2.io.process.ProcessBuilder/Process would support these too.

@armanbilge
Copy link
Member Author

@fthomas thanks for the helpful feedback!

One requirement we have is that processes never wait for input.

In the current design you could write an empty Stream to stdin to close it, would that work for you? I need to verify the mechanics of this. That test is very helpful.

Another requirement we have is that processes must not inherit the environment variables of Scala Steward itself.

Ha, darn, was hoping I could sweep that under the rug 😂 ok thanks, I'll add support for that 😇

Comment on lines 37 to 38
def withInheritEnv(inherit: Boolean): ProcessBuilder
def withExtraEnv(env: Map[String, String]): ProcessBuilder
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bikeshed? :) Probably should add docs, they need more exposition now...

@fthomas
Copy link
Member

fthomas commented Feb 25, 2023

In the current design you could write an empty Stream to stdin to close it, would that work for you?

Yes, that works for me. Thank you for adding a test for that and the option to disable env inheritance!

Btw, Java's ProceesBuilder has the option to merge stdout and stderr via redirectErrorStream. I guess with streams, you can achieve the same with p.stdout merge p.stderr but it could be much more efficient if fs2.io.process.ProcessBuilder would have a similar option.

@armanbilge
Copy link
Member Author

armanbilge commented Feb 25, 2023

but it could be much more efficient

Yes, I chased it down in OpenJDK and it seems to be using dup under the hood. So, yeah 😅

The annoying thing is that Node.js doesn't support this. As a workaround we can just do the stream merge ourselves.

More generally, both the JDK and Node.js support various options for these streams.

For example, inheriting stdin/stdout/stderr from the parent process, or redirecting to a file (or even a socket). So maybe this setting should be modeled with an ADT. I'd also need to check how those options are supposed to interact with redirectErrorStream. Might be a good follow-up PR? 😇

Tangential, but FS2's Stream-based APIs often miss optimization opportunities like this. For example, sendfile is an awesome way to upload/download files over a socket. I believe this is exposed in the JDK as FileChannel#transferTo and FileChannel#transferFrom. I think it's even possible to use in Node.js.

@@ -26,7 +26,7 @@ package internal
import cats.effect.kernel.Concurrent
import cats.effect.kernel.Resource
import cats.effect.std.Queue
import cats.effect.std.Semaphore
import cats.effect.std.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This switch from Semaphore to Mutex seems unrelated to main PR. Perhaps separate PR?

@Baccata
Copy link
Contributor

Baccata commented Feb 27, 2023

Continuing from #3149 (comment)

By hard-coding to IO it will prevent tagless final library authors from cheating. IMHO we should be doing this for most things actually: provide builders in terms of Async, but only an implicit for IO. This will force code written in tagless final style to correctly declare all of its constraints, instead of cheatily deriving them from Async.

From my perspective, it does make sense. I think I'd call the monomorphic version spawnIO to reduce the appearance of bias towards a specific effect.

@fthomas
Copy link
Member

fthomas commented Feb 27, 2023

Might be a good follow-up PR?

Makes sense, especially since this is a bigger topic. Thanks for digging into it!

Comment on lines 37 to 70
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource
.make {
F.async_[facade.child_process.ChildProcess] { cb =>
val subprocess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
)

subprocess.once("spawn", () => cb(Right(subprocess)))
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
}
} { childProcess =>
F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) {
Either.unit
} else {
childProcess.kill()
childProcess.once("exit", () => cb(Either.unit))
Left(None)
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource
.make {
F.async_[facade.child_process.ChildProcess] { cb =>
val subprocess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
)
subprocess.once("spawn", () => cb(Right(subprocess)))
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
}
} { childProcess =>
F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((childProcess.exitCode ne null) || (childProcess.signalCode ne null)) {
Either.unit
} else {
childProcess.kill()
childProcess.once("exit", () => cb(Either.unit))
Left(None)
}
}
}
}
implicit def forAsync[F[_]](implicit F: Async[F]): ProcessSpawn[F] = new UnsealedProcessSpawn[F] {
import facade.child_process.ChildProcess
private def spawnChild(process: ProcessBuilder): F[ChildProcess] =
F.async_[facade.child_process.ChildProcess] { cb =>
val subprocess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
}
)
subprocess.once("spawn", () => cb(Right(subprocess)))
subprocess.once[js.Error]("error", e => cb(Left(js.JavaScriptException(e))))
}
private val cleanup: ChildProcess => F[Unit] =
(child: ChildProcess) => F.asyncCheckAttempt[Unit] { cb =>
F.delay {
if ((child.exitCode ne null) || (child.signalCode ne null)) {
Either.unit
} else {
child.kill()
child.once("exit", () => cb(Either.unit))
Left(None)
}
}
}
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource.make(spawnChild(process))(cleanup)

Comment on lines 37 to 63
.make {
F.blocking {
val builder = new lang.ProcessBuilder((process.command :: process.args).asJava)

process.workingDirectory.foreach { path =>
builder.directory(path.toNioPath.toFile)
}

val env = builder.environment()
if (!process.inheritEnv) env.clear()
process.extraEnv.foreach { case (k, v) =>
env.put(k, v)
}

builder.start()
}
} { process =>
F.delay(process.isAlive())
.ifM(
F.blocking {
process.destroy()
process.waitFor()
()
},
F.unit
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you extract both initializer and finalizer as separate methods, and thus reduce this to Resource.make(initialise(process))(finaliser).map { process => ?

Comment on lines 65 to 90
new UnsealedProcess[F] {
def isAlive = F.delay(process.isAlive())

def exitValue = isAlive.ifM(
F.interruptible(process.waitFor()),
F.delay(process.exitValue())
)

def stdin = writeOutputStreamCancelable(
F.delay(process.getOutputStream()),
F.blocking(process.destroy())
)

def stdout = readInputStreamCancelable(
F.delay(process.getInputStream()),
F.blocking(process.destroy()),
8192
)

def stderr = readInputStreamCancelable(
F.delay(process.getErrorStream()),
F.blocking(process.destroy()),
8192
)

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract this as a subclass of UnsealedProcess, one which is private to this package? It is often easier to have named classes over anonymous ones.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants