Skip to content

Commit

Permalink
graceful shutdown of the websocket client
Browse files Browse the repository at this point in the history
  • Loading branch information
haoqixu committed Nov 21, 2023
1 parent f0e220d commit 46b92c2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 29 deletions.
26 changes: 23 additions & 3 deletions client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type wsReceiver struct {
sender *WSSender
callbacks types.Callbacks
processor receivedProcessor

// Indicates that the receiver has fully stopped.
stopped chan struct{}
err error
}

// NewWSReceiver creates a new Receiver that uses WebSocket to receive
Expand All @@ -36,14 +40,28 @@ func NewWSReceiver(
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
stopped: make(chan struct{}),
}

return w
}

// Start starts the receiver loop. To stop the receiver cancel the context.
func (r *wsReceiver) Start(ctx context.Context) {
go r.ReceiverLoop(ctx)
}

func (r *wsReceiver) IsStopped() <-chan struct{} {
return r.stopped
}

func (r *wsReceiver) Err() error {
return r.err

Check warning on line 59 in client/internal/wsreceiver.go

View check run for this annotation

Codecov / codecov/patch

client/internal/wsreceiver.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}

// ReceiverLoop runs the receiver loop. To stop the receiver cancel the context.
func (r *wsReceiver) ReceiverLoop(ctx context.Context) {
runContext, cancelFunc := context.WithCancel(ctx)
processorCtx, stopProcessor := context.WithCancel(ctx)

out:
for {
Expand All @@ -52,13 +70,15 @@ out:
if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
r.logger.Errorf("Unexpected error while receiving: %v", err)
}
r.err = err
break out
} else {
r.processor.ProcessReceivedMessage(runContext, &message)
r.processor.ProcessReceivedMessage(processorCtx, &message)
}
}

cancelFunc()
stopProcessor()
close(r.stopped)
}

func (r *wsReceiver) receiveMessage(msg *protobufs.ServerToAgent) error {
Expand Down
27 changes: 26 additions & 1 deletion client/internal/wssender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"time"

"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
Expand All @@ -11,13 +12,19 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultSendCloseMessageTimeout = 5 * time.Second
)

// WSSender implements the WebSocket client's sending portion of OpAMP protocol.
type WSSender struct {
SenderCommon
conn *websocket.Conn
logger types.Logger

// Indicates that the sender has fully stopped.
stopped chan struct{}
err error
}

// NewSender creates a new Sender that uses WebSocket to send
Expand All @@ -44,8 +51,17 @@ func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error {

// WaitToStop blocks until the sender is stopped. To stop the sender cancel the context
// that was passed to Start().
func (s *WSSender) WaitToStop() {
func (s *WSSender) WaitToStop() error {

Check warning on line 54 in client/internal/wssender.go

View check run for this annotation

Codecov / codecov/patch

client/internal/wssender.go#L54

Added line #L54 was not covered by tests
<-s.stopped
return s.err

Check warning on line 56 in client/internal/wssender.go

View check run for this annotation

Codecov / codecov/patch

client/internal/wssender.go#L56

Added line #L56 was not covered by tests
}

func (s *WSSender) IsStopped() <-chan struct{} {
return s.stopped
}

func (s *WSSender) Err() error {
return s.err
}

func (s *WSSender) run(ctx context.Context) {
Expand All @@ -56,13 +72,22 @@ out:
s.sendNextMessage()

case <-ctx.Done():
s.err = s.sendCloseMessage()
break out
}
}

close(s.stopped)
}

func (s *WSSender) sendCloseMessage() error {
return s.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Normal closure"),
time.Now().Add(defaultSendCloseMessageTimeout),
)
}

func (s *WSSender) sendNextMessage() error {
msgToSend := s.nextMessage.PopPending()
if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
Expand Down
58 changes: 33 additions & 25 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/open-telemetry/opamp-go/protobufs"
)

const (
defaultShutdownTimeout = 5 * time.Second
)

// wsClient is an OpAMP Client implementation for WebSocket transport.
// See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport
type wsClient struct {
Expand Down Expand Up @@ -80,15 +84,7 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
}

func (c *wsClient) Stop(ctx context.Context) error {
// Close connection if any.
c.connMutex.RLock()
conn := c.conn
c.connMutex.RUnlock()

if conn != nil {
_ = conn.Close()
}

// stop the runner
return c.common.Stop(ctx)
}

Expand Down Expand Up @@ -209,9 +205,12 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
// are being stopped.
return
}
// Close the underlying connection.
defer c.conn.Close()

if c.common.IsStopping() {
_ = c.conn.Close()
// XXX: is it necessary?
// _ = c.conn.Close()

Check warning on line 213 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L212-L213

Added lines #L212 - L213 were not covered by tests
return
}

Expand All @@ -223,15 +222,14 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
}

// Create a cancellable context for background processors.
procCtx, procCancel := context.WithCancel(ctx)
senderCtx, stopSender := context.WithCancel(ctx)
defer stopSender()

// Connected successfully. Start the sender. This will also send the first
// status report.
if err := c.sender.Start(procCtx, c.conn); err != nil {
if err := c.sender.Start(senderCtx, c.conn); err != nil {
c.common.Logger.Errorf("Failed to send first status report: %v", err)
// We could not send the report, the only thing we can do is start over.
_ = c.conn.Close()
procCancel()
return
}

Expand All @@ -245,19 +243,29 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
c.common.PackagesStateProvider,
c.common.Capabilities,
)
r.ReceiverLoop(ctx)

// Stop the background processors.
procCancel()

// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.
r.Start(ctx)

select {
case <-c.sender.IsStopped():
// sender will send close message to initiate
if err := c.sender.Err(); err != nil && err != websocket.ErrCloseSent {
c.common.Logger.Debugf("failed to send close message, close without the handshake.")
break

Check warning on line 253 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L252-L253

Added lines #L252 - L253 were not covered by tests
}

// Close the connection to unblock the WSSender as well.
_ = c.conn.Close()
c.common.Logger.Debugf("waiting for close message from server.")
select {
case <-r.IsStopped():
c.common.Logger.Debugf("shutdown handshake complete.")
case <-time.After(defaultShutdownTimeout):
c.common.Logger.Debugf("timeout waiting for close message.")
}
case <-r.IsStopped():
// If we exited receiverLoop it means there is a connection error, we cannot
// read messages anymore. We need to start over.

// Wait for WSSender to stop.
c.sender.WaitToStop()
// TODO: handle close message from server.
}
}

func (c *wsClient) runUntilStopped(ctx context.Context) {
Expand Down

0 comments on commit 46b92c2

Please sign in to comment.