diff --git a/pkg/eventstreams/websockets.go b/pkg/eventstreams/websockets.go index beee7df..e4ba31c 100644 --- a/pkg/eventstreams/websockets.go +++ b/pkg/eventstreams/websockets.go @@ -80,7 +80,8 @@ func (w *webSocketAction[DT]) AttemptDispatch(ctx context.Context, attempt int, isBroadcast := *w.spec.DistributionMode == DistributionModeBroadcast if isBroadcast { - return w.wsProtocol.Broadcast(ctx, w.topic, batch) + w.wsProtocol.Broadcast(ctx, w.topic, batch) + return nil } _, err = w.wsProtocol.RoundTrip(ctx, w.topic, batch) diff --git a/pkg/wsserver/wsserver.go b/pkg/wsserver/wsserver.go index a71e9d5..ff8b8c2 100644 --- a/pkg/wsserver/wsserver.go +++ b/pkg/wsserver/wsserver.go @@ -38,7 +38,7 @@ import ( // and attempted to solve the above problem set in a different way that had a challenging timing issue. type Protocol interface { // Broadcast performs best-effort delivery to all connections currently active on the specified stream - Broadcast(ctx context.Context, stream string, payload interface{}) error + Broadcast(ctx context.Context, stream string, payload interface{}) // NextRoundTrip blocks until at least one connection is started on the stream, and then // returns an interface that can be used to send a payload to exactly one of the attached @@ -132,7 +132,7 @@ func (s *webSocketServer) Close() { } } -func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) error { +func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload interface{}) { s.mux.Lock() ss := s.streamMap[stream] if ss == nil { @@ -150,8 +150,6 @@ func (s *webSocketServer) Broadcast(ctx context.Context, stream string, payload log.L(ctx).Warnf("broadcast failed to closing connection '%s'", c.id) } } - - return nil } type roundTrip struct {