Skip to content

Commit

Permalink
send a batch of kv in encodeLoop (pingcap#279)
Browse files Browse the repository at this point in the history
* send a batch of kv in encodeLoop

* regine test

* address comment

* Fix wrongly pass nil columnNames.

Should get column names after `parser.ReadRow()` that it's setted
while parser the row.

* Update lightning/restore/restore.go

Co-Authored-By: kennytm <kennytm@gmail.com>

Co-authored-by: kennytm <kennytm@gmail.com>
Co-authored-by: Jiahao Huang <july2993@gmail.com>
  • Loading branch information
3 people authored Mar 16, 2020
1 parent 3c8f4d7 commit bf3c830
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 74 deletions.
2 changes: 2 additions & 0 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
}

type Checkpoint struct {
Expand Down Expand Up @@ -253,6 +254,7 @@ func NewConfig() *Config {
TikvImporter: TikvImporter{
Backend: BackendImporter,
OnDuplicate: ReplaceOnDup,
MaxKVPairs: 32,
},
PostRestore: PostRestore{
Checksum: true,
Expand Down
117 changes: 64 additions & 53 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@ type deliverResult struct {

func (cr *chunkRestore) deliverLoop(
ctx context.Context,
kvsCh <-chan deliveredKVs,
kvsCh <-chan []deliveredKVs,
t *TableRestore,
engineID int32,
dataEngine, indexEngine *kv.OpenedEngine,
Expand All @@ -1591,21 +1591,22 @@ func (cr *chunkRestore) deliverLoop(
var dataChecksum, indexChecksum verify.KVChecksum
var offset, rowID int64
var columns []string

var kvPacket []deliveredKVs
// Fetch enough KV pairs from the source.
populate:
for dataChecksum.SumSize()+indexChecksum.SumSize() < minDeliverBytes {
select {
case d := <-kvsCh:
if d.kvs == nil {
case kvPacket = <-kvsCh:
if len(kvPacket) == 0 {
channelClosed = true
break populate
}

d.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
columns = d.columns
offset = d.offset
rowID = d.rowID
for _, p := range kvPacket {
p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
columns = p.columns
offset = p.offset
rowID = p.rowID
}
case <-ctx.Done():
err = ctx.Err()
return
Expand Down Expand Up @@ -1675,14 +1676,14 @@ func (cr *chunkRestore) saveCheckpoint(t *TableRestore, engineID int32, rc *Rest

func (cr *chunkRestore) encodeLoop(
ctx context.Context,
kvsCh chan<- deliveredKVs,
kvsCh chan<- []deliveredKVs,
t *TableRestore,
logger log.Logger,
kvEncoder kv.Encoder,
deliverCompleteCh <-chan deliverResult,
pauser *common.Pauser,
rc *RestoreController,
) (readTotalDur time.Duration, encodeTotalDur time.Duration, err error) {
send := func(kvs deliveredKVs) error {
send := func(kvs []deliveredKVs) error {
select {
case kvsCh <- kvs:
return nil
Expand All @@ -1700,63 +1701,73 @@ func (cr *chunkRestore) encodeLoop(
}
}

initializedColumns := false
outside:
for {
pauser, maxKvPairsCnt := rc.pauser, rc.cfg.TikvImporter.MaxKVPairs
initializedColumns, reachEOF := false, false
for !reachEOF {
if err = pauser.Wait(ctx); err != nil {
return
}

offset, _ := cr.parser.Pos()
if offset >= cr.chunk.Chunk.EndOffset {
break
}

start := time.Now()
err = cr.parser.ReadRow()
newOffset, rowID := cr.parser.Pos()
columnNames := cr.parser.Columns()
switch errors.Cause(err) {
case nil:
if !initializedColumns {
if len(cr.chunk.ColumnPermutation) == 0 {
t.initializeColumns(columnNames, cr.chunk)
var readDur, encodeDur time.Duration
canDeliver := false
kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
var newOffset, rowID int64
outLoop:
for !canDeliver {
readDurStart := time.Now()
err = cr.parser.ReadRow()
columnNames := cr.parser.Columns()
newOffset, rowID = cr.parser.Pos()
switch errors.Cause(err) {
case nil:
if !initializedColumns {
if len(cr.chunk.ColumnPermutation) == 0 {
t.initializeColumns(columnNames, cr.chunk)
}
initializedColumns = true
}
initializedColumns = true
case io.EOF:
reachEOF = true
break outLoop
default:
err = errors.Annotatef(err, "in file %s at offset %d", &cr.chunk.Key, newOffset)
return
}
readDur += time.Since(readDurStart)
encodeDurStart := time.Now()
lastRow := cr.parser.LastRow()
// sql -> kv
kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation)
encodeDur += time.Since(encodeDurStart)
if encodeErr != nil {
err = errors.Annotatef(encodeErr, "in file %s at offset %d", &cr.chunk.Key, newOffset)
return
}
kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset {
canDeliver = true
}
case io.EOF:
break outside
default:
err = errors.Annotatef(err, "in file %s at offset %d", &cr.chunk.Key, newOffset)
return
}

readDur := time.Since(start)
encodeTotalDur += encodeDur
metric.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds())
readTotalDur += readDur
metric.RowReadSecondsHistogram.Observe(readDur.Seconds())
metric.RowReadBytesHistogram.Observe(float64(newOffset - offset))

// sql -> kv
lastRow := cr.parser.LastRow()
kvs, encodeErr := kvEncoder.Encode(logger, lastRow.Row, lastRow.RowID, cr.chunk.ColumnPermutation)
encodeDur := time.Since(start)
encodeTotalDur += encodeDur
metric.RowEncodeSecondsHistogram.Observe(encodeDur.Seconds())

if encodeErr != nil {
// error is already logged inside kvEncoder.Encode(), just propagate up directly.
err = errors.Annotatef(encodeErr, "in file %s at offset %d", &cr.chunk.Key, newOffset)
return
}

deliverKvStart := time.Now()
if err = send(deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID}); err != nil {
return
if len(kvPacket) != 0 {
deliverKvStart := time.Now()
if err = send(kvPacket); err != nil {
return
}
metric.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
}
metric.RowKVDeliverSecondsHistogram.Observe(time.Since(deliverKvStart).Seconds())
}

err = send(deliveredKVs{kvs: nil})
err = send([]deliveredKVs{})
return
}

Expand All @@ -1773,7 +1784,7 @@ func (cr *chunkRestore) restore(
Timestamp: cr.chunk.Timestamp,
RowFormatVersion: rc.rowFormatVer,
})
kvsCh := make(chan deliveredKVs, maxKVQueueSize)
kvsCh := make(chan []deliveredKVs, maxKVQueueSize)
deliverCompleteCh := make(chan deliverResult)

defer func() {
Expand All @@ -1797,7 +1808,7 @@ func (cr *chunkRestore) restore(
zap.Stringer("path", &cr.chunk.Key),
).Begin(zap.InfoLevel, "restore file")

readTotalDur, encodeTotalDur, err := cr.encodeLoop(ctx, kvsCh, t, logTask.Logger, kvEncoder, deliverCompleteCh, rc.pauser)
readTotalDur, encodeTotalDur, err := cr.encodeLoop(ctx, kvsCh, t, logTask.Logger, kvEncoder, deliverCompleteCh, rc)
if err != nil {
return err
}
Expand Down
51 changes: 30 additions & 21 deletions lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel(c *C) {
rc := &RestoreController{backend: kv.NewMockImporter(nil, "")}

ctx, cancel := context.WithCancel(context.Background())
kvsCh := make(chan deliveredKVs)
kvsCh := make(chan []deliveredKVs)
go cancel()
_, err := s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, nil, nil, rc)
c.Assert(errors.Cause(err), Equals, context.Canceled)
Expand Down Expand Up @@ -624,15 +624,15 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData(c *C) {

rc := &RestoreController{backend: importer}

kvsCh := make(chan deliveredKVs, 1)
kvsCh <- deliveredKVs{}
kvsCh := make(chan []deliveredKVs, 1)
kvsCh <- []deliveredKVs{}
_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataEngine, indexEngine, rc)
c.Assert(err, IsNil)
}

func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {
ctx := context.Background()
kvsCh := make(chan deliveredKVs)
kvsCh := make(chan []deliveredKVs)
mockCols := []string{"c1", "c2"}

// Open two mock engines.
Expand Down Expand Up @@ -683,7 +683,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {

saveCpCh := make(chan saveCp, 2)
go func() {
kvsCh <- deliveredKVs{
kvsCh <- []deliveredKVs{deliveredKVs{
kvs: kv.MakeRowFromKvPairs([]common.KvPair{
{
Key: []byte("txxxxxxxx_ryyyyyyyy"),
Expand All @@ -701,8 +701,9 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {
columns: mockCols,
offset: 12,
rowID: 76,
},
}
kvsCh <- deliveredKVs{}
kvsCh <- []deliveredKVs{}
close(kvsCh)
}()

Expand All @@ -718,30 +719,32 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {

func (s *chunkRestoreSuite) TestEncodeLoop(c *C) {
ctx := context.Background()
kvsCh := make(chan deliveredKVs, 2)
kvsCh := make(chan []deliveredKVs, 2)
deliverCompleteCh := make(chan deliverResult)
kvEncoder := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Timestamp: 1234567895,
RowFormatVersion: "1",
})

_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, DeliverPauser)
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(err, IsNil)
c.Assert(kvsCh, HasLen, 2)

firstKVs := <-kvsCh
c.Assert(firstKVs.kvs, HasLen, 2)
c.Assert(firstKVs.rowID, Equals, int64(19))
c.Assert(firstKVs.offset, Equals, int64(36))
kvs := <-kvsCh
c.Assert(kvs, HasLen, 1)
c.Assert(kvs[0].kvs, HasLen, 2)
c.Assert(kvs[0].rowID, Equals, int64(19))
c.Assert(kvs[0].offset, Equals, int64(36))

secondKVs := <-kvsCh
c.Assert(secondKVs.kvs, IsNil)
kvs = <-kvsCh
c.Assert(len(kvs), Equals, 0)
}

func (s *chunkRestoreSuite) TestEncodeLoopCanceled(c *C) {
ctx, cancel := context.WithCancel(context.Background())
kvsCh := make(chan deliveredKVs)
kvsCh := make(chan []deliveredKVs)
deliverCompleteCh := make(chan deliverResult)
kvEncoder := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Expand All @@ -750,14 +753,16 @@ func (s *chunkRestoreSuite) TestEncodeLoopCanceled(c *C) {
})

go cancel()
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, DeliverPauser)
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(errors.Cause(err), Equals, context.Canceled)
c.Assert(kvsCh, HasLen, 0)
}

func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) {
ctx := context.Background()
kvsCh := make(chan deliveredKVs, 2)
kvsCh := make(chan []deliveredKVs, 2)
deliverCompleteCh := make(chan deliverResult)
kvEncoder := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Expand All @@ -768,14 +773,16 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) {
// close the chunk so reading it will result in the "file already closed" error.
s.cr.parser.Close()

_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, DeliverPauser)
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(err, ErrorMatches, `in file .*[/\\]db\.table\.2\.sql:0 at offset 0:.*file already closed`)
c.Assert(kvsCh, HasLen, 0)
}

func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
ctx := context.Background()
kvsCh := make(chan deliveredKVs)
kvsCh := make(chan []deliveredKVs)
deliverCompleteCh := make(chan deliverResult)
kvEncoder := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
SQLMode: s.cfg.TiDB.SQLMode,
Expand All @@ -788,7 +795,9 @@ func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
err: errors.New("fake deliver error"),
}
}()
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, DeliverPauser)
cfg := config.NewConfig()
rc := &RestoreController{pauser: DeliverPauser, cfg: cfg}
_, _, err := s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
c.Assert(err, ErrorMatches, "fake deliver error")
c.Assert(kvsCh, HasLen, 0)
}
Expand Down

0 comments on commit bf3c830

Please sign in to comment.