Streaming API buffers messages leaving potential for data loss #2159
> Hey @cstockton, …
@MakMukhi Thanks, though I am concerned that there is invisible buffering on something that has a primitive I/O interface, i.e. zero controls or documentation around how it buffers messages internally. How and when does it create back pressure? Since buffering indefinitely must not happen here, as it would be a security issue (an immediate potential denial-of-service vector), it has to block at some point. Performance is not being increased by the current API; the cost is merely being hidden, and safety and reliability are given up in exchange. That exchange goes against a core tenet of high-quality library design in Go: block callers by default. I understand that not everyone may share this opinion, but I believe there is only one path forward: the API must be blocking. Maybe a better alternative would be an example underneath, with a few lines of code, that gets the identical (user-facing) behavior that gRPC has today:

```go
ch := make(chan Thing, HowBigIsThis_NobodyKnows)
go func() {
    stream, err := grpc.New...
    for {
        select {
        case <-ctx.Done():
            ...
        case thing := <-ch:
            stream.Send(thing)
        }
    }
}()
```
The buffering is not indefinite: every time a message is sent it takes away from a local soft limit of 64KB, which is then replenished as bytes are written on the wire. This also provides back pressure and prevents users from accidentally OOMing their local machine. All of this is internal implementation and therefore not documented in the API. Also note that Send and Recv are not primitive calls; they do not act directly on the underlying connection. Blocking a write call until the data has been written on the wire has problems like the following:
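To make the scheme concrete, here is a minimal sketch of that kind of byte-quota back pressure, assuming the 64KB soft limit described above. The names and structure are illustrative only, not gRPC's actual transport code:

```go
package quota

import "sync"

// quota sketches the soft limit described above: each send debits a shared
// byte budget and blocks when it is exhausted; the writer loop credits it
// back as bytes reach the wire.
type quota struct {
    mu    sync.Mutex
    cond  *sync.Cond
    avail int // e.g. 64 * 1024
}

func newQuota(n int) *quota {
    q := &quota{avail: n}
    q.cond = sync.NewCond(&q.mu)
    return q
}

// acquire blocks until n bytes are available: this is the back pressure.
func (q *quota) acquire(n int) {
    q.mu.Lock()
    for q.avail < n {
        q.cond.Wait()
    }
    q.avail -= n
    q.mu.Unlock()
}

// release is called as bytes are written on the wire, waking blocked senders.
func (q *quota) release(n int) {
    q.mu.Lock()
    q.avail += n
    q.cond.Broadcast()
    q.mu.Unlock()
}
```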
So depending on the size of the message, thousands of messages may be lost in a short burst? I changed my request counts to find the maximum message loss; for my use case it is 2800, so I can infer around 2800 messages fit in this 64KB buffer (roughly 64 * 1024 / 2800 ≈ 23 bytes per message).

I am glad that there are defined bounds, but to be clear, you are not naming this as a feature of the current design, right? Because the same would be true if there was no buffering at all.

Thanks for helping me understand the original rationale for not documenting the fact that the library could drop 64KB of user messages. I think at this point we can agree this is not an internal implementation detail but user-facing, since it has been noticed by one.
Blocking on a write does not mean blocking until success; it means blocking until the most immediate definite outcome. In your example, as soon as the connection is lost you have an outcome (broken pipe, io.EOF, whatever), so return it. Leave the decision to retry for 20-30 minutes in a tight loop to the very capable programmer using the library.
It has deadlines, which are set by …

I don't feel sufficient technical merit has emerged here to justify the current design, nor will it, given how well traveled this area of design is. I believe it should be the number one priority of an RPC library to provide strong guarantees around message delivery, but I understand if the gRPC authors will not concede here. I'll keep an eye on this issue in the future; for now I'm switching to the equivalent non-streaming API. Thanks!
That's the problem; the underlying net.Conn write call won't return for 20-30 minutes, until it realizes the connection is not reachable any more.

The only way to set deadlines on net.Conn is through an API call, which would mean we set and reset the deadline for every single write call we make. Also, the whole premise of this issue is the client canceling the context without reading status back from the server. Note this is not an expected usage of gRPC. I agree that the documentation needs to be updated, but I disagree with changing the design due to incorrect usage resulting from poor documentation.
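For context, bounding each write on a net.Conn would look something like the per-call dance below, which is the overhead being objected to. This is a sketch of the pattern, not gRPC code:

```go
package conndeadline

import (
    "net"
    "time"
)

// writeWithDeadline sets a deadline, performs the write, then clears the
// deadline so later writes on the connection are unaffected.
func writeWithDeadline(c net.Conn, p []byte, d time.Duration) (int, error) {
    if err := c.SetWriteDeadline(time.Now().Add(d)); err != nil {
        return 0, err
    }
    n, err := c.Write(p)
    if cerr := c.SetWriteDeadline(time.Time{}); cerr != nil && err == nil {
        err = cerr
    }
    return n, err
}
```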
It will only block for 20-30 minutes because there are no deadlines; the entire point of my post was to add them.

The streaming API made the unconventional design decision to bind ctx to the …
What you are telling me is that this is unexpected usage:

```go
http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
    stream, err := client.GetStream(r.Context())
    if err != nil {
        http.Error(w, ...)
        return
    }
    defer stream.CloseAndRecv()
    for _, req := range reqs {
        if err = stream.Send(req); err != nil {
            http.Error(w, ...)
            return
        }
    }
})
```

I disagree; I believe your API has an unexpected design that is a risk to users' data. It introduces a type of subtlety that is extremely difficult to track down. Send doesn't send anything: it queues an object to maybe be sent, maybe not. The SendMsg func obtains some buffer allowance in Write (concerning, please see [1]) before calling controlBuf.put, which does little more than wrap a mutex around an unbounded enqueue to a linked list and potentially wake a waiting receiver, essentially reproducing channel semantics. I can't think of a reason why you would do this other than specifically wanting an unbounded queue.

Essentially it is UNSAFE (if you care about messages getting delivered) to use the streaming API with a context that can be canceled, which is most any context in use by any Go program I've ever seen. That is why context is used. But if you feel it is more reasonable for me to give the gRPC stream a background Context() and monitor my request context until it's done, at which point I stop sends and call CloseAndRecv(), then I have no option but to do so. I've never had to solve this unusual problem before, but I guess you expect users to write something like the below, or do you have another suggestion?

```go
http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    stream, err := client.GetStream(ctx)
    if err != nil {
        http.Error(w, ...)
        return
    }
    defer stream.CloseAndRecv()
    for _, req := range reqs {
        if err = r.Context().Err(); err != nil {
            http.Error(w, ...)
            return
        }
        if err = stream.Send(req); err != nil {
            http.Error(w, ...)
            return
        }
    }
})
```

Though that does no good for any other type of failure: panics, hardware failure, and so on. The bottom line here is that the API provides no way to ensure a prior Send has completed without closing the stream altogether. This contradicts the very reason I want to stream items: to make incremental progress towards my goal. I don't think it's unreasonable to want to be able to measure or create checkpoints at any interval of my choice, as if I was writing to a file. It's like an …

Anyway, if you are concerned about backwards compatibility I would be happy to brainstorm ways to fix this API while keeping the performance gains you claim would be lost. But if you really believe this design is sound, feel free to close the issue; I've made as much of a case as I can, the rest is up to the gRPC authors.
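For readers following along, here is a minimal sketch of the kind of mutex-guarded, unbounded linked-list queue with a wakeup that is described above. The names are hypothetical; this is not gRPC's actual controlBuf:

```go
package unboundedq

import "sync"

type node struct {
    it   interface{}
    next *node
}

// queue approximates the described behavior: put never blocks or bounds the
// list, and a capacity-1 channel "potentially wakes a waiting receiver".
type queue struct {
    mu     sync.Mutex
    head   *node
    tail   *node
    wakeup chan struct{}
}

func newQueue() *queue {
    return &queue{wakeup: make(chan struct{}, 1)}
}

// put enqueues without any bound on queue length.
func (q *queue) put(it interface{}) {
    q.mu.Lock()
    n := &node{it: it}
    if q.tail == nil {
        q.head, q.tail = n, n
    } else {
        q.tail.next = n
        q.tail = n
    }
    q.mu.Unlock()
    select {
    case q.wakeup <- struct{}{}: // wake a blocked get, if any
    default:
    }
}

// get blocks until an item is available (single consumer assumed).
func (q *queue) get() interface{} {
    for {
        q.mu.Lock()
        if q.head != nil {
            n := q.head
            q.head = n.next
            if q.head == nil {
                q.tail = nil
            }
            q.mu.Unlock()
            return n.it
        }
        q.mu.Unlock()
        <-q.wakeup // spurious wakeups are tolerated by the loop
    }
}
```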
So are you suggesting that the transport keeps a heap of deadlines, and every time a new send-message call comes in we add it to the heap and update the underlying net.Conn's deadline? I'd really appreciate it if you thought design ideas through thoroughly before standing by them vehemently. By the way, not buffering anything means a syscall for every frame that we write!
No, I don't see you canceling any contexts on the gRPC stream before CloseAndRecv finishes.
If you're curious, feel free to look up the PRs that made these changes and the associated benchmark numbers. A channel and several other data structures were tried, and the current implementation proved the most performant.
Wow, that's definitely an exaggeration! Do you expect a "sent" message to be received by the other side instantly? If gRPC doesn't buffer, then the kernel will, and so will all the network buffers along the way. Just don't cancel the context until you have got a status from the server!
Funny you should mention that, because the underlying net.Conn is very similar to that, minus the context of course. You can read more about it here. The point is that a network is not just an os.File; there are other considerations, and performance is a big one. We can't just make syscalls for every message that goes out.

And we haven't even discussed flow control yet. Each data frame that goes out needs to be checked against stream-level and connection-level flow control. The latter requires a global lock to be acquired by every stream for every data frame that goes out. That's a very high-contention lock! Moreover, if you want to wait for every send to actually go out on the wire, you have a bunch of stream goroutines waiting on each other to write (connection-level flow control requires cooperation). Also, don't forget the round-trip times for window acks that a send might have to wait for. These user goroutines could instead be doing meaningful work!

I really don't want to justify our design choices further. If you're so inclined, I encourage you to come up with a prototype and run it against our benchmarks. If it makes things better, there's no reason gRPC won't accept it.
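A rough sketch of the contention being described, where every outgoing frame must clear a per-stream window and then a connection-wide window behind a lock shared by all streams. The structure is illustrative only, not gRPC's transport code:

```go
package flowctl

import "sync"

// flowControl holds the connection-level send window, shared by all streams.
type flowControl struct {
    mu       sync.Mutex // the high-contention lock: every stream serializes here
    connSend int        // bytes the whole connection may still send
}

// stream holds a per-stream send window (real code also synchronizes this).
type stream struct {
    fc         *flowControl
    streamSend int
}

// reserve reports whether n bytes may go out now; a false return means the
// sender must wait for window-update acks from the peer.
func (s *stream) reserve(n int) bool {
    if s.streamSend < n {
        return false // blocked on the stream-level window
    }
    s.fc.mu.Lock()
    defer s.fc.mu.Unlock()
    if s.fc.connSend < n {
        return false // blocked on the connection-level window
    }
    s.fc.connSend -= n
    s.streamSend -= n
    return true
}
```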
This was a very difficult issue to track down, because there are so many moving parts when the system is shutting down, which is the only time this issue surfaced. I think some of my frustration began to leak into the issue once I saw it being dismissed; I'm sorry for that. It's also disappointing that I've somehow failed to show you the difficult ergonomics users face with the current API, despite multiple snippets of code that I felt were clear examples. To the latest of which you replied:

You have demonstrated the subtlety of the API first-hand by giving me a thumbs up on this. Maybe you can come back to it when you have a clear head and look at the snippet below again. Once you notice it, I challenge you to face the difficult ergonomics of the library first-hand by writing a version that ensures all messages are sent before the function exits:

Finally, I forgot to include the footnote for my [1] on Write. I wanted to confirm there is no way in gRPC to send a data frame with no data or header. It appears that in such a scenario the get() that acts as the write barrier, creating back pressure when writes are slow, could be skipped, causing unbounded writes to the linked list.
I can relate to the frustration resulting from complex, sometimes unfortunate, engineering problems. I've been there several times myself. I do think that blocking a Send until the message is written on the wire is a reasonable feature to expect. Moreover, this can be accomplished fairly easily in the current implementation. I have strong apprehensions about making it the default behavior, however. Following is how I envision this behavior could be added: the API surface can be augmented to support blocking with something like:

```go
type Blocker interface {
    SendMsgBlocking(m interface{}) error
}
```

The generated code can then have a method like:

```go
func (x *routeGuideRecordRouteClient) SendBlocking(m *Point) error {
    return x.ClientStream.(Blocker).SendMsgBlocking(m)
}
```

Here's a running example of such a pattern. If you think gRPC should have this API, I encourage you to open another issue specifically for that.
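For illustration, a caller using such a hypothetical SendBlocking might look like the following. All types here are stand-ins for generated route guide code, and SendBlocking is the proposed method above, not an existing gRPC API:

```go
package blocking

import "context"

// Hypothetical stand-ins for generated code, extended with the proposal.
type Point struct{}
type RouteSummary struct{}

type RecordRouteClient interface {
    SendBlocking(*Point) error // proposed: returns once the message is on the wire
    CloseAndRecv() (*RouteSummary, error)
}

type RouteGuideClient interface {
    RecordRoute(context.Context) (RecordRouteClient, error)
}

func recordRoute(ctx context.Context, client RouteGuideClient, points []*Point) error {
    stream, err := client.RecordRoute(ctx)
    if err != nil {
        return err
    }
    for _, p := range points {
        if err := stream.SendBlocking(p); err != nil {
            return err // a failed blocking send is a definite outcome
        }
    }
    _, err = stream.CloseAndRecv()
    return err
}
```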
The only way to send an empty data frame is by …
documentation: clarify SendMsg documentation. Relevant issue: #2159
docs: deprecate stream, move documentation to client|server stream. Deprecate Stream, and move the methods and documentation to ServerStream and ClientStream. This is due to the fact that there are different semantics for SendMsg, and it's quite confusing to document one method for two things. Furthermore, Stream is not actually used in any way other than to be inherited by ClientStream and ServerStream. Relevant issue: #2159
@cstockton Have you managed to open another issue for this newer API specifically? We here at @codeship are in a similar predicament. I'd be willing to help draft a proposal and work on an implementation for it.

@brettbuddin I came to the conclusion that I was failing to convey the difficulties of using this API, and took no further action. I would be more than happy to give feedback on any API proposals, but I don't have any desire to drive the effort. Thanks for reaching out; I hope you find a workaround.
Even if you are notified as a message is written to the wire, as suggested in @MakMukhi's comment above, it is still not an indication that the server will ever receive it. It could be lost if a network error occurs after transmission. It could be lost if the server crashes. Or it could sit in a buffer on the server, waiting to be …

The only way to ensure the server received your messages is to either build it into your protocol (via streaming ACK responses) or wait for successful RPC completion. This is a fundamental part of the way gRPC works, and all gRPC implementations have this behavior.

If you want your stream to outlive another context, don't derive your stream's context from the other context. It is admittedly a bit unusual in Go for a context to outlive a function call, but cancellation propagation is a core gRPC feature, this was the most natural way to express it, and this mechanism works similarly in the other languages as well.
@dfawley Developers are looking for the same reasonable guarantees they find in most any language or library they use, including the Go standard library: a call to …

For gRPC the guarantee is that your message will be placed into a buffer in memory. So to understand delivery guarantees one must dig through thousands of lines of code, as I did, only to conclude that under normal usage there are none. As you stated above, the conditions under which delivery can have reasonable guarantees require writing unusual Go code; that is, you must already be aware that messages may not be delivered in order to write such code. Though I disagree that you can even write software to work around this, since at some point you need a mechanism for cancellation. Even with a separate background context, how do you eventually give up and measure the progress you have made thus far, given you have no way to check how many of the buffered messages resulted in a system call?

To summarize: I disagree with the counter-argument that delivery guarantees at the media or destination-host layer are impossible without additional protocol support. While you are not wrong, I don't see how it's related to gRPC lacking the same reasonable guarantee (1 send -> 1 host system call) provided by most any other networking library developers have ever used. I also disagree on the performance benefits, given that I consider performance gains that sacrifice correctness or introduce bullet-item lists of nuance (i.e. the gRPC stream docs) to have no merit. Stream send requests could carry synchronization back to the blocked callers once a corresponding system call has been made, with little effect on the throughput of concurrent senders, affecting only synthetic benchmarks of single senders.
The gRPC-Go streaming API is not intended to be conceptually at the same level as file-descriptor operations. Applications that write to FDs need to know when syscalls complete for various reasons. gRPC streams are a much higher-level API. Applications that write to gRPC streams have no reason to worry about syscalls; they should only be concerned with message delivery guarantees.
The comments on …
You would need server-streaming ACKs if you are concerned with partial RPC progress (regardless of our API).
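A sketch of what such application-level ACKs could look like over a bidirectional stream. Every type here is a hypothetical stand-in for generated code: Msg carries a client-assigned sequence number, and the server streams back the last sequence number it has durably processed:

```go
package ackstream

// Hypothetical generated types for an ACK-carrying protocol.
type Msg struct{ Seq uint64 }
type Ack struct{ AckedSeq uint64 }

type BidiStream interface {
    Send(*Msg) error
    Recv() (*Ack, error)
    CloseSend() error
}

// sendWithAcks returns the highest sequence number the server confirmed,
// i.e. the caller's durable checkpoint, along with any send error.
func sendWithAcks(stream BidiStream, msgs []*Msg) (uint64, error) {
    acks := make(chan uint64, len(msgs))
    go func() { // ack reader: runs until the server closes or errors
        defer close(acks)
        for {
            a, err := stream.Recv()
            if err != nil {
                return
            }
            acks <- a.AckedSeq
        }
    }()
    var sendErr error
    for i, m := range msgs {
        m.Seq = uint64(i + 1) // tag each message so acks identify progress
        if sendErr = stream.Send(m); sendErr != nil {
            break
        }
    }
    _ = stream.CloseSend()
    var lastAcked uint64
    for seq := range acks { // drain remaining acks
        lastAcked = seq
    }
    return lastAcked, sendErr
}
```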
My point is that it doesn't make any difference to the application whether the syscall happens before or after …
Please answer these questions before submitting your issue.
What version of gRPC are you using?

1.12.2

What version of Go are you using (go version)?

go1.10.3

What operating system (Linux, Windows, …) and version?

Linux

What did you do?
Given the following grpc.ClientStream code (with GetStream in place of boilerplate for GobgpApi_InjectMrtClient), where reqs is a slice of 255 valid messages: the code below will lose messages:

While the code below will ensure all 255 messages are delivered:
The documentation for SendMsg led me to believe that once Send returns, my message has been sent. I accept that "sends m" may mean it sends to some kind of internal buffer, but I believe this is important and should be documented with emphasis.

So I am a bit confused again by the semantics around the gRPC streaming API, even after the clarification of #2071. I checked the gRPC options and other config knobs and couldn't find anything to provide guarantees for Send().

The obvious fix is: don't do the first thing! But unfortunately Send() happens in a worker goroutine that selects on a job channel, which results in a call to Send, or exits when the context is done. Since Send may have buffered messages but gives me no way to flush them (that I can find) without closing the stream, I can't guarantee a job.Reply() actually resulted in a Send(), since the work context could end at any moment, resulting in all pending messages being discarded.

The only option I can think of is splitting the context. Meaning I'll have to give the gRPC stream a separate context.Background() wrapped with context.WithCancel, and start another goroutine which will wait for the real request context to be done, at which point I call sr.CloseAndRecv and finally the wrapped gRPC stream ctx cancel(), which ends the third cancellation goroutine that was started upon stream creation by gRPC.

I think the most reasonable fix is for Send to block until a message is actually sent, or to provide a mechanism to flush, even if it is an optional interface I have to assert on, i.e. sr.(grpc.Flusher).Flush(). Of course, if I am missing something I would be open to other options that do not involve more goroutines / moving parts. Thanks for the help.