Skip to content

Commit

Permalink
br/streamhelper: fix subscribe error (#39689)
Browse files Browse the repository at this point in the history
close #39688
  • Loading branch information
YuJuncen authored Dec 7, 2022
1 parent 6924a44 commit 98d84ab
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, erro
return &item, nil
default:
}
return nil, t.cx.Err()
return nil, status.Error(codes.Canceled, t.cx.Err().Error())
}
}

Expand Down
10 changes: 6 additions & 4 deletions br/pkg/streamhelper/flush_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,22 @@ func (s *subscription) connect(ctx context.Context, dialer LogBackupService) {

func (s *subscription) doConnect(ctx context.Context, dialer LogBackupService) error {
log.Info("[log backup subscription manager] Adding subscription.", zap.Uint64("store", s.storeID), zap.Uint64("boot", s.storeBootAt))
s.clearError()
// We should shutdown the background task firstly.
// Once it yields some error during shuting down, the error won't be brought to next run.
s.close()
s.clearError()

c, err := dialer.GetLogBackupClient(ctx, s.storeID)
if err != nil {
return err
return errors.Annotate(err, "failed to get log backup client")
}
cx, cancel := context.WithCancel(ctx)
cli, err := c.SubscribeFlushEvent(cx, &logbackup.SubscribeFlushEventRequest{
ClientId: uuid.NewString(),
})
if err != nil {
cancel()
return err
return errors.Annotate(err, "failed to subscribe events")
}
s.cancel = cancel
s.background = spawnJoinable(func() { s.listenOver(cli) })
Expand All @@ -249,7 +251,7 @@ func (s *subscription) listenOver(cli eventStream) {
msg, err := cli.Recv()
if err != nil {
log.Info("[log backup flush subscriber] Listen stopped.", zap.Uint64("store", storeID), logutil.ShortError(err))
if err == io.EOF || err == context.Canceled {
if err == io.EOF || err == context.Canceled || status.Code(err) == codes.Canceled {
return
}
s.emitError(errors.Annotatef(err, "while receiving from store id %d", storeID))
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/streamhelper/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func installSubscribeSupport(c *fakeCluster) {
Expand Down Expand Up @@ -105,6 +107,25 @@ func TestHasFailureStores(t *testing.T) {
req.NoError(sub.PendingErrors())
}

func TestStoreOffline(t *testing.T) {
req := require.New(t)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
c.splitAndScatter("0001", "0002", "0003", "0008", "0009")
installSubscribeSupport(c)

c.onGetClient = func(u uint64) error {
return status.Error(codes.DataLoss, "upon an eclipsed night, some of data (not all data) have fled from the dataset")
}
sub := streamhelper.NewSubscriber(c, c)
req.NoError(sub.UpdateStoreTopology(ctx))
req.Error(sub.PendingErrors())

c.onGetClient = nil
sub.HandleErrors(ctx)
req.NoError(sub.PendingErrors())
}

func TestStoreRemoved(t *testing.T) {
req := require.New(t)
ctx := context.Background()
Expand Down

0 comments on commit 98d84ab

Please sign in to comment.