Skip to content

Commit

Permalink
lightning: fix wrongly retrying a write when a partial write meets "needRescan" (
Browse files Browse the repository at this point in the history
#43366)

Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 authored Apr 24, 2023
1 parent db9c538 commit f179885
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,9 @@ func (local *Backend) executeJob(
job.lastRetryableErr = err
return nil
}
if job.stage == needRescan {
return nil
}

if job.writeResult == nil || job.writeResult.remainingStartKey == nil {
return nil
Expand Down
107 changes: 106 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,10 +1343,115 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) {
// then meet NotLeader error, scanned new region (11,12,13)
// repeat above for 11,12,13
require.Equal(t, []uint64{1, 2, 3, 11, 12, 13}, apiInvokeRecorder["Write"])
// store 12 has a follower busy, so it will break the workflow for region (11, 12, 13)
require.Equal(t, []uint64{1, 2, 3, 1, 11, 12, 13, 11}, apiInvokeRecorder["MultiIngest"])
}

// TestPartialWriteIngestErrorWillPanic verifies that when a partial write
// (forced here by a tiny regionSplitSize) hits an ingest error that moves the
// job to the needRescan stage, the job is handed back for rescanning instead
// of being wrongly retried against the stale remainingStartKey. The store-1
// mock client returns a NotLeader ingest error once, which is one of the
// errors that triggers convertStageTo(needRescan).
func TestPartialWriteIngestErrorWillPanic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Record which store IDs each API is invoked on so we can assert that
	// exactly one write round and one ingest attempt happened.
	apiInvokeRecorder := map[string][]uint64{}
	notLeaderResp := &sst.IngestResponse{
		Error: &errorpb.Error{
			NotLeader: &errorpb.NotLeader{Leader: &metapb.Peer{StoreId: 11}},
		}}

	local := &Backend{
		splitCli: initTestSplitClient3Replica([][]byte{{}, {'c'}}, nil),
		importClientFactory: &mockImportClientFactory{
			stores: []*metapb.Store{
				{Id: 1}, {Id: 2}, {Id: 3},
			},
			createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
				importCli := newMockImportClient()
				importCli.store = store
				importCli.apiInvokeRecorder = apiInvokeRecorder
				// Only the leader store (1) returns the NotLeader error,
				// and only on its first ingest attempt.
				if store.Id == 1 {
					importCli.retry = 1
					importCli.resp = notLeaderResp
				}
				return importCli
			},
		},
		logger:             log.L(),
		writeLimiter:       noopStoreWriteLimiter{},
		bufferPool:         membuf.NewPool(),
		supportMultiIngest: true,
		tikvCodec:          keyspace.CodecV1,
	}

	db, tmpPath := makePebbleDB(t, nil)
	_, engineUUID := backend.MakeUUID("ww", 0)
	engineCtx, cancel2 := context.WithCancel(context.Background())
	f := &Engine{
		db:           db,
		UUID:         engineUUID,
		sstDir:       tmpPath,
		ctx:          engineCtx,
		cancel:       cancel2,
		sstMetasChan: make(chan metaOrFlush, 64),
		keyAdapter:   noopKeyAdapter{},
		logger:       log.L(),
	}
	// Two keys so the engine has more than one KV to write; with
	// regionSplitSize = 1 the first write round cannot cover them all.
	err := f.db.Set([]byte("a"), []byte("a"), nil)
	require.NoError(t, err)
	err = f.db.Set([]byte("a2"), []byte("a2"), nil)
	require.NoError(t, err)

	jobCh := make(chan *regionJob, 10)

	partialWriteJob := &regionJob{
		keyRange: Range{start: []byte("a"), end: []byte("c")},
		region: &split.RegionInfo{
			Region: &metapb.Region{
				Id: 1,
				Peers: []*metapb.Peer{
					{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}, {Id: 3, StoreId: 3},
				},
				StartKey: []byte("a"),
				EndKey:   []byte("c"),
			},
			Leader: &metapb.Peer{Id: 1, StoreId: 1},
		},
		stage:  regionScanned,
		engine: f,
		// use small regionSplitSize to trigger partial write
		regionSplitSize: 1,
	}
	var jobWg sync.WaitGroup
	jobWg.Add(1)
	jobCh <- partialWriteJob

	var wg sync.WaitGroup
	wg.Add(1)
	jobOutCh := make(chan *regionJob)
	go func() {
		defer wg.Done()
		for {
			job := <-jobOutCh
			if job.stage == regionScanned {
				jobWg.Done()
				return
			}
			// Report via t.Errorf, which is safe from a non-test goroutine.
			// The original require.Fail had two bugs: its second argument is
			// a plain failure message, not a format string, so %s was never
			// interpolated; and require runs t.FailNow (runtime.Goexit),
			// which must not be called off the test goroutine and would skip
			// jobWg.Done, deadlocking the test at jobWg.Wait.
			t.Errorf("job stage %s is not expected", job.stage)
			jobWg.Done()
			return
		}
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Same reasoning as above: require.NoError would FailNow off the
		// test goroutine, so surface any worker error with t.Errorf.
		if err := local.startWorker(ctx, jobCh, jobOutCh, &jobWg); err != nil {
			t.Errorf("startWorker: %v", err)
		}
	}()

	jobWg.Wait()
	cancel()
	wg.Wait()

	// One write round over the three replicas, then a single ingest attempt
	// on the leader that returns NotLeader. The job goes to needRescan, so
	// no further Write/MultiIngest calls must follow — this is exactly the
	// wrong-retry behavior the local.go fix prevents.
	require.Equal(t, []uint64{1, 2, 3}, apiInvokeRecorder["Write"])
	require.Equal(t, []uint64{1}, apiInvokeRecorder["MultiIngest"])
}

// mockGetSizeProperties mocks size properties for 20 SST files of 50MB each.
func mockGetSizeProperties(log.Logger, *pebble.DB, KeyAdapter) (*sizeProperties, error) {
props := newSizeProperties()
Expand Down

0 comments on commit f179885

Please sign in to comment.