From 2ab8d4ebb3ae309c4f71b6fb202aa0dd53eb9ac8 Mon Sep 17 00:00:00 2001 From: kennytm Date: Thu, 29 Nov 2018 11:42:27 +0800 Subject: [PATCH] Make the KV-Encode and Deliver-to-Importer concurrent; Turn the old metrics into real Prometheus metrics (#88) * *: replace the ad-hoc common.metrics by prometheus For now we do not distinguish between tables. The 4 old metrics are replaced by 6 new histograms. * restore: move the delivery into its own goroutine to parallelize it This should increase the performance by 30%. * tests: retain the logs after running * restore: apply backpressure on KV-Encoding to prevent OOM * restore: print the duration needed for import + cleanup * metric, restore: record checksum durations * restore: limits the size of each Put() message to 31 MB Prevents the gRPC ResourceExhausted error on caused by importer's MaxRecvSize. * restore: don't print time taken when import failed * restore: fix missing error check --- lightning/common/metrics.go | 63 -------- lightning/metric/metric.go | 64 ++++++++ lightning/restore/restore.go | 229 ++++++++++++++++++++--------- lightning/restore/split_kv_test.go | 43 ++++++ tests/run.sh | 3 +- 5 files changed, 272 insertions(+), 130 deletions(-) delete mode 100644 lightning/common/metrics.go create mode 100644 lightning/restore/split_kv_test.go diff --git a/lightning/common/metrics.go b/lightning/common/metrics.go deleted file mode 100644 index dcf8f0232f5e4..0000000000000 --- a/lightning/common/metrics.go +++ /dev/null @@ -1,63 +0,0 @@ -package common - -import ( - "fmt" - "sort" - "strings" - "sync" - "time" -) - -type Metrics struct { - lock sync.Mutex - Timing map[string]*TimeCost -} - -func NewMetrics() *Metrics { - return &Metrics{ - Timing: make(map[string]*TimeCost), - } -} - -func (m *Metrics) MarkTiming(name string, since time.Time) { - m.costTimeNS(name, time.Since(since).Nanoseconds()) -} - -func (m *Metrics) costTimeNS(name string, ns int64) { - m.lock.Lock() - defer m.lock.Unlock() - - t, ok := m.Timing[name] - if !ok { - t = &TimeCost{total: 0, times: 0} - m.Timing[name] = t - } - t.total += ns - t.times++ -} - -func (m *Metrics) DumpTiming() string { - marks := make([]string, 0, len(m.Timing)) - for mark := range m.Timing { - marks = append(marks, mark) - } - sort.Strings(marks) - - lines := make([]string, 0, len(marks)) - for _, mark := range marks { - t := m.Timing[mark] - l := fmt.Sprintf("%-40s : total = %.3f s / times = %d / avg = %.3f s", mark, t.Total(), t.Times(), t.Avg()) - lines = append(lines, l) - } - - return strings.Join(lines, "\n") -} - -type TimeCost struct { - total int64 - times int -} - -func (t *TimeCost) Total() float64 { return float64(t.total) / 1000000000 } -func (t *TimeCost) Times() int { return t.times } -func (t *TimeCost) Avg() float64 { return t.Total() / float64(t.times) } diff --git a/lightning/metric/metric.go b/lightning/metric/metric.go index 0e8251509a86e..4bbe437ab802b 100644 --- a/lightning/metric/metric.go +++ b/lightning/metric/metric.go @@ -74,6 +74,63 @@ var ( // - running // - finished // - failed + + ImportSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "import_seconds", + Help: "time needed to import a table", + Buckets: prometheus.ExponentialBuckets(0.125, 2, 6), + }, + ) + BlockReadSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "block_read_seconds", + Help: "time needed to read a block", + Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 7), + 
}, + ) + BlockReadBytesHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "block_read_bytes", + Help: "number of bytes being read out from data source", + Buckets: prometheus.ExponentialBuckets(1024, 2, 8), + }, + ) + BlockEncodeSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "block_encode_seconds", + Help: "time needed to encode a block", + Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10), + }, + ) + BlockDeliverSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "block_deliver_seconds", + Help: "time needed to deliver a block", + Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10), + }, + ) + BlockDeliverBytesHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "block_deliver_bytes", + Help: "number of bytes being sent out to importer", + Buckets: prometheus.ExponentialBuckets(512, 2, 10), + }, + ) + ChecksumSecondsHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "lightning", + Name: "checksum_seconds", + Help: "time needed to complete the checksum stage", + Buckets: prometheus.ExponentialBuckets(1, 2.2679331552660544, 10), + }, + ) ) func init() { @@ -82,6 +139,13 @@ func init() { prometheus.MustRegister(KvEncoderCounter) prometheus.MustRegister(TableCounter) prometheus.MustRegister(ChunkCounter) + prometheus.MustRegister(ImportSecondsHistogram) + prometheus.MustRegister(BlockReadSecondsHistogram) + prometheus.MustRegister(BlockReadBytesHistogram) + prometheus.MustRegister(BlockEncodeSecondsHistogram) + prometheus.MustRegister(BlockDeliverSecondsHistogram) + prometheus.MustRegister(BlockDeliverBytesHistogram) + prometheus.MustRegister(ChecksumSecondsHistogram) } func RecordTableCount(status string, err error) { diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index f32886469fa8e..06173743a06a5 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -35,8 +35,6 @@ const ( Level1Compact = 1 ) -var metrics = common.NewMetrics() - const ( defaultGCLifeTime = 100 * time.Hour ) @@ -187,8 +185,6 @@ outside: } } - statistic := metrics.DumpTiming() - common.AppLogger.Infof("Timing statistic :\n%s", statistic) common.AppLogger.Infof("the whole procedure takes %v", time.Since(timer)) rc.errorSummaries.emitLog() @@ -972,10 +968,6 @@ func (tr *TableRestore) importKV(ctx context.Context, closedEngine *kv.ClosedEng common.AppLogger.Infof("[%s] flush kv deliver ...", tr.tableName) start := time.Now() - defer func() { - metrics.MarkTiming(fmt.Sprintf("[%s]_kv_flush", tr.tableName), start) - common.AppLogger.Infof("[%s] kv deliver all flushed !", tr.tableName) - }() err := closedEngine.Import(ctx) if err != nil { @@ -985,6 +977,11 @@ func (tr *TableRestore) importKV(ctx context.Context, closedEngine *kv.ClosedEng return errors.Trace(err) } closedEngine.Cleanup(ctx) + + dur := time.Since(start) + metric.ImportSecondsHistogram.Observe(dur.Seconds()) + common.AppLogger.Infof("[%s] kv deliver all flushed, takes %v", tr.tableName, dur) + return nil } @@ -999,9 +996,11 @@ func (tr *TableRestore) compareChecksum(ctx context.Context, cfg *config.Config, for _, chunk := range cp.Chunks { localChecksum.Add(&chunk.Checksum) } - common.AppLogger.Infof("[%s] local checksum %+v", tr.tableName, localChecksum) + start := time.Now() remoteChecksum, err := DoChecksum(ctx, cfg.TiDB, tr.tableName) + dur := 
time.Since(start) + metric.ChecksumSecondsHistogram.Observe(dur.Seconds()) if err != nil { return errors.Trace(err) } @@ -1016,7 +1015,7 @@ func (tr *TableRestore) compareChecksum(ctx context.Context, cfg *config.Config, ) } - common.AppLogger.Infof("[%s] checksum pass", tr.tableName) + common.AppLogger.Infof("[%s] checksum pass, %+v takes %v", tr.tableName, localChecksum, dur) return nil } @@ -1113,6 +1112,29 @@ func increaseGCLifeTime(ctx context.Context, db *sql.DB) (oriGCLifeTime string, //////////////////////////////////////////////////////////////// +const ( + maxKVQueueSize = 128 + maxDeliverBytes = 31 << 20 // 31 MB. hardcoded by importer, so do we +) + +func splitIntoDeliveryStreams(totalKVs []kvenc.KvPair, splitSize int) [][]kvenc.KvPair { + res := make([][]kvenc.KvPair, 0, 1) + i := 0 + cumSize := 0 + + for j, pair := range totalKVs { + size := len(pair.Key) + len(pair.Val) + if i < j && cumSize + size > splitSize { + res = append(res, totalKVs[i:j]) + i = j + cumSize = 0 + } + cumSize += size + } + + return append(res, totalKVs[i:]) +} + func (cr *chunkRestore) restore( ctx context.Context, t *TableRestore, @@ -1139,11 +1161,99 @@ func (cr *chunkRestore) restore( } }() - readMark := fmt.Sprintf("[%s]_read_file", t.tableName) - encodeMark := fmt.Sprintf("[%s]_sql_2_kv", t.tableName) - deliverMark := fmt.Sprintf("[%s]_deliver_write", t.tableName) - timer := time.Now() + readTotalDur := time.Duration(0) + encodeTotalDur := time.Duration(0) + deliverTotalDur := time.Duration(0) + + var block struct { + cond *sync.Cond + encodeCompleted bool + totalKVs []kvenc.KvPair + localChecksum verify.KVChecksum + chunkOffset int64 + chunkRowID int64 + } + block.cond = sync.NewCond(new(sync.Mutex)) + deliverCompleteCh := make(chan error) + + go func() { + for { + block.cond.L.Lock() + for !block.encodeCompleted && len(block.totalKVs) == 0 { + block.cond.Wait() + } + b := block + block.totalKVs = nil + block.localChecksum = verify.MakeKVChecksum(0, 0, 0) + block.cond.L.Unlock() + + if b.encodeCompleted && len(b.totalKVs) == 0 { + deliverCompleteCh <- nil + return + } + + // kv -> deliver ( -> tikv ) + start := time.Now() + stream, err := engine.NewWriteStream(ctx) + if err != nil { + deliverCompleteCh <- errors.Trace(err) + return + } + + for _, kvs := range splitIntoDeliveryStreams(b.totalKVs, maxDeliverBytes) { + if e := stream.Put(kvs); e != nil { + if err != nil { + common.AppLogger.Warnf("failed to put write stream: %s", e.Error()) + } else { + err = e + } + } + } + b.totalKVs = nil + + block.cond.Signal() + if e := stream.Close(); e != nil { + if err != nil { + common.AppLogger.Warnf("failed to close write stream: %s", e.Error()) + } else { + err = e + } + } + deliverDur := time.Since(start) + deliverTotalDur += deliverDur + metric.BlockDeliverSecondsHistogram.Observe(deliverDur.Seconds()) + metric.BlockDeliverBytesHistogram.Observe(float64(b.localChecksum.SumSize())) + + if err != nil { + // TODO : retry ~ + common.AppLogger.Errorf("kv deliver failed = %s\n", err.Error()) + deliverCompleteCh <- errors.Trace(err) + return + } + + // Update the table, and save a checkpoint. 
+ // (the write to the importer is effective immediately, thus update these here) + cr.chunk.Checksum.Add(&b.localChecksum) + cr.chunk.Chunk.Offset = b.chunkOffset + cr.chunk.Chunk.PrevRowIDMax = b.chunkRowID + rc.saveCpCh <- saveCp{ + tableName: t.tableName, + merger: &RebaseCheckpointMerger{ + AllocBase: t.alloc.Base() + 1, + }, + } + rc.saveCpCh <- saveCp{ + tableName: t.tableName, + merger: &ChunkCheckpointMerger{ + Key: cr.chunk.Key, + Checksum: cr.chunk.Checksum, + Pos: cr.chunk.Chunk.Offset, + RowID: cr.chunk.Chunk.PrevRowIDMax, + }, + } + } + }() for { select { @@ -1190,69 +1300,56 @@ func (cr *chunkRestore) restore( } sqls.WriteByte(';') - metrics.MarkTiming(readMark, start) + readDur := time.Since(start) + readTotalDur += readDur + metric.BlockReadSecondsHistogram.Observe(readDur.Seconds()) + metric.BlockReadBytesHistogram.Observe(float64(sqls.Len())) - var ( - totalKVs []kvenc.KvPair - localChecksum verify.KVChecksum - ) // sql -> kv start = time.Now() kvs, _, err := kvEncoder.SQL2KV(sqls.String()) - metrics.MarkTiming(encodeMark, start) + encodeDur := time.Since(start) + encodeTotalDur += encodeDur + metric.BlockEncodeSecondsHistogram.Observe(encodeDur.Seconds()) + common.AppLogger.Debugf("len(kvs) %d, len(sql) %d", len(kvs), sqls.Len()) if err != nil { common.AppLogger.Errorf("kv encode failed = %s\n", err.Error()) return errors.Trace(err) } - totalKVs = append(totalKVs, kvs...) - localChecksum.Update(kvs) - - // kv -> deliver ( -> tikv ) - start = time.Now() - stream, err := engine.NewWriteStream(ctx) - if err != nil { - return errors.Trace(err) - } - err = stream.Put(totalKVs) - if e := stream.Close(); e != nil { - if err != nil { - common.AppLogger.Warnf("failed to close write stream: %s", e.Error()) - } else { - err = e - } - } - metrics.MarkTiming(deliverMark, start) - if err != nil { - // TODO : retry ~ - common.AppLogger.Errorf("kv deliver failed = %s\n", err.Error()) - return errors.Trace(err) - } - - // Update the table, and save a checkpoint. - // (the write to the importer is effective immediately, thus update these here) - cr.chunk.Checksum.Add(&localChecksum) - cr.chunk.Chunk.Offset = cr.parser.Pos() - cr.chunk.Chunk.PrevRowIDMax = cr.parser.LastRow().RowID - rc.saveCpCh <- saveCp{ - tableName: t.tableName, - merger: &RebaseCheckpointMerger{ - AllocBase: t.alloc.Base() + 1, - }, + block.cond.L.Lock() + for len(block.totalKVs) > len(kvs) * maxKVQueueSize { + // ^ hack to create a back-pressure preventing sending too many KV pairs at once + // this happens when delivery is slower than encoding. + // note that the KV pairs will retain the memory buffer backing the KV encoder + // and thus blow up the memory usage and will easily cause lightning to go OOM. + block.cond.Wait() } - rc.saveCpCh <- saveCp{ - tableName: t.tableName, - merger: &ChunkCheckpointMerger{ - Key: cr.chunk.Key, - Checksum: cr.chunk.Checksum, - Pos: cr.chunk.Chunk.Offset, - RowID: cr.chunk.Chunk.PrevRowIDMax, - }, + block.totalKVs = append(block.totalKVs, kvs...) 
+ block.localChecksum.Update(kvs) + block.chunkOffset = cr.parser.Pos() + block.chunkRowID = cr.parser.LastRow().RowID + block.cond.Signal() + block.cond.L.Unlock() + } + + block.cond.L.Lock() + block.encodeCompleted = true + block.cond.Signal() + block.cond.L.Unlock() + + select { + case err := <-deliverCompleteCh: + if err == nil { + common.AppLogger.Infof( + "[%s] restore chunk #%d (%s) takes %v (read: %v, encode: %v, deliver: %v)", + t.tableName, cr.index, &cr.chunk.Key, time.Since(timer), + readTotalDur, encodeTotalDur, deliverTotalDur, + ) } + return errors.Trace(err) + case <-ctx.Done(): + return ctx.Err() } - - common.AppLogger.Infof("[%s] restore chunk #%d (%s) takes %v", t.tableName, cr.index, &cr.chunk.Key, time.Since(timer)) - - return nil } diff --git a/lightning/restore/split_kv_test.go b/lightning/restore/split_kv_test.go new file mode 100644 index 0000000000000..7ac89efa42718 --- /dev/null +++ b/lightning/restore/split_kv_test.go @@ -0,0 +1,43 @@ +package restore + +import ( + . "github.com/pingcap/check" + "github.com/pingcap/tidb/util/kvencoder" +) + +var _ = Suite(&splitKVSuite{}) + +type splitKVSuite struct{} + +func (s *splitKVSuite) TestSplitKV(c *C) { + pairs := []kvenc.KvPair{ + kvenc.KvPair{ + Key: []byte{1, 2, 3}, + Val: []byte{4, 5, 6}, + }, + kvenc.KvPair{ + Key: []byte{7, 8}, + Val: []byte{9, 0}, + }, + kvenc.KvPair{ + Key: []byte{1, 2, 3, 4}, + Val: []byte{5, 6, 7, 8}, + }, + kvenc.KvPair{ + Key: []byte{9, 0}, + Val: []byte{1, 2}, + }, + } + + splitBy10 := splitIntoDeliveryStreams(pairs, 10) + c.Assert(splitBy10, DeepEquals, [][]kvenc.KvPair{pairs[0:2], pairs[2:3], pairs[3:4]}) + + splitBy12 := splitIntoDeliveryStreams(pairs, 12) + c.Assert(splitBy12, DeepEquals, [][]kvenc.KvPair{pairs[0:2], pairs[2:4]}) + + splitBy1000 := splitIntoDeliveryStreams(pairs, 1000) + c.Assert(splitBy1000, DeepEquals, [][]kvenc.KvPair{pairs[0:4]}) + + splitBy1 := splitIntoDeliveryStreams(pairs, 1) + c.Assert(splitBy1, DeepEquals, [][]kvenc.KvPair{pairs[0:1], pairs[1:2], pairs[2:3], pairs[3:4]}) +} diff --git a/tests/run.sh b/tests/run.sh index ced3ec3614903..5a0abb44cfb6b 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -10,13 +10,14 @@ stop_services() { killall -9 tidb-server || true killall -9 tikv-importer || true - find "$TEST_DIR" -d -mindepth 1 -not -name 'cov.*' -delete || true + find "$TEST_DIR" -d -mindepth 1 -not -name 'cov.*' -not \( -depth 1 -name '*.log' \) -delete || true } start_services() { stop_services mkdir -p "$TEST_DIR" + rm -f "$TEST_DIR"/*.log echo "Starting PD..." bin/pd-server \
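
Note: the sketch below is an illustrative, self-contained reduction of the two techniques this patch introduces in restore.go: splitting each delivery so a single Put() stays under a byte budget (the real code uses maxDeliverBytes = 31 MB to stay below the importer's gRPC MaxRecvSize), and sync.Cond-based back-pressure between the KV-encoding loop and the deliver goroutine so queued KV pairs cannot grow without bound. The kvPair type, splitBatches, the "deliver" print statement, and the toy thresholds are placeholders for illustration only, not the actual kvenc.KvPair / WriteStream API or the real limits.

package main

import (
	"fmt"
	"sync"
)

type kvPair struct {
	Key, Val []byte
}

// splitBatches mirrors splitIntoDeliveryStreams: it cuts pairs into consecutive
// slices whose accumulated size stays within splitSize (a slice exceeds the
// limit only when a single pair is already larger than splitSize).
func splitBatches(pairs []kvPair, splitSize int) [][]kvPair {
	res := make([][]kvPair, 0, 1)
	start, cumSize := 0, 0
	for i, p := range pairs {
		size := len(p.Key) + len(p.Val)
		if start < i && cumSize+size > splitSize {
			res = append(res, pairs[start:i])
			start, cumSize = i, 0
		}
		cumSize += size
	}
	return append(res, pairs[start:])
}

func main() {
	const maxQueued = 8 // toy stand-in for the maxKVQueueSize back-pressure threshold

	var (
		mu    sync.Mutex
		cond  = sync.NewCond(&mu)
		queue []kvPair
		done  bool
	)
	delivered := make(chan struct{})

	// Deliver goroutine: drain the queue, split it into small batches and "send" them.
	go func() {
		defer close(delivered)
		for {
			mu.Lock()
			for !done && len(queue) == 0 {
				cond.Wait()
			}
			batch := queue
			queue = nil
			finished := done
			mu.Unlock()
			cond.Signal() // wake the encoder if it is blocked on back-pressure

			if finished && len(batch) == 0 {
				return
			}
			for _, part := range splitBatches(batch, 6) {
				fmt.Printf("deliver %d pairs\n", len(part)) // stand-in for stream.Put(part)
			}
		}
	}()

	// Encoder side: append pairs, but wait while too many are already queued,
	// so slow delivery throttles encoding instead of exhausting memory.
	for i := 0; i < 32; i++ {
		mu.Lock()
		for len(queue) > maxQueued {
			cond.Wait() // back-pressure: let the deliver goroutine catch up
		}
		queue = append(queue, kvPair{Key: []byte{byte(i)}, Val: []byte{1, 2, 3}})
		cond.Signal()
		mu.Unlock()
	}

	mu.Lock()
	done = true
	cond.Signal()
	mu.Unlock()
	<-delivered
}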