Skip to content

Commit 93c6492

Browse files
authored
log-backup: added more robust error handling for log backup advancer (#41083)
close #41082
1 parent 85c48ed commit 93c6492

File tree

4 files changed

+79
-17
lines changed

4 files changed

+79
-17
lines changed

br/pkg/streamhelper/BUILD.bazel

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"@com_github_golang_protobuf//proto",
3535
"@com_github_google_uuid//:uuid",
3636
"@com_github_pingcap_errors//:errors",
37+
"@com_github_pingcap_failpoint//:failpoint",
3738
"@com_github_pingcap_kvproto//pkg/brpb",
3839
"@com_github_pingcap_kvproto//pkg/logbackuppb",
3940
"@com_github_pingcap_kvproto//pkg/metapb",
@@ -78,6 +79,7 @@ go_test(
7879
"//tablecodec",
7980
"//util/codec",
8081
"@com_github_pingcap_errors//:errors",
82+
"@com_github_pingcap_failpoint//:failpoint",
8183
"@com_github_pingcap_kvproto//pkg/brpb",
8284
"@com_github_pingcap_kvproto//pkg/errorpb",
8385
"@com_github_pingcap_kvproto//pkg/logbackuppb",

br/pkg/streamhelper/advancer.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,16 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
262262
return
263263
case e, ok := <-ch:
264264
if !ok {
265+
log.Info("[log backup advancer] Task watcher exits due to stream ends.")
265266
return
266267
}
267-
log.Info("meet task event", zap.Stringer("event", &e))
268+
log.Info("[log backup advancer] Meet task event", zap.Stringer("event", &e))
268269
if err := c.onTaskEvent(ctx, e); err != nil {
269270
if errors.Cause(e.Err) != context.Canceled {
270271
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
271272
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
272273
}
274+
log.Info("[log backup advancer] Task watcher exits due to some error.", logutil.ShortError(err))
273275
return
274276
}
275277
}

br/pkg/streamhelper/advancer_cliext.go

+32-14
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import (
77
"context"
88
"encoding/binary"
99
"fmt"
10+
"io"
1011
"strings"
1112

1213
"github.com/golang/protobuf/proto"
1314
"github.com/pingcap/errors"
15+
"github.com/pingcap/failpoint"
1416
backuppb "github.com/pingcap/kvproto/pkg/brpb"
1517
"github.com/pingcap/log"
1618
berrors "github.com/pingcap/tidb/br/pkg/errors"
19+
"github.com/pingcap/tidb/br/pkg/logutil"
1720
"github.com/pingcap/tidb/br/pkg/redact"
1821
"github.com/pingcap/tidb/kv"
1922
clientv3 "go.etcd.io/etcd/client/v3"
@@ -94,6 +97,9 @@ func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (Ta
9497

9598
func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResponse) ([]TaskEvent, error) {
9699
result := make([]TaskEvent, 0, len(resp.Events))
100+
if err := resp.Err(); err != nil {
101+
return nil, err
102+
}
97103
for _, event := range resp.Events {
98104
te, err := t.toTaskEvent(ctx, event)
99105
if err != nil {
@@ -110,6 +116,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
110116
handleResponse := func(resp clientv3.WatchResponse) bool {
111117
events, err := t.eventFromWatch(ctx, resp)
112118
if err != nil {
119+
log.Warn("[log backup advancer] Meet error during receiving the task event.", logutil.ShortError(err))
113120
ch <- errorEvent(err)
114121
return false
115122
}
@@ -118,33 +125,44 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
118125
}
119126
return true
120127
}
128+
collectRemaining := func() {
129+
log.Info("[log backup advancer] Start collecting remaining events in the channel.", zap.Int("remained", len(c)))
130+
defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
131+
for {
132+
select {
133+
case resp, ok := <-c:
134+
if !ok {
135+
return
136+
}
137+
if !handleResponse(resp) {
138+
return
139+
}
140+
default:
141+
return
142+
}
143+
}
144+
}
121145

122146
go func() {
123147
defer close(ch)
124148
for {
125149
select {
126150
case resp, ok := <-c:
151+
failpoint.Inject("advancer_close_channel", func() {
152+
// We cannot really close the channel, just simulating it.
153+
ok = false
154+
})
127155
if !ok {
156+
ch <- errorEvent(io.EOF)
128157
return
129158
}
130159
if !handleResponse(resp) {
131160
return
132161
}
133162
case <-ctx.Done():
134-
// drain the remain event from channel.
135-
for {
136-
select {
137-
case resp, ok := <-c:
138-
if !ok {
139-
return
140-
}
141-
if !handleResponse(resp) {
142-
return
143-
}
144-
default:
145-
return
146-
}
147-
}
163+
collectRemaining()
164+
ch <- errorEvent(ctx.Err())
165+
return
148166
}
149167
}
150168
}()

br/pkg/streamhelper/integration_test.go

+42-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"context"
88
"encoding/binary"
99
"fmt"
10+
"io"
1011
"net"
1112
"net/url"
1213
"path"
1314
"testing"
1415

1516
"github.com/pingcap/errors"
17+
"github.com/pingcap/failpoint"
1618
backuppb "github.com/pingcap/kvproto/pkg/brpb"
1719
"github.com/pingcap/log"
1820
berrors "github.com/pingcap/tidb/br/pkg/errors"
@@ -143,6 +145,7 @@ func TestIntegration(t *testing.T) {
143145
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
144146
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
145147
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
148+
t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
146149
}
147150

148151
func TestChecking(t *testing.T) {
@@ -295,6 +298,7 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
295298
taskInfo2 := simpleTask(taskName2, 4)
296299
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
297300
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))
301+
298302
first := <-ch
299303
require.Equal(t, first.Type, streamhelper.EventAdd)
300304
require.Equal(t, first.Name, taskName)
@@ -310,8 +314,44 @@ func testStreamListening(t *testing.T, metaCli streamhelper.AdvancerExt) {
310314
require.Equal(t, forth.Type, streamhelper.EventDel)
311315
require.Equal(t, forth.Name, taskName2)
312316
cancel()
313-
_, ok := <-ch
314-
require.False(t, ok)
317+
fifth, ok := <-ch
318+
require.True(t, ok)
319+
require.Equal(t, fifth.Type, streamhelper.EventErr)
320+
require.Error(t, fifth.Err, context.Canceled)
321+
item, ok := <-ch
322+
require.False(t, ok, "%v", item)
323+
}
324+
325+
func testStreamClose(t *testing.T, metaCli streamhelper.AdvancerExt) {
326+
ctx := context.Background()
327+
taskName := "close_simple"
328+
taskInfo := simpleTask(taskName, 4)
329+
330+
require.NoError(t, metaCli.PutTask(ctx, taskInfo))
331+
ch := make(chan streamhelper.TaskEvent, 1024)
332+
require.NoError(t, metaCli.Begin(ctx, ch))
333+
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
334+
first := <-ch
335+
require.Equal(t, first.Type, streamhelper.EventAdd)
336+
require.Equal(t, first.Name, taskName)
337+
require.ElementsMatch(t, first.Ranges, simpleRanges(4))
338+
second := <-ch
339+
require.Equal(t, second.Type, streamhelper.EventDel, "%s", second)
340+
require.Equal(t, second.Name, taskName, "%s", second)
341+
342+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel", "return"))
343+
defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/advancer_close_channel")
344+
// We need to make the channel file some events hence we can simulate the closed channel.
345+
taskName2 := "close_simple2"
346+
taskInfo2 := simpleTask(taskName2, 4)
347+
require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
348+
require.NoError(t, metaCli.DeleteTask(ctx, taskName2))
349+
350+
third := <-ch
351+
require.Equal(t, third.Type, streamhelper.EventErr)
352+
require.Error(t, third.Err, io.EOF)
353+
item, ok := <-ch
354+
require.False(t, ok, "%#v", item)
315355
}
316356

317357
func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {

0 commit comments

Comments
 (0)