From a9319657f601879e7c59a94e1381de148b76f6a6 Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Mon, 19 Feb 2024 11:21:20 +0000 Subject: [PATCH 1/4] fix: empty stream state on broadcast Signed-off-by: Enrique Lacal --- pkg/eventstreams/websockets.go | 3 +-- pkg/wsserver/wsserver.go | 19 +++++++++++++++++-- pkg/wsserver/wsserver_test.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/pkg/eventstreams/websockets.go b/pkg/eventstreams/websockets.go index e4ba31c..beee7df 100644 --- a/pkg/eventstreams/websockets.go +++ b/pkg/eventstreams/websockets.go @@ -80,8 +80,7 @@ func (w *webSocketAction[DT]) AttemptDispatch(ctx context.Context, attempt int, isBroadcast := *w.spec.DistributionMode == DistributionModeBroadcast if isBroadcast { - w.wsProtocol.Broadcast(ctx, w.topic, batch) - return nil + return w.wsProtocol.Broadcast(ctx, w.topic, batch) } _, err = w.wsProtocol.RoundTrip(ctx, w.topic, batch) diff --git a/pkg/wsserver/wsserver.go b/pkg/wsserver/wsserver.go index a4bbb15..eb341a3 100644 --- a/pkg/wsserver/wsserver.go +++ b/pkg/wsserver/wsserver.go @@ -38,7 +38,7 @@ import ( // and attempted to solve the above problem set in a different way that had a challenging timing issue. type Protocol interface { // Broadcast performs best-effort delivery to all connections currently active on the specified stream - Broadcast(ctx context.Context, stream string, payload interface{}) + Broadcast(ctx context.Context, stream string, payload interface{}) error // NextRoundTrip blocks until at least one connection is started on the stream, and then // returns an interface that can be used to send a payload to exactly one of the attached @@ -132,9 +132,22 @@ func (s *webSocketServer) Close() { } } -func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) { +func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) error { s.mux.Lock() ss := s.streamMap[stream] + if ss == nil { + s.mux.Unlock() + // Waiting for a least one connection to start the stream + // before we start broadcasting + err := s.waitStreamConnections(ctx, stream, func(providedState *streamState) error { + ss = providedState + return nil + }) + if err != nil { + return err + } + s.mux.Lock() + } conns := make([]*webSocketConnection, len(ss.conns)) copy(conns, ss.conns) s.mux.Unlock() @@ -146,6 +159,8 @@ func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload log.L(ctx).Warnf("broadcast failed to closing connection '%s'", c.id) } } + + return nil } type roundTrip struct { diff --git a/pkg/wsserver/wsserver_test.go b/pkg/wsserver/wsserver_test.go index 358536f..14452fe 100644 --- a/pkg/wsserver/wsserver_test.go +++ b/pkg/wsserver/wsserver_test.go @@ -359,6 +359,35 @@ func TestConnectBadWebsocketHandshake(t *testing.T) { } +func TestBroadcastStartWithoutConnections(t *testing.T) { + assert := assert.New(t) + + w, ts := newTestWebSocketServer() + defer ts.Close() + defer w.Close() + + u, _ := url.Parse(ts.URL) + u.Scheme = "ws" + u.Path = "/ws" + stream := "banana" + + go w.Broadcast(w.ctx, stream, "Hello World") + + c1, _, err := ws.DefaultDialer.Dial(u.String(), nil) + assert.NoError(err) + c1.WriteJSON(&WebSocketCommandMessage{ + Type: "start", + Stream: stream, + }) + + c2, _, err := ws.DefaultDialer.Dial(u.String(), nil) + assert.NoError(err) + c2.WriteJSON(&WebSocketCommandMessage{ + Type: "start", + Stream: stream, + }) +} + func TestBroadcast(t *testing.T) { assert := assert.New(t) From 7e7632c0d435b27073486af2570a1fe482c8cdd7 Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Mon, 19 Feb 2024 15:42:23 +0000 Subject: [PATCH 2/4] simplify check Signed-off-by: Enrique Lacal --- pkg/wsserver/wsserver.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/wsserver/wsserver.go b/pkg/wsserver/wsserver.go index eb341a3..3969d69 100644 --- a/pkg/wsserver/wsserver.go +++ b/pkg/wsserver/wsserver.go @@ -139,13 +139,11 @@ func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload s.mux.Unlock() // Waiting for a least one connection to start the stream // before we start broadcasting - err := s.waitStreamConnections(ctx, stream, func(providedState *streamState) error { + // Err always nil per callback function + _ = s.waitStreamConnections(ctx, stream, func(providedState *streamState) error { ss = providedState return nil }) - if err != nil { - return err - } s.mux.Lock() } conns := make([]*webSocketConnection, len(ss.conns)) From 5a8e0ab2d7d3923a7bc66ab8919065cad7ed0085 Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Mon, 19 Feb 2024 16:13:19 +0000 Subject: [PATCH 3/4] Based on review comments, simplify and do not wait for connection Signed-off-by: Enrique Lacal --- pkg/wsserver/wsserver.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/wsserver/wsserver.go b/pkg/wsserver/wsserver.go index 3969d69..a71e9d5 100644 --- a/pkg/wsserver/wsserver.go +++ b/pkg/wsserver/wsserver.go @@ -136,15 +136,8 @@ func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload s.mux.Lock() ss := s.streamMap[stream] if ss == nil { - s.mux.Unlock() - // Waiting for a least one connection to start the stream - // before we start broadcasting - // Err always nil per callback function - _ = s.waitStreamConnections(ctx, stream, func(providedState *streamState) error { - ss = providedState - return nil - }) - s.mux.Lock() + ss = &streamState{} + s.streamMap[stream] = ss } conns := make([]*webSocketConnection, len(ss.conns)) copy(conns, ss.conns) From 891eea2f5973cdf77f11620cdb4535d0c8f8a28a Mon Sep 17 00:00:00 2001 From: Enrique Lacal Date: Mon, 19 Feb 2024 16:17:55 +0000 Subject: [PATCH 4/4] revert interface change Signed-off-by: Enrique Lacal --- pkg/eventstreams/websockets.go | 3 ++- pkg/wsserver/wsserver.go | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/eventstreams/websockets.go b/pkg/eventstreams/websockets.go index beee7df..e4ba31c 100644 --- a/pkg/eventstreams/websockets.go +++ b/pkg/eventstreams/websockets.go @@ -80,7 +80,8 @@ func (w *webSocketAction[DT]) AttemptDispatch(ctx context.Context, attempt int, isBroadcast := *w.spec.DistributionMode == DistributionModeBroadcast if isBroadcast { - return w.wsProtocol.Broadcast(ctx, w.topic, batch) + w.wsProtocol.Broadcast(ctx, w.topic, batch) + return nil } _, err = w.wsProtocol.RoundTrip(ctx, w.topic, batch) diff --git a/pkg/wsserver/wsserver.go b/pkg/wsserver/wsserver.go index a71e9d5..ff8b8c2 100644 --- a/pkg/wsserver/wsserver.go +++ b/pkg/wsserver/wsserver.go @@ -38,7 +38,7 @@ import ( // and attempted to solve the above problem set in a different way that had a challenging timing issue. type Protocol interface { // Broadcast performs best-effort delivery to all connections currently active on the specified stream - Broadcast(ctx context.Context, stream string, payload interface{}) error + Broadcast(ctx context.Context, stream string, payload interface{}) // NextRoundTrip blocks until at least one connection is started on the stream, and then // returns an interface that can be used to send a payload to exactly one of the attached @@ -132,7 +132,7 @@ func (s *webSocketServer) Close() { } } -func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) error { +func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) { s.mux.Lock() ss := s.streamMap[stream] if ss == nil { @@ -150,8 +150,6 @@ func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload log.L(ctx).Warnf("broadcast failed to closing connection '%s'", c.id) } } - - return nil } type roundTrip struct {