Gracefully close websocket connections from the client #205

Closed

Conversation

evan-bradley
Contributor

On calling OpAMPClient.Stop for WebSocket clients, the client closes the TCP connection without performing a closing handshake, which from my testing produces a handful of errors in both the client and the server.

This change attempts to gracefully close the connection by initiating a closing handshake as specified in RFC 6455. This gives both sides of the connection time to clean up before the TCP connection is terminated, and is more compliant with the WebSocket specification.
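
For reference, the overall shape of the close sequence with gorilla/websocket is roughly the sketch below. This is a minimal illustration rather than the exact code in this PR; the package, function name, and timeout values are placeholders.

// A minimal sketch, assuming github.com/gorilla/websocket; the package and
// function names and the timeout values are placeholders, not the PR's code.
package wsclose

import (
	"time"

	"github.com/gorilla/websocket"
)

// gracefulClose assumes all other writers on conn have already stopped.
func gracefulClose(conn *websocket.Conn) {
	closed := make(chan struct{})
	defaultHandler := conn.CloseHandler()
	conn.SetCloseHandler(func(code int, text string) error {
		// The server's close response triggers this handler during a read.
		err := defaultHandler(code, text)
		close(closed)
		return err
	})

	// Send our close frame; the deadline bounds how long we wait to write it.
	msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
	_ = conn.WriteControl(websocket.CloseMessage, msg, time.Now().Add(time.Second))

	// Wait until the server's close response has been read (the receiver loop
	// keeps reading in the background), then tear down the TCP connection.
	select {
	case <-closed:
	case <-time.After(3 * time.Second):
	}
	_ = conn.Close()
}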

Testing with the Supervisor, this cleans up the following errors in the client:

2023-09-20T16:16:42.018-0400    ERROR   internal/wssender.go:77 Cannot write WS message: use of closed network connection
github.com/open-telemetry/opamp-go/client/internal.(*WSSender).sendMessage
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/internal/wssender.go:77
github.com/open-telemetry/opamp-go/client/internal.(*WSSender).sendNextMessage
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/internal/wssender.go:70
github.com/open-telemetry/opamp-go/client/internal.(*WSSender).run
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/internal/wssender.go:56
2023-09-20T16:16:42.018-0400    ERROR   internal/wsreceiver.go:53       Unexpected error while receiving: read tcp 127.0.0.1:45242->127.0.0.1:4320: use of closed network connection
github.com/open-telemetry/opamp-go/client/internal.(*wsReceiver).ReceiverLoop
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/internal/wsreceiver.go:53
github.com/open-telemetry/opamp-go/client.(*wsClient).runOneCycle
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/wsclient.go:243
github.com/open-telemetry/opamp-go/client.(*wsClient).runUntilStopped
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/wsclient.go:265
github.com/open-telemetry/opamp-go/client/internal.(*ClientCommon).StartConnectAndRun.func1
        /home/ihsmwaqhcpfwcime/code/github.com/evan-bradley/opamp-go/client/internal/clientcommon.go:199

This also cleans up the following error in the server that I have seen while running tests:

Cannot send message to WebSocket: write tcp 127.0.0.1:42589->127.0.0.1:48774: write: connection reset by peer


codecov bot commented Sep 20, 2023

Codecov Report

Attention: 21 lines in your changes are missing coverage. Please review.

Files Coverage Δ
client/httpclient.go 95.16% <100.00%> (+0.42%) ⬆️
client/internal/clientcommon.go 79.80% <100.00%> (ø)
client/internal/receivedprocessor.go 84.93% <100.00%> (+0.64%) ⬆️
client/types/callbacks.go 66.66% <ø> (ø)
server/callbacks.go 68.18% <100.00%> (-4.55%) ⬇️
server/wsconnection.go 100.00% <100.00%> (ø)
client/internal/packagessyncer.go 59.25% <0.00%> (ø)
server/serverimpl.go 58.92% <95.00%> (+0.37%) ⬆️
server/httpconnection.go 33.33% <0.00%> (ø)
client/internal/mockserver.go 82.35% <85.71%> (+0.31%) ⬆️
... and 2 more

... and 5 files with indirect coverage changes


@evan-bradley evan-bradley changed the title Gracefully close the client connection Gracefully close websocket connections Sep 20, 2023
@evan-bradley evan-bradley changed the title Gracefully close websocket connections Gracefully close websocket connections from the client Sep 20, 2023
@srikanthccv
Member

Thanks, this graceful close is good, but I think this still has the same issue. Isn't there a good chance that ReceiverLoop tries to read a message after the close message is sent, and that ReadMessage will return an error like websocket: close sent?

@evan-bradley
Contributor Author

Isn't there a good chance that ReceiverLoop tries to read the message after the close message is sent?

Under ordinary circumstances, the ReceiverLoop is expected to read one final message after the client sends its close message. The WebSocket close handshake includes a close response from the server, so we will hit this condition and the receiver loop will close.

And the ReadMessage will return an error something like websocket: close sent?

The websocket: close sent error is only returned when attempting to send messages. The code now waits for any pending messages to be sent in order to avoid this error. Any message that the client implementation attempts to send after calling Stop will still produce this error, but we could probably avoid that by returning an error to the client implementation directly.

@srikanthccv
Member

The websocket: close sent error is only thrown for attempting to send messages.

Does it only happen for send? I also saw this happen for read messages in a simple setup right after the Close message is sent.

@evan-bradley
Contributor Author

To the best of my understanding, that error is only returned when writing to the WebSocket connection. I searched for the error and the only places it came up were writes to the WebSocket: https://github.com/search?q=repo%3Agorilla%2Fwebsocket%20ErrCloseSent&type=code.

From a specification point of view, I don't think it would make sense to throw that error on reading from the WebSocket connection. For the closing handshake the client needs a close message from the peer, so it will need to read messages from the WebSocket until it reads the peer's close message.
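
As a rough illustration (not the exact receiver code in this repository), the read side keeps reading until gorilla/websocket reports the peer's close frame as a close error:

// Sketch only: runs inside a receive loop; conn is a *websocket.Conn from
// github.com/gorilla/websocket, and processMessage is a hypothetical handler.
for {
	_, msg, err := conn.ReadMessage()
	if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
		// The peer completed the closing handshake.
		break
	}
	if err != nil {
		// Any other error also ends the loop, but without a clean handshake.
		break
	}
	processMessage(msg)
}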

@srikanthccv
Member

I couldn't recreate the error case I mentioned earlier. You are correct; this is clearly a better way to close.

@evan-bradley evan-bradley marked this pull request as ready for review September 26, 2023 20:57
@evan-bradley evan-bradley requested a review from a team September 26, 2023 20:57
@evan-bradley
Contributor Author

I couldn't recreate the error case I mentioned earlier.

Thanks for confirming. I think this should be ready for review now.

Comment on lines 42 to 45
isStopped atomic.Bool

stopProcessors chan struct{}
processorsStopped chan struct{}
Member

Please document these fields.

message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))

select {
Member

I think we also need to honour the ctx, so probably add a case <-ctx.Done():.

})

message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
Member

What does a deadline of one second do? Is this the maximum time allowed for the message to reach our peer and to be acked? Seems too low in that case.

Contributor Author

This is the time the WebSocket client will wait for other writers that have a lock on the underlying connection to release their lock before aborting sending the control message. From what I can tell, writers will release their lock on the socket once it is written to without regard for whether the other end receives it. I'll raise it for now in case a client sends an enormous payload before shutting down, but for the average case I think roughly a second should be ample. Do you have any suggestions for what a good value might be here?
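
To restate that in code form (sketch only; connShutdownTimeout stands in for whatever configurable value we settle on):

// Sketch only: the deadline passed to WriteControl bounds both the wait for
// the connection's write lock and the write of the close frame itself.
deadline := time.Now().Add(c.connShutdownTimeout)
msg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
if err := conn.WriteControl(websocket.CloseMessage, msg, deadline); err != nil {
	// Another writer held the lock past the deadline, or the write failed.
	_ = conn.Close()
}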

Comment on lines 319 to 322
c.sender.WaitToStop()

// Close the connection to unblock the WSSender as well.
// Close the connection.
_ = c.conn.Close()
Member

I am not sure about the swapping of the order of "close" vs "wait". The comment "Close the connection to unblock the WSSender as well" seems to indicate that closing is necessary to ensure the wait completes. Is that the case, or was the comment misleading?

Contributor Author

I may have misunderstood it, but to me the comment was misleading. Closing the connection doesn't appear to have any effect on the WSSender since the only way to shut down WSSender is by canceling the context, which is done by calling procCancel in this function.

I mostly reversed these for two reasons:

  1. This allows us to clean up the application-level side of the connection before shutting down the transport-level component, which feels like the proper order of operations to me.
  2. In the case where we had an error on receiving (e.g. we got bad proto data) and we're looking to reset the connection, this will allow us to send any remaining messages before closing it.

conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))

select {
case <-time.After(3 * time.Second):
Member

Why specifically 3 seconds? Is this expected to do a network roundtrip? It seems too low in that case.

Contributor Author

I meant to revisit this, but I mostly kept this at a low value since it's just for acknowledging that the connection is shut down, and I couldn't find a standard timeout to use. I agree it could be too low; do you think a higher value like 10 seconds would make sense?

_ = conn.Close()
// Shut down the sender and any other background processors.
c.stopProcessors <- struct{}{}
<-c.processorsStopped
Member

Can this get stuck or take a very long time? Should we check for <-ctx.Done() too?

Contributor Author

I'm fine checking for ctx.Done just to be safe, but this realistically shouldn't get stuck for a long time since it is essentially triggered by procCancel in runOneCycle.

// Close connection if any.
c.connMutex.RLock()
conn := c.conn
c.connMutex.RUnlock()

if conn != nil {
_ = conn.Close()
// Shut down the sender and any other background processors.
c.stopBGProcessors <- struct{}{}
Member

Would it be safer to close the channel to signal stopping? Reading this code locally, it is not apparent that there is only one reader of this signal. In fact it says "processors", plural, implying that there may be many readers, in which case only one will be notified by writing to the channel. Closing the channel, by contrast, will notify them all and seems safer.

Contributor Author

The goal of this channel is to communicate between Stop and runOneCycle, since runOneCycle "owns" the background processors. I've renamed the channels and updated the comments to hopefully clarify this. Sending a message to the channel should be sufficient, but I've updated this line to close the channel to indicate that this is the last message on the channel.
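
For illustration, in generic Go (not this codebase), a send on a channel wakes exactly one receiver, while closing the channel wakes every current and future receiver:

package main

import (
	"fmt"
	"sync"
)

// Sketch only: demonstrates why close(stop) is a broadcast while a plain send
// would release only one waiting goroutine.
func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-stop // unblocks in every goroutine once stop is closed
			fmt.Println("worker", id, "stopping")
		}(i)
	}

	// stop <- struct{}{} // a send would release only one of the three workers
	close(stop) // releases all of them (and any future reader) immediately
	wg.Wait()
}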

Contributor Author

As a follow-up question, do we expect there will ever be more background processors? Right now it's just the sender, which seems like it should be sufficient for the foreseeable future. If we don't think there will be more, I will make the naming a little more specific.

Comment on lines 164 to 166
if c.isStopped.Load() {
return errStopping
}
Member

What is the purpose of this check? What happens if Stop() is called immediately after this check, is there a danger of race?

Contributor Author
evan-bradley Oct 13, 2023

What is the purpose of this check?

This check disallows client implementations from attempting to send messages after they have called Stop.

What happens if Stop() is called immediately after this check, is there a danger of race?

There is; that's a great observation. We need to inhibit stopping for the duration of these methods to prevent messages from being lost without an error being returned. I've updated the concurrency control mechanisms to account for this.
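
For illustration only, one way to make the stopped check and the subsequent send atomic with respect to Stop is to hold a read lock across both; the names below are hypothetical and this is not necessarily the exact mechanism used here:

package client // hypothetical sketch, not the real package layout

import (
	"errors"
	"sync"
)

var errStopping = errors.New("client is stopping")

// Sketch only: Stop takes the write lock, so it cannot begin while a send is
// between the stopped check and the actual write.
type sendGuard struct {
	mu        sync.RWMutex
	isStopped bool
}

func (g *sendGuard) Send(doSend func() error) error {
	g.mu.RLock()
	defer g.mu.RUnlock()
	if g.isStopped {
		return errStopping
	}
	return doSend()
}

func (g *sendGuard) Stop() {
	g.mu.Lock() // waits for in-flight sends to drain
	g.isStopped = true
	g.mu.Unlock()
}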

select {
case <-c.bgProcessorsStopped:
case <-ctx.Done():
_ = c.conn.Close()
Member

Reading c.conn requires locking a mutex because it may be written elsewhere. That is the reason we have conn as a local variable.

Contributor Author

Good catch; not sure how I missed that. I've updated all occurrences of this to take the lock.

err := conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(c.connShutdownTimeout))

if err != nil {
_ = c.conn.Close()
Member

Same here. I don't think we should read c.conn, it can race with writes elsewhere.

Contributor Author

Again good catch, it should be fixed now.

return errEarlyStop
}

defaultCloseHandler := conn.CloseHandler()
Member

Is it safe to call CloseHandler() concurrently with other uses of the connection? We may be using the connection (e.g. writing or reading data) in background processes at the same time? I can't tell if it is safe from the docs.

Contributor Author

CloseHandler only returns the current close handler, it doesn't perform any reads from or writes to the connection. Our goal here is to wrap the close handler so we can signal when the WebSocket library is done closing.

// The server should respond with a close message of its own, which will
// trigger this callback. At this point the close sequence has been
// completed and the TCP connection can be gracefully closed.
conn.SetCloseHandler(func(code int, text string) error {
Member

Same concurrency question for SetCloseHandler.

Contributor Author

SetCloseHandler only sets the close handler function, which will be called later when the client receives a close message from the server. We know that all writes have stopped once bgProcessingStopped has fired, which we wait on above in this method.

})

message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
err := conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(c.connShutdownTimeout))
Member

Do we have a guarantee that conn is not written concurrently by the background processors?

Contributor Author

We should. bgProcessingStopped will emit a message when all background processors have stopped, and we wait on that above before starting the closing handshake.

@evan-bradley
Contributor Author

@srikanthccv @tigrannajaryan I'm growing increasingly concerned that the current implementation may be unnecessarily complex and hard to follow. I've done my best to leave comments explaining it, but it's possible I should revisit this as a larger refactor of how we manage the transport-layer connection, readers, and writers. I still think what's here is an improvement over the way the connection is currently closed, but if there are concerns over verifying it's free of races, deadlocks, etc., we can look at other approaches.

Let me know your thoughts, maybe this is something we discuss at the next working group meeting.

@tigrannajaryan
Member

I'm growing increasingly concerned that the current implementation may be unnecessarily complex and hard to follow.

@evan-bradley this is my feeling as well. We could try to finalize this PR to handle this particular issue, but I am also concerned that the codebase is not easy to understand and to prove correct, especially from a concurrency perspective.

I would not mind a refactoring to make the code more understandable.

@evan-bradley
Contributor Author

I'm closing this for now in favor of addressing this more holistically by refactoring the client. I believe an approach that controls the sender, receiver, and transport from separate goroutines communicating over channels will likely be the easiest to reason about and will map well to how Go handles concurrency in the standard library. I'm not sure whether this approach is viable or whether I will be able to attempt it soon.
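
As a rough sketch of the shape I have in mind (purely illustrative; none of these names exist in the codebase, and the reader side is omitted):

package transport // hypothetical

import (
	"time"

	"github.com/gorilla/websocket"
)

// Sketch only: a single goroutine owns all writes to the connection and is
// driven entirely by channels, so the closing handshake cannot race with
// other writers. A separate reader goroutine (not shown) would drain messages
// until it sees the peer's close frame.
type wsTransport struct {
	sendQueue chan []byte   // outbound messages from the sender component
	shutdown  chan struct{} // closed by Stop to begin the closing handshake
	done      chan struct{} // closed once this goroutine has finished
}

func (t *wsTransport) run(conn *websocket.Conn) {
	defer close(t.done)
	for {
		select {
		case msg := <-t.sendQueue:
			_ = conn.WriteMessage(websocket.BinaryMessage, msg)
		case <-t.shutdown:
			data := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
			_ = conn.WriteControl(websocket.CloseMessage, data, time.Now().Add(time.Second))
			// The reader goroutine (not shown) closes conn once the peer's
			// close frame arrives, or after a timeout.
			return
		}
	}
}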

haoqixu added a commit to haoqixu/opamp-go that referenced this pull request Nov 21, 2023
haoqixu added a commit to haoqixu/opamp-go that referenced this pull request Feb 6, 2024
haoqixu added a commit to haoqixu/opamp-go that referenced this pull request Mar 7, 2024