From 518f570a5589649db4d3f869c70c591b78f2428f Mon Sep 17 00:00:00 2001
From: Alexander Emelin
Date: Sat, 21 Dec 2024 15:32:09 +0200
Subject: [PATCH] fix pub/sub and recovery sync deadlock (#439)

---
 broker_memory.go     |   2 +-
 broker_redis.go      |  31 +++++++-
 broker_redis_test.go | 169 +++++++++++++++++++++++++++++++++++++++++++
 client.go            |  17 ++++-
 4 files changed, 213 insertions(+), 6 deletions(-)

diff --git a/broker_memory.go b/broker_memory.go
index 9de79ad1..aa4a6f1a 100644
--- a/broker_memory.go
+++ b/broker_memory.go
@@ -492,4 +492,4 @@ func (h *historyHub) remove(ch string) error {
 		stream.Clear()
 	}
 	return nil
-}
+}
\ No newline at end of file
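Note on the approach: the broker_redis.go changes below replace unbuffered
handoff channels with buffered ones and make the send non-blocking, so the
Redis connection read loop can never be wedged by a slow consumer. A minimal
standalone sketch of that pattern (trySend and the names here are
illustrative, not code from this patch):

package main

import "fmt"

// trySend hands msg to a worker channel without ever blocking the caller.
// It returns false when the message is dropped (buffer full) or done is
// closed, mirroring the at-most-once guarantee of Redis PUB/SUB.
func trySend[T any](ch chan<- T, done <-chan struct{}, msg T) bool {
	select {
	case ch <- msg:
		return true
	case <-done:
		return false
	default:
		// Buffer full: drop instead of blocking the connection read loop.
		return false
	}
}

func main() {
	done := make(chan struct{})
	workCh := make(chan string, 2) // tiny buffer to force a drop
	fmt.Println(trySend(workCh, done, "a")) // true
	fmt.Println(trySend(workCh, done, "b")) // true
	fmt.Println(trySend(workCh, done, "c")) // false: dropped
}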
diff --git a/broker_redis.go b/broker_redis.go
index 374dbfd0..4c5ad71f 100644
--- a/broker_redis.go
+++ b/broker_redis.go
@@ -386,7 +386,7 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
 	numProcessors := runtime.NumCPU()

 	// Run workers to spread message processing work over worker goroutines.
-	workCh := make(chan rueidis.PubSubMessage)
+	workCh := make(chan rueidis.PubSubMessage, controlPubSubProcessorBufferSize)
 	for i := 0; i < numProcessors; i++ {
 		go func() {
 			for {
@@ -408,6 +408,13 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
 			select {
 			case workCh <- msg:
 			case <-done:
+			default:
+				// Buffer is full, drop the message. It's expected that the PUB/SUB
+				// layer only provides an at-most-once delivery guarantee.
+				// Blocking here would block the Redis connection read loop, which
+				// slows down command processing and can potentially lead to
+				// deadlocks (see https://github.com/redis/rueidis/issues/596).
+				// TODO: add metric here.
 			}
 		},
 	})
@@ -426,10 +433,16 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
 		if err != nil {
 			b.node.Log(NewLogEntry(LogLevelError, "control pub/sub error", map[string]any{"error": err.Error()}))
 		}
+	case <-done:
 	case <-s.closeCh:
 	}
 }

+const (
+	pubSubProcessorBufferSize        = 4096
+	controlPubSubProcessorBufferSize = 4096
+)
+
 func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler, clusterShardIndex, psShardIndex int, useShardedPubSub bool, startOnce func(error)) {
 	numProcessors := b.config.numPubSubProcessors
 	numSubscribers := b.config.numPubSubSubscribers
@@ -460,7 +473,7 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
 	// Run PUB/SUB message processors to spread received message processing work over worker goroutines.
 	processors := make(map[int]chan rueidis.PubSubMessage)
 	for i := 0; i < numProcessors; i++ {
-		processingCh := make(chan rueidis.PubSubMessage)
+		processingCh := make(chan rueidis.PubSubMessage, pubSubProcessorBufferSize)
 		processors[i] = processingCh
 		go func(ch chan rueidis.PubSubMessage) {
 			for {
@@ -489,6 +502,15 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
 			select {
 			case processors[index(msg.Channel, numProcessors)] <- msg:
 			case <-done:
+			default:
+				// Buffer is full, drop the message. It's expected that the PUB/SUB
+				// layer only provides an at-most-once delivery guarantee.
+				// Centrifuge has an offset check mechanism to handle possible
+				// message loss for channels where positioning is enabled.
+				// Blocking here would block the Redis connection read loop, which
+				// slows down command processing and can potentially lead to
+				// deadlocks (see https://github.com/redis/rueidis/issues/596).
+				// TODO: add metric here.
 			}
 		},
 		OnSubscription: func(s rueidis.PubSubSubscription) {
@@ -590,11 +612,12 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
 	}()

 	select {
-	case err := <-wait:
+	case err = <-wait:
 		startOnce(err)
 		if err != nil {
 			b.node.Log(NewLogEntry(LogLevelError, "pub/sub error", map[string]any{"error": err.Error()}))
 		}
+	case <-done:
 	case <-s.shard.closeCh:
 	}
 }
@@ -1458,4 +1481,4 @@ func parseDeltaPush(input string) (deltaPublicationPush, error) {
 		PayloadLength: payloadLength,
 		Payload:       payload,
 	}, nil
-}
+}
\ No newline at end of file
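The drop branches above cite rueidis issue 596. Stripped of Redis specifics,
the cycle is: the connection read loop blocks while delivering a publication
into an unbuffered processor channel, and the consuming side (via the
subscribe/recovery synchronization in client.go) is itself waiting for a
command reply that only that same read loop can deliver. A toy model of how
buffering breaks the cycle (illustrative sketch, not patch code):

package main

import "fmt"

func main() {
	// A single "read loop" goroutine models the rueidis connection: it must
	// deliver both PUB/SUB messages and command replies, in order. If the
	// PUB/SUB handoff is unbuffered while its consumer waits on a command
	// reply, neither side can make progress: that is the deadlock this
	// patch fixes. With capacity 0 below, this program deadlocks.
	messages := make(chan string, 1) // buffered: the read loop can proceed
	replies := make(chan string)

	go func() { // connection read loop
		messages <- "publication" // succeeds immediately thanks to the buffer
		replies <- "history reply"
	}()

	fmt.Println("got:", <-replies)  // recovery waits for its reply first...
	fmt.Println("got:", <-messages) // ...then the publication is processed
}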
diff --git a/broker_redis_test.go b/broker_redis_test.go
index 33eb5e7e..3cd89e58 100644
--- a/broker_redis_test.go
+++ b/broker_redis_test.go
@@ -2063,3 +2063,172 @@ func TestParseDeltaPush(t *testing.T) {
 		})
 	}
 }
+
+// See https://github.com/centrifugal/centrifugo/issues/925.
+// If there is a deadlock, the test will hang.
+func TestRedisClientSubscribeRecoveryServerSubs(t *testing.T) {
+	isInTest = true
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	node := nodeWithRedisBroker(t, true, false, 6379)
+	defer func() { _ = node.Shutdown(context.Background()) }()
+	defer stopRedisBroker(node.broker.(*RedisBroker))
+
+	channel1 := testChannelRedisClientSubscribeRecoveryDeadlock1
+	channel2 := testChannelRedisClientSubscribeRecoveryDeadlock2
+
+	for _, ch := range []string{channel1, channel2} {
+		go func(channel string) {
+			i := 0
+			for {
+				_, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(1000, time.Second))
+				if err != nil {
+					if !strings.Contains(err.Error(), "rueidis client is closing") {
+						require.NoError(t, err)
+					}
+					return
+				}
+				time.Sleep(10 * time.Millisecond)
+				i++
+			}
+		}(ch)
+	}
+
+	node.OnConnecting(func(ctx context.Context, event ConnectEvent) (ConnectReply, error) {
+		return ConnectReply{
+			Subscriptions: map[string]SubscribeOptions{
+				channel1: {EnableRecovery: true},
+				channel2: {EnableRecovery: true},
+			},
+		}, nil
+	})
+
+	node.OnConnect(func(client *Client) {
+		client.OnSubscribe(func(event SubscribeEvent, callback SubscribeCallback) {
+			callback(SubscribeReply{
+				Options: SubscribeOptions{EnableRecovery: true},
+			}, nil)
+		})
+	})
+
+	time.Sleep(10 * time.Millisecond)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 1; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			client := newTestClient(t, node, "42")
+			rwWrapper := testReplyWriterWrapper()
+			_, err := client.connectCmd(&protocol.ConnectRequest{
+				Subs: map[string]*protocol.SubscribeRequest{},
+			}, &protocol.Command{}, time.Now(), rwWrapper.rw)
+			require.NoError(t, err)
+			require.Nil(t, rwWrapper.replies[0].Error)
+			require.True(t, client.authenticated)
+			_ = extractConnectReply(rwWrapper.replies)
+			client.triggerConnect()
+			client.scheduleOnConnectTimers()
+		}()
+	}
+
+	waitGroupWithTimeout(t, &wg, 5*time.Second)
+}
+
+func waitGroupWithTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
+	c := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(c)
+	}()
+	select {
+	case <-c:
+	case <-time.After(timeout):
+		require.Fail(t, "timeout")
+	}
+}
+
+// Similar to TestRedisClientSubscribeRecoveryServerSubs, but uses client-side subscriptions.
+func TestRedisClientSubscribeRecoveryClientSubs(t *testing.T) {
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	node := nodeWithRedisBroker(t, true, false, 6379)
+	defer func() { _ = node.Shutdown(context.Background()) }()
+	defer stopRedisBroker(node.broker.(*RedisBroker))
+
+	channel1 := "TestRedisClientSubscribeRecovery1"
+	channel2 := "TestRedisClientSubscribeRecovery2"
+
+	for _, channel := range []string{channel1, channel2} {
+		go func(channel string) {
+			i := 0
+			for {
+				_, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(1000, time.Second))
+				if err != nil {
+					if !strings.Contains(err.Error(), "rueidis client is closing") {
+						require.NoError(t, err)
+					}
+					return
+				}
+				i++
+			}
+		}(channel)
+	}
+
+	node.OnConnect(func(client *Client) {
+		client.OnSubscribe(func(event SubscribeEvent, callback SubscribeCallback) {
+			callback(SubscribeReply{
+				Options: SubscribeOptions{EnableRecovery: true},
+			}, nil)
+		})
+	})
+
+	time.Sleep(10 * time.Millisecond)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 2; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			client := newTestClient(t, node, "42")
+			connectClientV2(t, client)
+			rwWrapper := testReplyWriterWrapper()
+			err := client.handleSubscribe(&protocol.SubscribeRequest{
+				Channel: channel1,
+				Recover: true,
+				Epoch:   "",
+			}, &protocol.Command{}, time.Now(), rwWrapper.rw)
+			require.NoError(t, err)
+			require.Equal(t, 1, len(rwWrapper.replies))
+			require.Nil(t, rwWrapper.replies[0].Error)
+			res := extractSubscribeResult(rwWrapper.replies)
+			require.Empty(t, res.Offset)
+			require.NotZero(t, res.Epoch)
+			require.True(t, res.Recovered)
+
+			err = client.handleUnsubscribe(&protocol.UnsubscribeRequest{
+				Channel: channel1,
+			}, &protocol.Command{}, time.Now(), rwWrapper.rw)
+			require.NoError(t, err)
+			require.Equal(t, 2, len(rwWrapper.replies))
+			require.Nil(t, rwWrapper.replies[0].Error)
+
+			err = client.handleSubscribe(&protocol.SubscribeRequest{
+				Channel: channel1,
+				Recover: true,
+				Epoch:   "",
+			}, &protocol.Command{}, time.Now(), rwWrapper.rw)
+			require.NoError(t, err)
+			require.Equal(t, 3, len(rwWrapper.replies))
+			require.Nil(t, rwWrapper.replies[0].Error)
+			res = extractSubscribeResult(rwWrapper.replies)
+			require.Empty(t, res.Offset)
+			require.NotZero(t, res.Epoch)
+			require.True(t, res.Recovered)
+		}()
+	}
+
+	wg.Wait()
+}
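The waitGroupWithTimeout helper added above is what turns a would-be hang into
a visible failure: a regression of this deadlock now fails after five seconds
instead of stalling the whole test run. An illustrative use outside these
tests (hypothetical test in the same package, not part of the patch):

package centrifuge

import (
	"sync"
	"testing"
	"time"
)

// TestGuardedAgainstDeadlock shows the intended usage: wrap goroutine-heavy
// work in a WaitGroup and bound the wait so CI never hangs silently.
func TestGuardedAgainstDeadlock(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// ... exercise code that could block forever on a regression ...
	}()
	waitGroupWithTimeout(t, &wg, 5*time.Second)
}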
diff --git a/client.go b/client.go
index fa784aa2..ac29c510 100644
--- a/client.go
+++ b/client.go
@@ -2228,6 +2228,15 @@ func (c *Client) unlockServerSideSubscriptions(subCtxMap map[string]subscribeCon
 	}
 }

+// isInTest may be set to true during Centrifuge test runs. We use it to inject
+// code required to cover various edge case scenarios.
+var isInTest = false
+
+const (
+	testChannelRedisClientSubscribeRecoveryDeadlock1 = "TestRedisClientSubscribeRecoveryDeadlock1"
+	testChannelRedisClientSubscribeRecoveryDeadlock2 = "TestRedisClientSubscribeRecoveryDeadlock2"
+)
+
 // connectCmd handles connect command from client - client must send connect
 // command immediately after establishing connection with server.
 func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, started time.Time, rw *replyWriter) (*protocol.ConnectResult, error) {
@@ -2438,6 +2447,12 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command,
 			subCmd.Offset = subReq.Offset
 			subCmd.Epoch = subReq.Epoch
 		}
+		if isInTest && ch == testChannelRedisClientSubscribeRecoveryDeadlock2 { // Only for tests.
+			select {
+			case <-time.After(time.Second):
+			case <-c.Context().Done():
+			}
+		}
 		subCtx := c.subscribeCmd(subCmd, SubscribeReply{Options: opts}, nil, true, started, nil)
 		subMu.Lock()
 		subs[ch] = subCtx.result
@@ -3506,4 +3521,4 @@ func toClientErr(err error) *Error {
 		return clientErr
 	}
 	return ErrorInternal
-}
+}
\ No newline at end of file
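One loose end both drop branches leave behind is the "TODO: add metric here."
marker: dropped publications are currently invisible to operators. A minimal
sketch of what such a counter could look like (hypothetical names, not part of
this patch):

package main

import (
	"fmt"
	"sync/atomic"
)

// droppedPubSubMessages would count publications discarded in the new
// default: branches. Exposing it (for example via expvar or Prometheus)
// lets operators detect that pubSubProcessorBufferSize (4096) is too
// small for their publish rate.
var droppedPubSubMessages atomic.Int64

func main() {
	// In each drop branch one would do:
	droppedPubSubMessages.Add(1)
	fmt.Println("dropped so far:", droppedPubSubMessages.Load())
}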