From ca9f1d7686551b22d034cb3bbe4080815e31e4ca Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Wed, 20 Sep 2023 16:16:06 -0400 Subject: [PATCH 1/9] Gracefully close client websocket connections --- client/internal/nextmessage.go | 7 ++++++ client/wsclient.go | 41 +++++++++++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/client/internal/nextmessage.go b/client/internal/nextmessage.go index e79c6780..5116f680 100644 --- a/client/internal/nextmessage.go +++ b/client/internal/nextmessage.go @@ -34,6 +34,13 @@ func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) { s.messageMutex.Unlock() } +// IsPending returns whether there is a pending message to be sent. +func (s *NextMessage) IsPending() bool { + s.messageMutex.Lock() + defer s.messageMutex.Unlock() + return s.messagePending +} + // PopPending returns the next message to be sent, if it is pending or nil otherwise. // Clears the "pending" flag. func (s *NextMessage) PopPending() *protobufs.AgentToServer { diff --git a/client/wsclient.go b/client/wsclient.go index d07f7bda..e0f4291b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -85,7 +85,41 @@ func (c *wsClient) Stop(ctx context.Context) error { conn := c.conn c.connMutex.RUnlock() + ticker := time.NewTicker(50 * time.Millisecond) + + // Wait for all remaining messages to be sent. Continuing to attempt + // to send messages after calling Stop will eventually result in lost + // messages. + select { + case <-ticker.C: + if !c.sender.NextMessage().IsPending() { + break + } + case <-time.After(3 * time.Second): + break + } + + ticker.Stop() + if conn != nil { + defaultCloseHandler := conn.CloseHandler() + closed := make(chan bool) + + // The server should respond with a close message of its own, which will + // trigger this callback. At this point the close sequence has been + // completed and the TCP connection can be gracefully closed. + conn.SetCloseHandler(func(code int, text string) error { + closed <- true + return defaultCloseHandler(code, text) + }) + + message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) + + select { + case <-time.After(3 * time.Second): + case <-closed: + } _ = conn.Close() } @@ -193,9 +227,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { } // runOneCycle performs the following actions: -// 1. connect (try until succeeds). -// 2. send first status report. -// 3. receive and process messages until error happens. +// 1. connect (try until succeeds). +// 2. send first status report. +// 3. receive and process messages until error happens. +// // If it encounters an error it closes the connection and returns. // Will stop and return if Stop() is called (ctx is cancelled, isStopping is set). func (c *wsClient) runOneCycle(ctx context.Context) { From ac95ea7cb9e5865daa7f714878b20e049bddac5a Mon Sep 17 00:00:00 2001 From: Evan Bradley Date: Tue, 26 Sep 2023 16:56:31 -0400 Subject: [PATCH 2/9] Tests and polish --- client/wsclient.go | 2 + client/wsclient_test.go | 90 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/client/wsclient.go b/client/wsclient.go index e0f4291b..b5d0a0af 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -37,6 +37,8 @@ type wsClient struct { sender *internal.WSSender } +var _ OpAMPClient = &wsClient{} + // NewWebSocket creates a new OpAMP Client that uses WebSocket transport. func NewWebSocket(logger types.Logger) *wsClient { if logger == nil { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 715b140f..a7ed6570 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -6,9 +6,11 @@ import ( "strings" "sync/atomic" "testing" + "time" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opamp-go/client/internal" @@ -177,3 +179,91 @@ func TestVerifyWSCompress(t *testing.T) { }) } } + +func TestPerformsClosingHandshake(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan bool) + closed := make(chan bool) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- true + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + return client.conn != nil + }, 2*time.Second, 250*time.Millisecond) + + defHandler := wsConn.CloseHandler() + + wsConn.SetCloseHandler(func(code int, _ string) error { + require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") + + err := defHandler(code, "") + closed <- true + return err + }) + + client.Stop(context.Background()) + + select { + case <-closed: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never closed") + } +} + +func TestHandlesNoCloseMessageFromServer(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan bool) + closed := make(chan bool) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- true + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + return client.conn != nil + }, 2*time.Second, 250*time.Millisecond) + + wsConn.SetCloseHandler(func(code int, _ string) error { + // Don't send close message + return nil + }) + + go func() { + client.Stop(context.Background()) + closed <- true + }() + + select { + case <-closed: + case <-time.After(5 * time.Second): + require.Fail(t, "Connection never closed") + } +} From c3d8a87068e94f3a287f035eef17c19eb15bafed Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Wed, 27 Sep 2023 09:24:23 -0400 Subject: [PATCH 3/9] Fix data race --- client/wsclient_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/client/wsclient_test.go b/client/wsclient_test.go index a7ed6570..ba289f54 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -203,7 +203,10 @@ func TestPerformsClosingHandshake(t *testing.T) { } require.Eventually(t, func() bool { - return client.conn != nil + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil }, 2*time.Second, 250*time.Millisecond) defHandler := wsConn.CloseHandler() @@ -248,7 +251,10 @@ func TestHandlesNoCloseMessageFromServer(t *testing.T) { } require.Eventually(t, func() bool { - return client.conn != nil + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil }, 2*time.Second, 250*time.Millisecond) wsConn.SetCloseHandler(func(code int, _ string) error { From 19338c6d842937251d8beb5eed5501b4e1b84de7 Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:13:55 -0400 Subject: [PATCH 4/9] Improve stopping flow --- client/internal/nextmessage.go | 7 -- client/wsclient.go | 85 ++++++++----- client/wsclient_test.go | 212 +++++++++++++++++++++++++++++++-- 3 files changed, 260 insertions(+), 44 deletions(-) diff --git a/client/internal/nextmessage.go b/client/internal/nextmessage.go index 5116f680..e79c6780 100644 --- a/client/internal/nextmessage.go +++ b/client/internal/nextmessage.go @@ -34,13 +34,6 @@ func (s *NextMessage) Update(modifier func(msg *protobufs.AgentToServer)) { s.messageMutex.Unlock() } -// IsPending returns whether there is a pending message to be sent. -func (s *NextMessage) IsPending() bool { - s.messageMutex.Lock() - defer s.messageMutex.Unlock() - return s.messagePending -} - // PopPending returns the next message to be sent, if it is pending or nil otherwise. // Clears the "pending" flag. func (s *NextMessage) PopPending() *protobufs.AgentToServer { diff --git a/client/wsclient.go b/client/wsclient.go index b5d0a0af..814918c4 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -6,6 +6,7 @@ import ( "net/http" "net/url" "sync" + "sync/atomic" "time" "github.com/cenkalti/backoff/v4" @@ -17,6 +18,8 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) +var errStopping = errors.New("client is stopping or stopped, no more messages can be sent") + // wsClient is an OpAMP Client implementation for WebSocket transport. // See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport type wsClient struct { @@ -35,6 +38,10 @@ type wsClient struct { // The sender is responsible for sending portion of the OpAMP protocol. sender *internal.WSSender + + isStopped atomic.Bool + + stopProcessors chan struct{} } var _ OpAMPClient = &wsClient{} @@ -47,8 +54,9 @@ func NewWebSocket(logger types.Logger) *wsClient { sender := internal.NewSender(logger) w := &wsClient{ - common: internal.NewClientCommon(logger, sender), - sender: sender, + common: internal.NewClientCommon(logger, sender), + sender: sender, + stopProcessors: make(chan struct{}), } return w } @@ -82,37 +90,27 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } func (c *wsClient) Stop(ctx context.Context) error { + c.isStopped.Store(true) + // Close connection if any. c.connMutex.RLock() conn := c.conn c.connMutex.RUnlock() - ticker := time.NewTicker(50 * time.Millisecond) - - // Wait for all remaining messages to be sent. Continuing to attempt - // to send messages after calling Stop will eventually result in lost - // messages. - select { - case <-ticker.C: - if !c.sender.NextMessage().IsPending() { - break - } - case <-time.After(3 * time.Second): - break - } - - ticker.Stop() - if conn != nil { + // Shut down the sender and any other background processors. + c.stopProcessors <- struct{}{} + defaultCloseHandler := conn.CloseHandler() - closed := make(chan bool) + closed := make(chan struct{}) // The server should respond with a close message of its own, which will // trigger this callback. At this point the close sequence has been // completed and the TCP connection can be gracefully closed. conn.SetCloseHandler(func(code int, text string) error { - closed <- true - return defaultCloseHandler(code, text) + err := defaultCloseHandler(code, text) + closed <- struct{}{} + return err }) message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") @@ -120,9 +118,11 @@ func (c *wsClient) Stop(ctx context.Context) error { select { case <-time.After(3 * time.Second): + _ = c.conn.Close() case <-closed: + // runOneCycle will close the connection if the closing handshake completed, + // so there's no need to close it here. } - _ = conn.Close() } return c.common.Stop(ctx) @@ -133,22 +133,37 @@ func (c *wsClient) AgentDescription() *protobufs.AgentDescription { } func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error { + if c.isStopped.Load() { + return errStopping + } return c.common.SetAgentDescription(descr) } func (c *wsClient) SetHealth(health *protobufs.AgentHealth) error { + if c.isStopped.Load() { + return errStopping + } return c.common.SetHealth(health) } func (c *wsClient) UpdateEffectiveConfig(ctx context.Context) error { + if c.isStopped.Load() { + return errStopping + } return c.common.UpdateEffectiveConfig(ctx) } func (c *wsClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error { + if c.isStopped.Load() { + return errStopping + } return c.common.SetRemoteConfigStatus(status) } func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error { + if c.isStopped.Load() { + return errStopping + } return c.common.SetPackageStatuses(statuses) } @@ -257,6 +272,15 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // Create a cancellable context for background processors. procCtx, procCancel := context.WithCancel(ctx) + // Stop processors if we receive a signal to do so. + go func() { + select { + case <-c.stopProcessors: + procCancel() + case <-procCtx.Done(): + } + }() + // Connected successfully. Start the sender. This will also send the first // status report. if err := c.sender.Start(procCtx, c.conn); err != nil { @@ -279,26 +303,29 @@ func (c *wsClient) runOneCycle(ctx context.Context) { ) r.ReceiverLoop(ctx) + // If we exited receiverLoop it means there is a connection error or the connection + // has closed. We cannot read messages anymore, so clean up the connection. + // If there is a connection error we will need to start over. + // Stop the background processors. procCancel() - // If we exited receiverLoop it means there is a connection error, we cannot - // read messages anymore. We need to start over. + // Wait for WSSender to stop. + c.sender.WaitToStop() - // Close the connection to unblock the WSSender as well. + // Close the connection. _ = c.conn.Close() - // Wait for WSSender to stop. - c.sender.WaitToStop() + // Unset the closed connection to indicate that it is closed. + c.conn = nil } func (c *wsClient) runUntilStopped(ctx context.Context) { // Iterates until we detect that the client is stopping. for { - if c.common.IsStopping() { + if c.common.IsStopping() || c.isStopped.Load() { return } - c.runOneCycle(ctx) } } diff --git a/client/wsclient_test.go b/client/wsclient_test.go index ba289f54..2b802878 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "sync" "sync/atomic" "testing" "time" @@ -180,15 +181,20 @@ func TestVerifyWSCompress(t *testing.T) { } } +func TestHandlesStopBeforeStart(t *testing.T) { + client := NewWebSocket(nil) + require.Error(t, client.Stop(context.Background())) +} + func TestPerformsClosingHandshake(t *testing.T) { srv := internal.StartMockServer(t) var wsConn *websocket.Conn - connected := make(chan bool) - closed := make(chan bool) + connected := make(chan struct{}) + closed := make(chan struct{}) srv.OnWSConnect = func(conn *websocket.Conn) { wsConn = conn - connected <- true + connected <- struct{}{} } client := NewWebSocket(nil) @@ -215,7 +221,56 @@ func TestPerformsClosingHandshake(t *testing.T) { require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") err := defHandler(code, "") - closed <- true + closed <- struct{}{} + return err + }) + + client.Stop(context.Background()) + + select { + case <-closed: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never closed") + } +} + +func TestHandlesSlowCloseMessageFromServer(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + closed := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + defHandler := wsConn.CloseHandler() + + wsConn.SetCloseHandler(func(code int, _ string) error { + require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") + + time.Sleep(4 * time.Second) + err := defHandler(code, "") + closed <- struct{}{} return err }) @@ -231,12 +286,12 @@ func TestPerformsClosingHandshake(t *testing.T) { func TestHandlesNoCloseMessageFromServer(t *testing.T) { srv := internal.StartMockServer(t) var wsConn *websocket.Conn - connected := make(chan bool) - closed := make(chan bool) + connected := make(chan struct{}) + closed := make(chan struct{}) srv.OnWSConnect = func(conn *websocket.Conn) { wsConn = conn - connected <- true + connected <- struct{}{} } client := NewWebSocket(nil) @@ -264,7 +319,7 @@ func TestHandlesNoCloseMessageFromServer(t *testing.T) { go func() { client.Stop(context.Background()) - closed <- true + closed <- struct{}{} }() select { @@ -273,3 +328,144 @@ func TestHandlesNoCloseMessageFromServer(t *testing.T) { require.Fail(t, "Connection never closed") } } + +func TestHandlesConnectionError(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + // Write an invalid message to the connection. The client + // will take this as an error and reconnect to the server. + writer, err := wsConn.NextWriter(websocket.BinaryMessage) + require.NoError(t, err) + n, err := writer.Write([]byte{99, 1, 2, 3, 4, 5}) + require.NoError(t, err) + require.Equal(t, 6, n) + err = writer.Close() + require.NoError(t, err) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never re-established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + err = client.Stop(context.Background()) + require.NoError(t, err) +} + +func TestDisallowsSendingAfterStopped(t *testing.T) { + srv := internal.StartMockServer(t) + var wsConn *websocket.Conn + connected := make(chan struct{}) + closed := make(chan struct{}) + + srv.OnWSConnect = func(conn *websocket.Conn) { + wsConn = conn + connected <- struct{}{} + } + + client := NewWebSocket(nil) + startClient(t, types.StartSettings{ + OpAMPServerURL: srv.GetHTTPTestServer().URL, + }, client) + + select { + case <-connected: + case <-time.After(2 * time.Second): + require.Fail(t, "Connection never established") + } + + require.Eventually(t, func() bool { + client.connMutex.RLock() + conn := client.conn + client.connMutex.RUnlock() + return conn != nil + }, 2*time.Second, 250*time.Millisecond) + + wg := sync.WaitGroup{} + send := make(chan struct{}) + + defHandler := wsConn.CloseHandler() + wsConn.SetCloseHandler(func(code int, _ string) error { + close(send) + // Pause the stopping process to ensure that sends are disallowed while the client + // is stopping, not necessarily just after it has stopped. + wg.Wait() + err := defHandler(code, "") + closed <- struct{}{} + return err + }) + + wg.Add(5) + go func() { + err := client.Stop(context.Background()) + require.NoError(t, err) + }() + go func() { + <-send + err := client.SetAgentDescription(&protobufs.AgentDescription{}) + require.Error(t, err) + wg.Done() + }() + go func() { + <-send + err := client.SetHealth(&protobufs.AgentHealth{}) + require.Error(t, err) + wg.Done() + }() + go func() { + <-send + err := client.UpdateEffectiveConfig(context.Background()) + require.Error(t, err) + wg.Done() + }() + go func() { + <-send + err := client.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{}) + require.Error(t, err) + wg.Done() + }() + go func() { + <-send + err := client.SetPackageStatuses(&protobufs.PackageStatuses{}) + require.Error(t, err) + wg.Done() + }() + + select { + case <-closed: + case <-time.After(5 * time.Second): + t.Error("Connection failed to close") + } +} From 1986e07d58767b588d8d0b995758d2879dfc9fad Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Fri, 29 Sep 2023 13:57:29 -0400 Subject: [PATCH 5/9] Two-way communication between Stop and runOneCycle --- client/wsclient.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 814918c4..c8a5ace8 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -41,7 +41,8 @@ type wsClient struct { isStopped atomic.Bool - stopProcessors chan struct{} + stopProcessors chan struct{} + processorsStopped chan struct{} } var _ OpAMPClient = &wsClient{} @@ -54,9 +55,10 @@ func NewWebSocket(logger types.Logger) *wsClient { sender := internal.NewSender(logger) w := &wsClient{ - common: internal.NewClientCommon(logger, sender), - sender: sender, - stopProcessors: make(chan struct{}), + common: internal.NewClientCommon(logger, sender), + sender: sender, + stopProcessors: make(chan struct{}), + processorsStopped: make(chan struct{}), } return w } @@ -100,6 +102,7 @@ func (c *wsClient) Stop(ctx context.Context) error { if conn != nil { // Shut down the sender and any other background processors. c.stopProcessors <- struct{}{} + <-c.processorsStopped defaultCloseHandler := conn.CloseHandler() closed := make(chan struct{}) @@ -277,6 +280,8 @@ func (c *wsClient) runOneCycle(ctx context.Context) { select { case <-c.stopProcessors: procCancel() + c.sender.WaitToStop() + close(c.processorsStopped) case <-procCtx.Done(): } }() From dcb41503d66900c41535bd71919399b45120fade Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Fri, 29 Sep 2023 14:14:38 -0400 Subject: [PATCH 6/9] Fix failing test --- server/serverimpl_test.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index 04827d38..b6dec72c 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -751,20 +751,12 @@ func TestConnectionAllowsConcurrentWrites(t *testing.T) { defer conn.Close() - timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) - - select { - case <-timeout.Done(): - t.Error("Client failed to connect before timeout") - default: - if _, ok := srvConnVal.Load().(types.Connection); ok == true { - break - } - } - - cancel() + require.Eventually(t, func() bool { + return srvConnVal.Load() != nil + }, 2*time.Second, 250*time.Millisecond) - srvConn := srvConnVal.Load().(types.Connection) + srvConn, ok := srvConnVal.Load().(types.Connection) + require.True(t, ok, "The server connection is not a types.Connection") for i := 0; i < 20; i++ { go func() { defer func() { From 8d83d237fe6e252e96e24ca9cf053521d002410c Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Mon, 2 Oct 2023 14:20:50 -0400 Subject: [PATCH 7/9] Address PR feedback --- client/wsclient.go | 67 +++++++++++++++++++++++++++++------------ client/wsclient_test.go | 8 +++-- 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index c8a5ace8..86dd9733 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -3,6 +3,7 @@ package client import ( "context" "errors" + "fmt" "net/http" "net/url" "sync" @@ -19,6 +20,7 @@ import ( ) var errStopping = errors.New("client is stopping or stopped, no more messages can be sent") +var errEarlyStop = errors.New("context canceled before shutdown could complete") // wsClient is an OpAMP Client implementation for WebSocket transport. // See specification: https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#websocket-transport @@ -41,8 +43,17 @@ type wsClient struct { isStopped atomic.Bool - stopProcessors chan struct{} + // Sends a signal to stop background processors that asynchronously use the + // WebSocket connection, e.g. WSSender. + stopProcessors chan struct{} + + // Responds to a signal from stopProcessors indicating that all processors + // have been stopped. processorsStopped chan struct{} + + // Network connection timeout used for the WebSocket closing handshake. + // This field is currently only modified during testing. + connShutdownTimeout time.Duration } var _ OpAMPClient = &wsClient{} @@ -55,10 +66,11 @@ func NewWebSocket(logger types.Logger) *wsClient { sender := internal.NewSender(logger) w := &wsClient{ - common: internal.NewClientCommon(logger, sender), - sender: sender, - stopProcessors: make(chan struct{}), - processorsStopped: make(chan struct{}), + common: internal.NewClientCommon(logger, sender), + sender: sender, + stopProcessors: make(chan struct{}, 1), + processorsStopped: make(chan struct{}, 1), + connShutdownTimeout: 10 * time.Second, } return w } @@ -102,7 +114,12 @@ func (c *wsClient) Stop(ctx context.Context) error { if conn != nil { // Shut down the sender and any other background processors. c.stopProcessors <- struct{}{} - <-c.processorsStopped + select { + case <-c.processorsStopped: + case <-ctx.Done(): + _ = c.conn.Close() + return errEarlyStop + } defaultCloseHandler := conn.CloseHandler() closed := make(chan struct{}) @@ -117,14 +134,22 @@ func (c *wsClient) Stop(ctx context.Context) error { }) message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") - conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) + err := conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(c.connShutdownTimeout)) - select { - case <-time.After(3 * time.Second): + if err != nil { _ = c.conn.Close() + return fmt.Errorf("could not write close message to WebSocket, connection closed without performing closing handshake: %w", err) + } + + select { case <-closed: // runOneCycle will close the connection if the closing handshake completed, // so there's no need to close it here. + case <-time.After(c.connShutdownTimeout): + _ = c.conn.Close() + case <-ctx.Done(): + _ = c.conn.Close() + return errEarlyStop } } @@ -260,8 +285,17 @@ func (c *wsClient) runOneCycle(ctx context.Context) { return } - if c.common.IsStopping() { + defer func() { + // Close the connection. _ = c.conn.Close() + + // Unset the closed connection to indicate that it is closed. + c.connMutex.Lock() + c.conn = nil + c.connMutex.Unlock() + }() + + if c.common.IsStopping() { return } @@ -281,7 +315,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { case <-c.stopProcessors: procCancel() c.sender.WaitToStop() - close(c.processorsStopped) + c.processorsStopped <- struct{}{} case <-procCtx.Done(): } }() @@ -291,7 +325,6 @@ func (c *wsClient) runOneCycle(ctx context.Context) { if err := c.sender.Start(procCtx, c.conn); err != nil { c.common.Logger.Errorf("Failed to send first status report: %v", err) // We could not send the report, the only thing we can do is start over. - _ = c.conn.Close() procCancel() return } @@ -308,8 +341,8 @@ func (c *wsClient) runOneCycle(ctx context.Context) { ) r.ReceiverLoop(ctx) - // If we exited receiverLoop it means there is a connection error or the connection - // has closed. We cannot read messages anymore, so clean up the connection. + // If we exited receiverLoop it means there is a connection error or the closing handshake + // has completed. We cannot read messages anymore, so clean up the connection. // If there is a connection error we will need to start over. // Stop the background processors. @@ -317,12 +350,6 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // Wait for WSSender to stop. c.sender.WaitToStop() - - // Close the connection. - _ = c.conn.Close() - - // Unset the closed connection to indicate that it is closed. - c.conn = nil } func (c *wsClient) runUntilStopped(ctx context.Context) { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 2b802878..b7bd48da 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -246,6 +246,7 @@ func TestHandlesSlowCloseMessageFromServer(t *testing.T) { } client := NewWebSocket(nil) + client.connShutdownTimeout = 100 * time.Millisecond startClient(t, types.StartSettings{ OpAMPServerURL: srv.GetHTTPTestServer().URL, }, client) @@ -268,7 +269,7 @@ func TestHandlesSlowCloseMessageFromServer(t *testing.T) { wsConn.SetCloseHandler(func(code int, _ string) error { require.Equal(t, websocket.CloseNormalClosure, code, "Client sent non-normal closing code") - time.Sleep(4 * time.Second) + time.Sleep(200 * time.Millisecond) err := defHandler(code, "") closed <- struct{}{} return err @@ -278,7 +279,7 @@ func TestHandlesSlowCloseMessageFromServer(t *testing.T) { select { case <-closed: - case <-time.After(2 * time.Second): + case <-time.After(1 * time.Second): require.Fail(t, "Connection never closed") } } @@ -295,6 +296,7 @@ func TestHandlesNoCloseMessageFromServer(t *testing.T) { } client := NewWebSocket(nil) + client.connShutdownTimeout = 100 * time.Millisecond startClient(t, types.StartSettings{ OpAMPServerURL: srv.GetHTTPTestServer().URL, }, client) @@ -324,7 +326,7 @@ func TestHandlesNoCloseMessageFromServer(t *testing.T) { select { case <-closed: - case <-time.After(5 * time.Second): + case <-time.After(1 * time.Second): require.Fail(t, "Connection never closed") } } From b8b75cda835a7862c3770963f8919ab31ded05fa Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Tue, 3 Oct 2023 11:17:36 -0400 Subject: [PATCH 8/9] Address PR feedback --- client/wsclient.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 86dd9733..58c8fe66 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -45,11 +45,11 @@ type wsClient struct { // Sends a signal to stop background processors that asynchronously use the // WebSocket connection, e.g. WSSender. - stopProcessors chan struct{} + stopBGProcessors chan struct{} // Responds to a signal from stopProcessors indicating that all processors // have been stopped. - processorsStopped chan struct{} + bgProcessorsStopped chan struct{} // Network connection timeout used for the WebSocket closing handshake. // This field is currently only modified during testing. @@ -68,8 +68,8 @@ func NewWebSocket(logger types.Logger) *wsClient { w := &wsClient{ common: internal.NewClientCommon(logger, sender), sender: sender, - stopProcessors: make(chan struct{}, 1), - processorsStopped: make(chan struct{}, 1), + stopBGProcessors: make(chan struct{}, 1), + bgProcessorsStopped: make(chan struct{}, 1), connShutdownTimeout: 10 * time.Second, } return w @@ -113,9 +113,9 @@ func (c *wsClient) Stop(ctx context.Context) error { if conn != nil { // Shut down the sender and any other background processors. - c.stopProcessors <- struct{}{} + c.stopBGProcessors <- struct{}{} select { - case <-c.processorsStopped: + case <-c.bgProcessorsStopped: case <-ctx.Done(): _ = c.conn.Close() return errEarlyStop @@ -273,11 +273,12 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { // runOneCycle performs the following actions: // 1. connect (try until succeeds). -// 2. send first status report. -// 3. receive and process messages until error happens. +// 2. set up a background processor to send messages. +// 3. send first status report. +// 4. receive and process messages until an error occurs or the connection closes. // // If it encounters an error it closes the connection and returns. -// Will stop and return if Stop() is called (ctx is cancelled, isStopping is set). +// Will stop and return if Stop() is called. func (c *wsClient) runOneCycle(ctx context.Context) { if err := c.ensureConnected(ctx); err != nil { // Can't connect, so can't move forward. This currently happens when we @@ -309,13 +310,15 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // Create a cancellable context for background processors. procCtx, procCancel := context.WithCancel(ctx) - // Stop processors if we receive a signal to do so. + // Stop background processors if we receive a signal to do so. + // Note that the receiver does not respond to signals and + // will only stop when the connection closes or errors. go func() { select { - case <-c.stopProcessors: + case <-c.stopBGProcessors: procCancel() c.sender.WaitToStop() - c.processorsStopped <- struct{}{} + c.bgProcessorsStopped <- struct{}{} case <-procCtx.Done(): } }() From 63f591c09ffe69ecb924cd396405a1ba42547834 Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Fri, 13 Oct 2023 14:49:15 -0400 Subject: [PATCH 9/9] Improve concurrency control --- client/wsclient.go | 106 ++++++++++++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 35 deletions(-) diff --git a/client/wsclient.go b/client/wsclient.go index 58c8fe66..7ace7697 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" "sync" - "sync/atomic" "time" "github.com/cenkalti/backoff/v4" @@ -41,15 +40,20 @@ type wsClient struct { // The sender is responsible for sending portion of the OpAMP protocol. sender *internal.WSSender - isStopped atomic.Bool - - // Sends a signal to stop background processors that asynchronously use the - // WebSocket connection, e.g. WSSender. - stopBGProcessors chan struct{} - - // Responds to a signal from stopProcessors indicating that all processors + // Indicates whether the client is open for more messages to be sent. + // Should be protected by connectionOpenMutex. + connectionOpen bool + // Indicates the connection is being written to. + // A read lock on this mutex indicates that a message is being queued for writing. + // A write lock on this mutex indicates that the connection is being shut down. + connectionOpenMutex sync.RWMutex + + // Sends a signal to the background processors controller thread to stop + // all background processors. + stopBGProcessing chan struct{} + // Responds to a signal from stopBGProcessing indicating that all processors // have been stopped. - bgProcessorsStopped chan struct{} + bgProcessingStopped chan struct{} // Network connection timeout used for the WebSocket closing handshake. // This field is currently only modified during testing. @@ -68,8 +72,9 @@ func NewWebSocket(logger types.Logger) *wsClient { w := &wsClient{ common: internal.NewClientCommon(logger, sender), sender: sender, - stopBGProcessors: make(chan struct{}, 1), - bgProcessorsStopped: make(chan struct{}, 1), + connectionOpen: true, + stopBGProcessing: make(chan struct{}, 1), + bgProcessingStopped: make(chan struct{}, 1), connShutdownTimeout: 10 * time.Second, } return w @@ -104,7 +109,11 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro } func (c *wsClient) Stop(ctx context.Context) error { - c.isStopped.Store(true) + // Prevent any additional writers from writing to the connection + // and stop reconnecting if the connection closes. + c.connectionOpenMutex.Lock() + c.connectionOpen = false + c.connectionOpenMutex.Unlock() // Close connection if any. c.connMutex.RLock() @@ -113,14 +122,17 @@ func (c *wsClient) Stop(ctx context.Context) error { if conn != nil { // Shut down the sender and any other background processors. - c.stopBGProcessors <- struct{}{} + c.stopBGProcessing <- struct{}{} select { - case <-c.bgProcessorsStopped: + case <-c.bgProcessingStopped: case <-ctx.Done(): - _ = c.conn.Close() + c.closeConnection() return errEarlyStop } + // At this point all other writers to the connection should be stopped. + // We can write to the connection without any risk of contention. + defaultCloseHandler := conn.CloseHandler() closed := make(chan struct{}) @@ -133,11 +145,14 @@ func (c *wsClient) Stop(ctx context.Context) error { return err }) + // Start the closing handshake by writing a close message to the server. + // If the server responds with its own close message, the connection reader will + // shut down and there will be no more reads from or writes to the connection. message := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") err := conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(c.connShutdownTimeout)) if err != nil { - _ = c.conn.Close() + c.closeConnection() return fmt.Errorf("could not write close message to WebSocket, connection closed without performing closing handshake: %w", err) } @@ -146,9 +161,9 @@ func (c *wsClient) Stop(ctx context.Context) error { // runOneCycle will close the connection if the closing handshake completed, // so there's no need to close it here. case <-time.After(c.connShutdownTimeout): - _ = c.conn.Close() + c.closeConnection() case <-ctx.Done(): - _ = c.conn.Close() + c.closeConnection() return errEarlyStop } } @@ -161,35 +176,45 @@ func (c *wsClient) AgentDescription() *protobufs.AgentDescription { } func (c *wsClient) SetAgentDescription(descr *protobufs.AgentDescription) error { - if c.isStopped.Load() { + c.connectionOpenMutex.RLock() + defer c.connectionOpenMutex.RUnlock() + if !c.connectionOpen { return errStopping } return c.common.SetAgentDescription(descr) } func (c *wsClient) SetHealth(health *protobufs.AgentHealth) error { - if c.isStopped.Load() { + c.connectionOpenMutex.RLock() + defer c.connectionOpenMutex.RUnlock() + if !c.connectionOpen { return errStopping } return c.common.SetHealth(health) } func (c *wsClient) UpdateEffectiveConfig(ctx context.Context) error { - if c.isStopped.Load() { + c.connectionOpenMutex.RLock() + defer c.connectionOpenMutex.RUnlock() + if !c.connectionOpen { return errStopping } return c.common.UpdateEffectiveConfig(ctx) } func (c *wsClient) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatus) error { - if c.isStopped.Load() { + c.connectionOpenMutex.RLock() + defer c.connectionOpenMutex.RUnlock() + if !c.connectionOpen { return errStopping } return c.common.SetRemoteConfigStatus(status) } func (c *wsClient) SetPackageStatuses(statuses *protobufs.PackageStatuses) error { - if c.isStopped.Load() { + c.connectionOpenMutex.RLock() + defer c.connectionOpenMutex.RUnlock() + if !c.connectionOpen { return errStopping } return c.common.SetPackageStatuses(statuses) @@ -271,6 +296,21 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { } } +func (c *wsClient) closeConnection() { + c.connMutex.Lock() + defer c.connMutex.Unlock() + + if c.conn == nil { + return + } + + // Close the connection. + _ = c.conn.Close() + + // Unset the field to indicate that the connection is closed. + c.conn = nil +} + // runOneCycle performs the following actions: // 1. connect (try until succeeds). // 2. set up a background processor to send messages. @@ -286,15 +326,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { return } - defer func() { - // Close the connection. - _ = c.conn.Close() - - // Unset the closed connection to indicate that it is closed. - c.connMutex.Lock() - c.conn = nil - c.connMutex.Unlock() - }() + defer c.closeConnection() if c.common.IsStopping() { return @@ -315,10 +347,10 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // will only stop when the connection closes or errors. go func() { select { - case <-c.stopBGProcessors: + case <-c.stopBGProcessing: procCancel() c.sender.WaitToStop() - c.bgProcessorsStopped <- struct{}{} + close(c.bgProcessingStopped) case <-procCtx.Done(): } }() @@ -358,9 +390,13 @@ func (c *wsClient) runOneCycle(ctx context.Context) { func (c *wsClient) runUntilStopped(ctx context.Context) { // Iterates until we detect that the client is stopping. for { - if c.common.IsStopping() || c.isStopped.Load() { + c.connectionOpenMutex.RLock() + if c.common.IsStopping() || !c.connectionOpen { + c.connectionOpenMutex.RUnlock() return } + c.connectionOpenMutex.RUnlock() + c.runOneCycle(ctx) } }