Provide way to auto-close streams #213
Conversation
Force-pushed c576586 to 0894da3
This sounds like a noble goal. Is it also true for bidirectional streams? I'm looking forward to seeing where this PR goes.
If you are referring to the point about it throwing an exception before you get back a stream, no. In fact, with the current interfaces (e.g. com.connectrpc.ServerOnlyStreamInterface), this is not an issue because (1) you get back the stream first, and then must send the one and only request, and (2) the …

However, I'm exploring some alternate interface shapes in the conformance client (link). In these, the call to get a server stream accepts the singular request message and sends it before returning the stream (link). Also, the …

Anyhow, @kohenkatz, I'm interested if you have any thoughts or feedback to provide. You can see these new interfaces in use in the conformance client code (sadly, the actual logic in there is pretty boring, but hopefully you can get an idea for the feel of these APIs).
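To make the two shapes easier to compare, here is a rough, self-contained Kotlin sketch. All of the names below are hypothetical (these are not the actual connect-kotlin or conformance-client signatures), and it assumes kotlinx.coroutines channels:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking

// Current shape (roughly): the call returns the stream first, and the
// caller then sends the one and only request on it.
interface ServerOnlyStream<Req, Resp> {
    suspend fun sendAndClose(request: Req)
    val responses: ReceiveChannel<Resp>
}

// Alternate shape: the call accepts the singular request, sends it, and
// only then returns the response stream -- so a send failure surfaces as
// an exception before the caller ever holds a stream to clean up.
interface AltServerStreamClient<Req, Resp> {
    suspend fun serverStream(request: Req): ReceiveChannel<Resp>
}

// Toy in-memory implementation of the alternate shape, to exercise it.
class EchoClient : AltServerStreamClient<String, String> {
    override suspend fun serverStream(request: String): ReceiveChannel<String> {
        require(request.isNotEmpty()) { "send failed" } // may throw with no stream in hand
        val ch = Channel<String>(capacity = 2)
        ch.send("$request-1")
        ch.send("$request-2")
        ch.close()
        return ch
    }
}

fun main() = runBlocking {
    val got = mutableListOf<String>()
    for (r in EchoClient().serverStream("req")) got += r
    println(got) // [req-1, req-2]
}
```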
As you noted at the top:
... so it is a little difficult to generalize that code back to what we might see in a real app. That said, I like what I'm seeing here so far. Some of our developers are very bad at error handling and cleaning up - they tend to assume that everything will always be successful - so any change that looks like it would simplify that is welcome.
One thing I am considering is, instead of an extension function that takes a block, having the primary RPC methods (in the generated client stubs) take a block themselves. That way, all operations on the stream must happen inside the block, so there's no risk that the stream is leaked, even in the face of exceptions. But I'm worried there might be some uses of long-lived streams that a required block would make awkward and clunky. @kohenkatz, @pkwarren, any thoughts or concerns with an approach like that?
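A minimal sketch of what such a block-taking method could do, with hypothetical names (this is not the actual generated API): the stream only exists inside the block, and close runs even if the block throws.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stream type with a suspendable close (names are
// illustrative, not the generated code's).
interface BidiStream<Req, Resp> {
    suspend fun send(req: Req)
    suspend fun close()
}

// Sketch of a block-taking RPC method: open the stream, run the block,
// and close the stream on the way out no matter what, so it can't leak.
suspend fun <Req, Resp, R> execBidi(
    open: suspend () -> BidiStream<Req, Resp>,
    block: suspend (BidiStream<Req, Resp>) -> R,
): R {
    val stream = open()
    try {
        return block(stream)
    } finally {
        stream.close()
    }
}

fun main() = runBlocking {
    var closed = false
    val fake = object : BidiStream<String, String> {
        override suspend fun send(req: String) {}
        override suspend fun close() { closed = true }
    }
    // Even when the block throws, the stream is closed on the way out.
    runCatching { execBidi<String, String, Unit>({ fake }) { throw IllegalStateException("boom") } }
    println("closed=$closed") // closed=true
}
```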
We use long-lived streams in two ways that I can think of offhand. In both of these cases, we also have logic to reconnect the stream, since we find that our users have very poor Internet connection quality and/or are moving between different network connections (Wi-Fi and cellular).
In truth, in the second case, there is probably a way to use a block elegantly as well, but it might require some more work. The issue with the description I gave above is that the application still needs a way to buffer messages that are waiting to be sent while the connection is being (re)established. This probably means that the best way to handle sending is to push messages into some kind of queue/flow/channel and then read from that queue inside the block. Perhaps for long-lived bidirectional streams, it might make sense to have a version that requires a block for handling received messages and listens to some kind of queue for sending?
@pkwarren pointed out to me that the Ktor library also has a block-based API, even for full-duplex bidirectional calls like web sockets: https://ktor.io/docs/getting-started-ktor-client-chat.html#wire-it-together

One interesting thing is that the receiver for the block in that library is …

For the case you describe, I think the channel approach is the way to go. I think it would look something like this (pseudo-code-ish, so there may be typos and it probably wouldn't compile):

```kotlin
val requestChannel = Channel<Request>()

// TODO: Capture result of this as Deferred<T> so we
// can do something with its result and also
// take action if it fails. (Though if it fails
// while requestChannel is still in use, the
// channel will throw an exception.)
async {
    client.someBidiMethod(headers) { stream ->
        // separate coroutine to send requests from the channel to the server
        val sendJob = launch {
            try {
                for (req in requestChannel) {
                    stream.requests.send(req)
                }
            } catch (ex: Throwable) {
                // we can't accept any more requests
                requestChannel.close(ex)
            } finally {
                stream.requests.close()
            }
        }
        var failure: Throwable? = null
        try {
            for (resp in stream.responses.messages) {
                // process each response
            }
        } catch (ex: Throwable) {
            failure = ex
            throw ex
        } finally {
            // no-op if channel already closed or job already finished
            requestChannel.close(failure)
            sendJob.cancel()
        }
    }
}
// caller can then write requests to the channel, and they get sent on the stream
return requestChannel
```

WDYT? That doesn't seem too bad.
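To make the channel idea concrete, here is a self-contained, runnable reduction of the pump above, with a list and a flag standing in for the stream's send/close (all names are illustrative, not the real API):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Standalone sketch of the channel pump: callers write requests into a
// channel, and a dedicated coroutine forwards each one to the "stream",
// closing the stream when the channel is closed.
suspend fun pumpDemo(requests: List<String>): Pair<List<String>, Boolean> {
    val requestChannel = Channel<String>()
    val sentToStream = mutableListOf<String>() // stands in for stream.requests.send
    var streamClosed = false                   // stands in for stream.requests.close
    return coroutineScope {
        val sendJob = launch {
            try {
                for (req in requestChannel) {
                    sentToStream += req        // stream.requests.send(req)
                }
            } finally {
                streamClosed = true            // stream.requests.close()
            }
        }
        // Caller side: writing to the channel sends on the stream.
        for (r in requests) requestChannel.send(r)
        requestChannel.close()                 // half-close the send side
        sendJob.join()
        sentToStream to streamClosed
    }
}

fun main() = runBlocking {
    println(pumpDemo(listOf("a", "b"))) // ([a, b], true)
}
```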
…ected, make block use CoroutineScope as this for easy access to launch and async
Based on that last comment, I pushed another commit. The block-based methods are now the way to use the streams. (The non-block-based methods are now protected, as implementation details of the others.) Based on what Ktor was doing, I've also made the block receiver CoroutineScope, for easy access to launch and async.

Again, this is just for the conformance client for now. But I think this is starting to look pretty good for maybe applying to the interfaces in the main connectrpc package.
@jhump I like this approach with a channel and a block. It forces the stream handling to be done in one place, instead of being spread around in many places.
```kotlin
package com.connectrpc.conformance.client.adapt

// Like java.io.Closeable, but the close operation is suspendable.
interface Closeable {
```
This all looks great - might just want to name this SuspendCloseable or AsyncCloseable or something like that, so people aren't confused with multiple Closeable types.
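For illustration, a SuspendCloseable along those lines, plus a use-style extension mirroring kotlin.io.use, might look like this. It's a sketch, not the PR's actual code, and unlike kotlin.io.use it doesn't bother suppressing an exception thrown by close:

```kotlin
import kotlinx.coroutines.runBlocking

// A Closeable whose close() suspends, named so it can't be confused with
// java.io.Closeable or Kotlin's AutoCloseable.
interface SuspendCloseable {
    suspend fun close()
}

// Run the block, then close the receiver, even if the block throws --
// the same contract as kotlin.io.use, but suspend-friendly.
suspend fun <T : SuspendCloseable, R> T.use(block: suspend (T) -> R): R {
    try {
        return block(this)
    } finally {
        close()
    }
}

fun main() = runBlocking {
    var closed = false
    val resource = object : SuspendCloseable {
        override suspend fun close() { closed = true }
    }
    val result = resource.use { "done" }
    println("$result closed=$closed") // done closed=true
}
```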
This is mostly just me riffing on stuff that would be more idiomatic Kotlin. In particular, this makes the three stream types Closeable. They aren't java.io.Closeable because I want the close to be a suspending function. (Not currently necessary, but I think it might be in the future, to implement some other ideas I have.)

Anyhow, this also introduces extension functions that let you execute an RPC and use the stream in a block, so that the stream is automatically closed for you. I think that works especially well for the ServerStream, which could actually throw before you get back a stream (since it could fail to send the initial request). So, without something like this, it could be gnarly, with a separate try/catch on the main invocation and then another try/finally around the stream operations. (Admittedly, most client code probably won't look quite like the conformance client, which is trying to capture a lot of info to package back into a response to send to the test runner...)

This is still all just in the conformance client. Once we're happy with the shape of the APIs there, I'd like to re-do the actual interfaces in the main connectrpc package. So I consider the internal APIs of the conformance "adapter" package my playground :)

Anyhow, just figured I'd explore a little and see what you think.
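To illustrate the "gnarly" two-layer handling versus the block form, here is a runnable sketch with toy stand-ins (FakeClient and ServerStream are invented for this example, not the real types):

```kotlin
import kotlinx.coroutines.runBlocking

// Toy stand-in for a server-stream handle with a suspendable close.
class ServerStream(val responses: List<String>) {
    suspend fun close() {}
}

class FakeClient {
    // Sends the initial request before returning, so it can throw
    // before the caller ever holds a stream.
    suspend fun serverStream(request: String): ServerStream {
        if (request.isEmpty()) throw IllegalArgumentException("send failed")
        return ServerStream(listOf("r1", "r2"))
    }

    // Block form: invocation failure and stream cleanup handled in one place.
    suspend fun <R> serverStream(request: String, block: suspend (ServerStream) -> R): R {
        val stream = serverStream(request)
        try {
            return block(stream)
        } finally {
            stream.close()
        }
    }
}

fun main() = runBlocking {
    val client = FakeClient()

    // Without the block: try/catch for the call, then try/finally for the stream.
    val got = mutableListOf<String>()
    val stream = try {
        client.serverStream("req")
    } catch (ex: Exception) {
        return@runBlocking
    }
    try {
        got += stream.responses
    } finally {
        stream.close()
    }

    // With the block: a single expression.
    val got2 = client.serverStream("req") { s -> s.responses }

    println(got == got2) // true
}
```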