Gracefully close websocket connections from the client #205
Changes from 6 commits
@@ -6,6 +6,7 @@ import (
 	"net/http"
 	"net/url"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/cenkalti/backoff/v4"

@@ -17,6 +18,8 @@ import (
 	"github.com/open-telemetry/opamp-go/protobufs"
 )

+var errStopping = errors.New("client is stopping or stopped, no more messages can be sent")
+
 // wsClient is an OpAMP Client implementation for WebSocket transport.
 // See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport
 type wsClient struct {
@@ -35,8 +38,15 @@ type wsClient struct {

 	// The sender is responsible for sending portion of the OpAMP protocol.
 	sender *internal.WSSender
+
+	isStopped atomic.Bool
+
+	stopProcessors    chan struct{}
+	processorsStopped chan struct{}
 }
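The new `stopProcessors`/`processorsStopped` pair is a common Go signal-and-acknowledge shutdown idiom: one side sends a stop request, the other confirms once its cleanup has finished. A minimal stdlib sketch of the idea (names are illustrative, not taken from this PR):

```go
package main

import "fmt"

// stopAndWait starts a worker, requests shutdown, and blocks until
// the worker acknowledges that its cleanup has fully completed.
func stopAndWait() string {
	stop := make(chan struct{})    // request channel (like stopProcessors)
	stopped := make(chan struct{}) // ack channel (like processorsStopped)

	go func() {
		defer close(stopped) // acknowledge: everything below is done
		<-stop               // wait for the stop request
		// ... close connections, flush buffers, stop sub-goroutines ...
	}()

	stop <- struct{}{} // request shutdown (blocks until the worker is listening)
	<-stopped          // wait for the acknowledgement
	return "stopped"
}

func main() {
	fmt.Println(stopAndWait())
}
```

Because the ack channel is closed only after cleanup, the caller knows no background work can race with whatever it does next (such as closing the connection).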
 var _ OpAMPClient = &wsClient{}

 // NewWebSocket creates a new OpAMP Client that uses WebSocket transport.
 func NewWebSocket(logger types.Logger) *wsClient {
 	if logger == nil {

@@ -45,8 +55,10 @@ func NewWebSocket(logger types.Logger) *wsClient {

 	sender := internal.NewSender(logger)
 	w := &wsClient{
-		common: internal.NewClientCommon(logger, sender),
-		sender: sender,
+		common:            internal.NewClientCommon(logger, sender),
+		sender:            sender,
+		stopProcessors:    make(chan struct{}),
+		processorsStopped: make(chan struct{}),
 	}
 	return w
 }
@@ -80,13 +92,40 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
 }

 func (c *wsClient) Stop(ctx context.Context) error {
+	c.isStopped.Store(true)

> (srikanthccv marked this conversation as resolved)

 	// Close connection if any.
 	c.connMutex.RLock()
 	conn := c.conn
 	c.connMutex.RUnlock()

 	if conn != nil {
-		_ = conn.Close()
+		// Shut down the sender and any other background processors.
+		c.stopProcessors <- struct{}{}
+		<-c.processorsStopped

> Reviewer: Can this get stuck or take a very long time? Should we check for …
> Author: I'm fine checking for …

+		defaultCloseHandler := conn.CloseHandler()

> Reviewer: Is it safe to call …

+		closed := make(chan struct{})
+
+		// The server should respond with a close message of its own, which will
+		// trigger this callback. At this point the close sequence has been
+		// completed and the TCP connection can be gracefully closed.
+		conn.SetCloseHandler(func(code int, text string) error {

> Reviewer: Same concurrency question for …

+			err := defaultCloseHandler(code, text)
+			closed <- struct{}{}
+			return err
+		})
+
+		message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
+		conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))

> Reviewer: What does a deadline of one second do? Is this the maximum time allowed for the message to reach our peer and to be acked? Seems too low in that case.
> Author: This is the time the WebSocket client will wait for other writers that have a lock on the underlying connection to release their lock before aborting sending the control message. From what I can tell, writers will release their lock on the socket once it is written to, without regard for whether the other end receives it. I'll raise it for now in case a client sends an enormous payload before shutting down, but for the average case I think roughly a second should be ample. Do you have any suggestions for what a good value might be here?
+		select {

> Reviewer: I think we also need to honour the …

+		case <-time.After(3 * time.Second):

> Reviewer: Why specifically 3 seconds? Is this expected to do a network roundtrip? It seems too low in that case.
> Author: I meant to revisit this, but I mostly kept this at a low value since it's just for acking that the connection is shut down and I couldn't find a standard timeout to use. I agree it could be too low; do you think a higher value like 10 seconds would make sense?

+			_ = c.conn.Close()
+		case <-closed:
+			// runOneCycle will close the connection if the closing handshake completed,
+			// so there's no need to close it here.
+		}
 	}

 	return c.common.Stop(ctx)
@@ -97,22 +136,37 @@ func (c *wsClient) AgentDescription() *protobufs.AgentDescription {
 }

 func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error {
+	if c.isStopped.Load() {
+		return errStopping
+	}

> Reviewer: What is the purpose of this check? What happens if Stop() is called immediately after this check — is there a danger of a race?
> Author: This check disallows client implementations from attempting to send messages after they have called Stop(). There is, that's a great observation. We need to inhibit stopping for the duration of these methods to prevent the messages from being lost without an error being returned. I've updated the concurrency control mechanisms to account for this.

 	return c.common.SetAgentDescription(descr)
 }
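The race flagged in this thread is a classic check-then-act problem: `atomic.Bool` makes the check atomic, but not the check-plus-send. One way to close the gap, sketched below with stdlib primitives only (the type and names are illustrative, not the PR's final mechanism), is to hold a read lock across the whole send so that stopping, which takes the write lock, cannot interleave:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errStopping = errors.New("client is stopping or stopped, no more messages can be sent")

type client struct {
	mu      sync.RWMutex // senders take the read lock, stop takes the write lock
	stopped bool
}

// send holds the read lock across both the check and the send,
// so stop() cannot flip `stopped` in between.
func (c *client) send(msg string) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.stopped {
		return errStopping
	}
	// ... write msg to the connection here ...
	_ = msg
	return nil
}

// stop waits for all in-flight sends to finish, then marks the
// client stopped; later sends fail fast with errStopping.
func (c *client) stop() {
	c.mu.Lock()
	c.stopped = true
	c.mu.Unlock()
}

func main() {
	c := &client{}
	fmt.Println(c.send("hello")) // succeeds before stop
	c.stop()
	fmt.Println(c.send("late")) // fails after stop
}
```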
 func (c *wsClient) SetHealth(health *protobufs.AgentHealth) error {
+	if c.isStopped.Load() {
+		return errStopping
+	}
 	return c.common.SetHealth(health)
 }

 func (c *wsClient) UpdateEffectiveConfig(ctx context.Context) error {
+	if c.isStopped.Load() {
+		return errStopping
+	}
 	return c.common.UpdateEffectiveConfig(ctx)
 }

 func (c *wsClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
+	if c.isStopped.Load() {
+		return errStopping
+	}

> (srikanthccv marked this conversation as resolved)

 	return c.common.SetRemoteConfigStatus(status)
 }

 func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
+	if c.isStopped.Load() {
+		return errStopping
+	}
 	return c.common.SetPackageStatuses(statuses)
 }
@@ -193,9 +247,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
 }

 // runOneCycle performs the following actions:
-// 1. connect (try until succeeds).
-// 2. send first status report.
-// 3. receive and process messages until error happens.
+//  1. connect (try until succeeds).
+//  2. send first status report.
+//  3. receive and process messages until error happens.
+//
 // If it encounters an error it closes the connection and returns.
 // Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
 func (c *wsClient) runOneCycle(ctx context.Context) {
@@ -220,6 +275,17 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
 	// Create a cancellable context for background processors.
 	procCtx, procCancel := context.WithCancel(ctx)

+	// Stop processors if we receive a signal to do so.
+	go func() {
+		select {
+		case <-c.stopProcessors:
+			procCancel()
+			c.sender.WaitToStop()
+			close(c.processorsStopped)
+		case <-procCtx.Done():
+		}
+	}()

 	// Connected successfully. Start the sender. This will also send the first
 	// status report.
 	if err := c.sender.Start(procCtx, c.conn); err != nil {
@@ -242,26 +308,29 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
 	)
 	r.ReceiverLoop(ctx)

+	// If we exited receiverLoop it means there is a connection error or the connection
+	// has closed. We cannot read messages anymore, so clean up the connection.
+	// If there is a connection error we will need to start over.
+
+	// Stop the background processors.
+	procCancel()
+
-	// If we exited receiverLoop it means there is a connection error, we cannot
-	// read messages anymore. We need to start over.
+	// Wait for WSSender to stop.
+	c.sender.WaitToStop()

-	// Close the connection to unblock the WSSender as well.
+	// Close the connection.
 	_ = c.conn.Close()

> Reviewer: I am not sure about the swapping of the order of "close" vs "wait". The comment "Close the connection to unblock the WSSender as well" seems to indicate closing is necessary to ensure the wait completes. Is that the case, or was the comment misleading?
> Author: I may have misunderstood it, but to me the comment was misleading. Closing the connection doesn't appear to have any effect on the WSSender, since the only way to shut down WSSender is by canceling the context, which is done by calling … I mostly reversed these for two reasons: …

-	// Wait for WSSender to stop.
-	c.sender.WaitToStop()
+	// Unset the closed connection to indicate that it is closed.
+	c.conn = nil
 }
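The ordering discussion above — wait for the sender to stop, then close the connection — can be made concrete with a small stdlib sketch: the `sync.WaitGroup` guarantees the writer has finished before the close step runs, so the close can never race with an in-flight write (names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// shutdownOrder returns the order in which the shutdown steps ran:
// the sender always finishes before the connection is closed.
func shutdownOrder() []string {
	var order []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		// ... drain the send queue, then stop ...
		order = append(order, "sender stopped")
	}()

	wg.Wait()                                  // 1. wait for the sender to stop
	order = append(order, "connection closed") // 2. only then close the connection
	return order
}

func main() {
	fmt.Println(shutdownOrder())
}
```

The `wg.Wait()` also establishes a happens-before edge, so the slice appends here are safe without extra locking.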
 func (c *wsClient) runUntilStopped(ctx context.Context) {
 	// Iterates until we detect that the client is stopping.
 	for {
-		if c.common.IsStopping() {
+		if c.common.IsStopping() || c.isStopped.Load() {
 			return
 		}

 		c.runOneCycle(ctx)
 	}
 }
> Reviewer (on the new wsClient fields): Please document these fields.