
Commit

fix pub/sub and recovery sync deadlock (#439)
FZambia authored Dec 21, 2024
1 parent f3a1d6b commit 518f570
Showing 4 changed files with 213 additions and 6 deletions.
2 changes: 1 addition & 1 deletion broker_memory.go
@@ -492,4 +492,4 @@ func (h *historyHub) remove(ch string) error {
stream.Clear()
}
return nil
}
}
31 changes: 27 additions & 4 deletions broker_redis.go
@@ -386,7 +386,7 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
numProcessors := runtime.NumCPU()

// Run workers to spread message processing work over worker goroutines.
workCh := make(chan rueidis.PubSubMessage)
workCh := make(chan rueidis.PubSubMessage, controlPubSubProcessorBufferSize)
for i := 0; i < numProcessors; i++ {
go func() {
for {
@@ -408,6 +408,13 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
select {
case workCh <- msg:
case <-done:
default:
// Buffer is full, drop the message. The PUB/SUB layer is only expected
// to provide an at-most-once delivery guarantee.
// Blocking here would block the Redis connection read loop, which can
// slow down command processing and potentially lead to deadlocks
// (see https://github.com/redis/rueidis/issues/596).
// TODO: add a metric here.
}
},
})
@@ -426,10 +433,16 @@ func (b *RedisBroker) runControlPubSub(s *RedisShard, eventHandler BrokerEventHa
if err != nil {
b.node.Log(NewLogEntry(LogLevelError, "control pub/sub error", map[string]any{"error": err.Error()}))
}
case <-done:
case <-s.closeCh:
}
}

const (
pubSubProcessorBufferSize = 4096
controlPubSubProcessorBufferSize = 4096
)

func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler, clusterShardIndex, psShardIndex int, useShardedPubSub bool, startOnce func(error)) {
numProcessors := b.config.numPubSubProcessors
numSubscribers := b.config.numPubSubSubscribers
@@ -460,7 +473,7 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
// Run PUB/SUB message processors to spread received message processing work over worker goroutines.
processors := make(map[int]chan rueidis.PubSubMessage)
for i := 0; i < numProcessors; i++ {
processingCh := make(chan rueidis.PubSubMessage)
processingCh := make(chan rueidis.PubSubMessage, pubSubProcessorBufferSize)
processors[i] = processingCh
go func(ch chan rueidis.PubSubMessage) {
for {
@@ -489,6 +502,15 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
select {
case processors[index(msg.Channel, numProcessors)] <- msg:
case <-done:
default:
// Buffer is full, drop the message. The PUB/SUB layer is only expected
// to provide an at-most-once delivery guarantee.
// Centrifuge has an offset check mechanism to handle possible message
// loss for channels where positioning is enabled.
// Blocking here would block the Redis connection read loop, which can
// slow down command processing and potentially lead to deadlocks
// (see https://github.com/redis/rueidis/issues/596).
// TODO: add a metric here.
}
},
OnSubscription: func(s rueidis.PubSubSubscription) {
@@ -590,11 +612,12 @@ func (b *RedisBroker) runPubSub(s *shardWrapper, eventHandler BrokerEventHandler
}()

select {
case err := <-wait:
case err = <-wait:
startOnce(err)
if err != nil {
b.node.Log(NewLogEntry(LogLevelError, "pub/sub error", map[string]any{"error": err.Error()}))
}
case <-done:
case <-s.shard.closeCh:
}
}
@@ -1458,4 +1481,4 @@ func parseDeltaPush(input string) (deltaPublicationPush, error) {
PayloadLength: payloadLength,
Payload: payload,
}, nil
}
}
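
The broker_redis.go change above comes down to one pattern: each worker gets a buffered channel, and the delivery callback does a non-blocking send that drops the message when the buffer is full, so the rueidis connection read loop is never blocked. Dropping is acceptable here because PUB/SUB is at-most-once anyway, and channels with positioning enabled recover lost publications through the offset check. A minimal standalone sketch of that pattern (the message type, buffer size constant and channel names below are illustrative, not the actual Centrifuge/rueidis types):

package main

import "fmt"

// message stands in for rueidis.PubSubMessage in this sketch.
type message struct {
	channel string
	payload string
}

// processorBufferSize mirrors the 4096-element buffers added in the diff.
const processorBufferSize = 4096

func main() {
	workCh := make(chan message, processorBufferSize)
	done := make(chan struct{})

	// Worker goroutine draining the buffered channel.
	go func() {
		for {
			select {
			case msg := <-workCh:
				fmt.Println("processing", msg.channel)
			case <-done:
				return
			}
		}
	}()

	// Producer side (the Redis read loop in the real code): never block.
	// If the buffer is full, drop the message and move on.
	msg := message{channel: "news", payload: "hello"}
	select {
	case workCh <- msg:
	case <-done:
	default:
		// Buffer full: drop, ideally incrementing a metric.
	}

	close(done)
}
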
169 changes: 169 additions & 0 deletions broker_redis_test.go
@@ -2063,3 +2063,172 @@ func TestParseDeltaPush(t *testing.T) {
})
}
}

// See https://github.com/centrifugal/centrifugo/issues/925.
// If there is a deadlock, the test will hang.
func TestRedisClientSubscribeRecoveryServerSubs(t *testing.T) {
isInTest = true
doneCh := make(chan struct{})
defer close(doneCh)
node := nodeWithRedisBroker(t, true, false, 6379)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(node.broker.(*RedisBroker))

channel1 := testChannelRedisClientSubscribeRecoveryDeadlock1
channel2 := testChannelRedisClientSubscribeRecoveryDeadlock2

for _, ch := range []string{channel1, channel2} {
go func(channel string) {
i := 0
for {
_, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(1000, time.Second))
if err != nil {
if !strings.Contains(err.Error(), "rueidis client is closing") {
require.NoError(t, err)
}
return
}
time.Sleep(10 * time.Millisecond)
i++
}
}(ch)
}

node.OnConnecting(func(ctx context.Context, event ConnectEvent) (ConnectReply, error) {
return ConnectReply{
Subscriptions: map[string]SubscribeOptions{
channel1: {EnableRecovery: true},
channel2: {EnableRecovery: true},
},
}, nil
})

node.OnConnect(func(client *Client) {
client.OnSubscribe(func(event SubscribeEvent, callback SubscribeCallback) {
callback(SubscribeReply{
Options: SubscribeOptions{EnableRecovery: true},
}, nil)
})
})

time.Sleep(10 * time.Millisecond)

var wg sync.WaitGroup

for i := 0; i < 1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client := newTestClient(t, node, "42")
rwWrapper := testReplyWriterWrapper()
_, err := client.connectCmd(&protocol.ConnectRequest{
Subs: map[string]*protocol.SubscribeRequest{},
}, &protocol.Command{}, time.Now(), rwWrapper.rw)
require.NoError(t, err)
require.Nil(t, rwWrapper.replies[0].Error)
require.True(t, client.authenticated)
_ = extractConnectReply(rwWrapper.replies)
client.triggerConnect()
client.scheduleOnConnectTimers()
}()
}

waitGroupWithTimeout(t, &wg, 5*time.Second)
}

func waitGroupWithTimeout(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
c := make(chan struct{})
go func() {
wg.Wait()
close(c)
}()
select {
case <-c:
case <-time.After(timeout):
require.Fail(t, "timeout")
}
}

// Similar to the TestRedisClientSubscribeRecoveryServerSubs test, but uses client-side subscriptions.
func TestRedisClientSubscribeRecoveryClientSubs(t *testing.T) {
doneCh := make(chan struct{})
defer close(doneCh)
node := nodeWithRedisBroker(t, true, false, 6379)
defer func() { _ = node.Shutdown(context.Background()) }()
defer stopRedisBroker(node.broker.(*RedisBroker))

channel1 := "TestRedisClientSubscribeRecovery1"
channel2 := "TestRedisClientSubscribeRecovery2"

for _, channel := range []string{channel1, channel2} {
go func(channel string) {
i := 0
for {
_, err := node.Publish(channel, []byte(`{"n": `+strconv.Itoa(i)+`}`), WithHistory(1000, time.Second))
if err != nil {
if !strings.Contains(err.Error(), "rueidis client is closing") {
require.NoError(t, err)
}
return
}
i++
}
}(channel)
}

node.OnConnect(func(client *Client) {
client.OnSubscribe(func(event SubscribeEvent, callback SubscribeCallback) {
callback(SubscribeReply{
Options: SubscribeOptions{EnableRecovery: true},
}, nil)
})
})

time.Sleep(10 * time.Millisecond)

var wg sync.WaitGroup

for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client := newTestClient(t, node, "42")
connectClientV2(t, client)
rwWrapper := testReplyWriterWrapper()
err := client.handleSubscribe(&protocol.SubscribeRequest{
Channel: channel1,
Recover: true,
Epoch: "",
}, &protocol.Command{}, time.Now(), rwWrapper.rw)
require.NoError(t, err)
require.Equal(t, 1, len(rwWrapper.replies))
require.Nil(t, rwWrapper.replies[0].Error)
res := extractSubscribeResult(rwWrapper.replies)
require.Empty(t, res.Offset)
require.NotZero(t, res.Epoch)
require.True(t, res.Recovered)

err = client.handleUnsubscribe(&protocol.UnsubscribeRequest{
Channel: channel1,
}, &protocol.Command{}, time.Now(), rwWrapper.rw)
require.NoError(t, err)
require.Equal(t, 2, len(rwWrapper.replies))
require.Nil(t, rwWrapper.replies[0].Error)

err = client.handleSubscribe(&protocol.SubscribeRequest{
Channel: channel1,
Recover: true,
Epoch: "",
}, &protocol.Command{}, time.Now(), rwWrapper.rw)
require.NoError(t, err)
require.Equal(t, 3, len(rwWrapper.replies))
require.Nil(t, rwWrapper.replies[0].Error)
res = extractSubscribeResult(rwWrapper.replies)
require.Empty(t, res.Offset)
require.NotZero(t, res.Epoch)
require.True(t, res.Recovered)
}()
}

wg.Wait()
}
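
Both new tests rely on the same detection strategy: if the subscribe/recovery path deadlocks, the client goroutines never return, so guarding the wait group with a timeout turns a silent hang into a clear test failure. The diff's waitGroupWithTimeout helper does exactly that; below is a generic sketch of the same guard outside the Centrifuge test suite (function and test names here are hypothetical):

package example

import (
	"sync"
	"testing"
	"time"
)

// runWithTimeout fails the test if fn does not return within d.
func runWithTimeout(t *testing.T, d time.Duration, fn func()) {
	t.Helper()
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	select {
	case <-done:
	case <-time.After(d):
		t.Fatal("timeout: possible deadlock")
	}
}

func TestExample(t *testing.T) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Work that might deadlock goes here.
	}()
	runWithTimeout(t, 5*time.Second, wg.Wait)
}
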
17 changes: 16 additions & 1 deletion client.go
@@ -2228,6 +2228,15 @@ func (c *Client) unlockServerSideSubscriptions(subCtxMap map[string]subscribeCon
}
}

// isInTest may be true during a Centrifuge test run. We use it to inject code required
// to cover various edge-case scenarios.
var isInTest = false

const (
testChannelRedisClientSubscribeRecoveryDeadlock1 = "TestRedisClientSubscribeRecoveryDeadlock1"
testChannelRedisClientSubscribeRecoveryDeadlock2 = "TestRedisClientSubscribeRecoveryDeadlock2"
)

// connectCmd handles connect command from client - client must send connect
// command immediately after establishing connection with server.
func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command, started time.Time, rw *replyWriter) (*protocol.ConnectResult, error) {
@@ -2438,6 +2447,12 @@ func (c *Client) connectCmd(req *protocol.ConnectRequest, cmd *protocol.Command,
subCmd.Offset = subReq.Offset
subCmd.Epoch = subReq.Epoch
}
if isInTest && ch == testChannelRedisClientSubscribeRecoveryDeadlock2 { // Only for tests.
select {
case <-time.After(time.Second):
case <-c.Context().Done():
}
}
subCtx := c.subscribeCmd(subCmd, SubscribeReply{Options: opts}, nil, true, started, nil)
subMu.Lock()
subs[ch] = subCtx.result
@@ -3506,4 +3521,4 @@ func toClientErr(err error) *Error {
return clientErr
}
return ErrorInternal
}
}
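
The client.go hunk wires in a test-only hook: the package-level isInTest flag and a hard-coded channel name let the deadlock test inject a one-second pause into the server-side recovery path, widening the race window so the original issue reproduces deterministically instead of flakily. A minimal sketch of that kind of injection point, with hypothetical names (not the actual Centrifuge call sites):

package example

import (
	"context"
	"time"
)

// isInTest is set to true only by the test suite; production code never
// flips it, so the extra branch is effectively free in normal operation.
var isInTest = false

// slowChannel is a hypothetical channel name a test would subscribe to.
const slowChannel = "TestSlowChannel"

func subscribeToChannel(ctx context.Context, ch string) {
	if isInTest && ch == slowChannel { // Only for tests.
		// Artificial delay makes the race window wide enough for the
		// test to hit the problematic interleaving reliably.
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
		}
	}
	// The real subscribe/recovery logic would follow here.
}
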
