Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully close websocket connections from the client #205

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 82 additions & 13 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -17,6 +18,8 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

var errStopping = errors.New("client is stopping or stopped, no more messages can be sent")

// wsClient is an OpAMP Client implementation for WebSocket transport.
// See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport
type wsClient struct {
Expand All @@ -35,8 +38,15 @@ type wsClient struct {

// The sender is responsible for sending portion of the OpAMP protocol.
sender *internal.WSSender

isStopped atomic.Bool

stopProcessors chan struct{}
processorsStopped chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document these fields.

}

var _ OpAMPClient = &wsClient{}

// NewWebSocket creates a new OpAMP Client that uses WebSocket transport.
func NewWebSocket(logger types.Logger) *wsClient {
if logger == nil {
Expand All @@ -45,8 +55,10 @@ func NewWebSocket(logger types.Logger) *wsClient {

sender := internal.NewSender(logger)
w := &wsClient{
common: internal.NewClientCommon(logger, sender),
sender: sender,
common: internal.NewClientCommon(logger, sender),
sender: sender,
stopProcessors: make(chan struct{}),
processorsStopped: make(chan struct{}),
}
return w
}
Expand Down Expand Up @@ -80,13 +92,40 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
}

func (c *wsClient) Stop(ctx context.Context) error {
c.isStopped.Store(true)
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved

// Close connection if any.
c.connMutex.RLock()
conn := c.conn
c.connMutex.RUnlock()

if conn != nil {
_ = conn.Close()
// Shut down the sender and any other background processors.
c.stopProcessors <- struct{}{}
<-c.processorsStopped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this get stuck or take a very long time? Should we check for <-ctx.Done() too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine checking for ctx.Done just to be safe, but this realistically shouldn't get stuck for a long time since it is essentially triggered by procCancel in runOneCycle.


defaultCloseHandler := conn.CloseHandler()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to call CloseHandler() concurrently with other uses of the connection? We may be using the connection (e.g. writing or reading data) in background processes at the same time? I can't tell if it is safe from the docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CloseHandler only returns the current close handler, it doesn't perform any reads from or writes to the connection. Our goal here is to wrap the close handler so we can signal when the WebSocket library is done closing.

closed := make(chan struct{})

// The server should respond with a close message of its own, which will
// trigger this callback. At this point the close sequence has been
// completed and the TCP connection can be gracefully closed.
conn.SetCloseHandler(func(code int, text string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concurrency question for SetCloseHandler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SetCloseHandler only sets the close handler function, which will be called later when the client receives a close message from the server. We know that all writes have stopped once bgProcessingStopped has fired, which we wait on above in this method.

err := defaultCloseHandler(code, text)
closed <- struct{}{}
return err
})

message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does a deadline of one second do? Is this the maximum time allowed for the message to reach our peer and to be acked? Seems too low in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the time the WebSocket client will wait for other writers that have a lock on the underlying connection to release their lock before aborting sending the control message. From what I can tell, writers will release their lock on the socket once it is written to without regard for whether the other end receives it. I'll raise it for now in case a client sends an enormous payload before shutting down, but for the average case I think roughly a second should be ample. Do you have any suggestions for what a good value might be here?


select {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to honour the ctx', so probably add a case <-ctx.Done():.

case <-time.After(3 * time.Second):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why specifically 3 seconds? Is this expected to do a network roundtrip? It seems too low in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to revisit this, but I mostly kept this at a low value since it's just for acking that the connection is shut down and I couldn't find a standard timeout to use. I agree it could be too low, do you think a higher value like 10 seconds would make sense?

_ = c.conn.Close()
case <-closed:
// runOneCycle will close the connection if the closing handshake completed,
// so there's no need to close it here.
}
}

return c.common.Stop(ctx)
Expand All @@ -97,22 +136,37 @@ func (c *wsClient) AgentDescription() *protobufs.AgentDescription {
}

func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error {
if c.isStopped.Load() {
return errStopping
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this check? What happens if Stop() is called immediately after this check, is there a danger of race?

Copy link
Contributor Author

@evan-bradley evan-bradley Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this check?

This check disallows client implementations from attempting to send messages after they have called Stop.

What happens if Stop() is called immediately after this check, is there a danger of race?

There is, that's a great observation. We need to inhibit stopping for the duration of these methods to prevent the messages from being lost without an error being returned. I've updated the concurrency control mechanisms to account for this.

return c.common.SetAgentDescription(descr)
}

func (c *wsClient) SetHealth(health *protobufs.AgentHealth) error {
if c.isStopped.Load() {
return errStopping
}
return c.common.SetHealth(health)
}

func (c *wsClient) UpdateEffectiveConfig(ctx context.Context) error {
if c.isStopped.Load() {
return errStopping
}
return c.common.UpdateEffectiveConfig(ctx)
}

func (c *wsClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error {
if c.isStopped.Load() {
return errStopping
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
}
return c.common.SetRemoteConfigStatus(status)
}

func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
if c.isStopped.Load() {
return errStopping
}
return c.common.SetPackageStatuses(statuses)
}

Expand Down Expand Up @@ -193,9 +247,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error {
}

// runOneCycle performs the following actions:
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
// 1. connect (try until succeeds).
// 2. send first status report.
// 3. receive and process messages until error happens.
//
// If it encounters an error it closes the connection and returns.
// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set).
func (c *wsClient) runOneCycle(ctx context.Context) {
Expand All @@ -220,6 +275,17 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
// Create a cancellable context for background processors.
procCtx, procCancel := context.WithCancel(ctx)

// Stop processors if we receive a signal to do so.
go func() {
select {
case <-c.stopProcessors:
procCancel()
c.sender.WaitToStop()
close(c.processorsStopped)
case <-procCtx.Done():
}
}()

// Connected successfully. Start the sender. This will also send the first
// status report.
if err := c.sender.Start(procCtx, c.conn); err != nil {
Expand All @@ -242,26 +308,29 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
)
r.ReceiverLoop(ctx)

// If we exited receiverLoop it means there is a connection error or the connection
// has closed. We cannot read messages anymore, so clean up the connection.
// If there is a connection error we will need to start over.

// Stop the background processors.
procCancel()

// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.
// Wait for WSSender to stop.
c.sender.WaitToStop()

// Close the connection to unblock the WSSender as well.
// Close the connection.
_ = c.conn.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about the swapping of the order of "close" vs "wait". The comment "Close the connection to unblock the WSSender as well" seems to indicate closing is necessary to ensure wait complete. Is that the case or the comment was misleading?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have misunderstood it, but to me the comment was misleading. Closing the connection doesn't appear to have any effect on the WSSender since the only way to shut down WSSender is by canceling the context, which is done by calling procCancel in this function.

I mostly reversed these for two reasons:

  1. This allows us to clean up the application-level side of the connection before shutting down the transport-level component, which feels like the proper order of operations to me.
  2. In the case where we had an error on receiving (e.g. we got bad proto data) and we're looking to reset the connection, this will allow us to send any remaining messages before closing it.


// Wait for WSSender to stop.
c.sender.WaitToStop()
// Unset the closed connection to indicate that it is closed.
c.conn = nil
}

func (c *wsClient) runUntilStopped(ctx context.Context) {
// Iterates until we detect that the client is stopping.
for {
if c.common.IsStopping() {
if c.common.IsStopping() || c.isStopped.Load() {
return
}

c.runOneCycle(ctx)
}
}
Loading
Loading