This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: optimize SQL processing speed #110

Merged
merged 6 commits on Jan 2, 2019
2 changes: 2 additions & 0 deletions lightning/config/config.go
@@ -71,6 +71,7 @@ type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}
@@ -129,6 +130,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
IOConcurrency: 5,
CheckRequirements: true,
},
TiDB: DBStore{
2 changes: 2 additions & 0 deletions lightning/config/const.go
@@ -9,6 +9,8 @@ const (
ReadBlockSize int64 = 64 * _K
MinRegionSize int64 = 256 * _M

BufferSizeScale = 5

// kv import
KVMaxBatchSize int64 = 200 * _G
)
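For context, the new BufferSizeScale constant feeds into the parser change below: the chunk parser's block buffer is allocated as blockBufSize * BufferSizeScale, so with the default ReadBlockSize of 64 KiB each parser holds a 320 KiB buffer. A minimal sketch of that arithmetic (standalone program for illustration only; it assumes _K is 1024 as the name suggests):

```go
package main

import "fmt"

const (
	_K              int64 = 1 << 10    // assumed value of _K (1 KiB)
	ReadBlockSize   int64 = 64 * _K    // from const.go
	BufferSizeScale       = 5          // added in this PR
)

func main() {
	// NewChunkParser allocates make([]byte, blockBufSize*config.BufferSizeScale).
	fmt.Println(ReadBlockSize*BufferSizeScale, "bytes per parser") // 327680 bytes = 320 KiB
}
```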
27 changes: 27 additions & 0 deletions lightning/metric/metric.go
@@ -102,6 +102,30 @@ var (
Buckets: prometheus.ExponentialBuckets(1024, 2, 8),
},
)
ChunkParserReadBlockSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "chunk_parser_read_block_seconds",
Help: "time needed for chunk parser read a block",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
ChunkParserReadRowSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "chunk_parser_read_row_seconds",
Help: "time needed for chunk parser read a row",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
ApplyWorkerSecondsHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "apply_worker_seconds",
Help: "time needed to apply a worker",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
}, []string{"name"},
)
BlockEncodeSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
@@ -149,6 +173,9 @@ func init() {
prometheus.MustRegister(BlockDeliverSecondsHistogram)
prometheus.MustRegister(BlockDeliverBytesHistogram)
prometheus.MustRegister(ChecksumSecondsHistogram)
prometheus.MustRegister(ChunkParserReadRowSecondsHistogram)
prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
prometheus.MustRegister(ApplyWorkerSecondsHistogram)
}

func RecordTableCount(status string, err error) {
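The growth factor 3.1622776601683795 used above is sqrt(10), so each of the three new histograms gets ten buckets spanning 1 ms to roughly 31.6 s, two buckets per decade. A small sketch that prints the resulting bucket boundaries (standalone program for illustration; it only uses the prometheus helper already imported by metric.go):

```go
package main

import (
	"fmt"
	"math"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same parameters as the histograms above: start 1ms, factor sqrt(10), 10 buckets.
	// Prints roughly: 0.001 0.003162 0.01 0.03162 0.1 0.3162 1 3.162 10 31.62
	for _, b := range prometheus.ExponentialBuckets(0.001, math.Sqrt(10), 10) {
		fmt.Printf("%.4gs ", b)
	}
	fmt.Println()
}
```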
20 changes: 17 additions & 3 deletions lightning/mydump/parser.go
@@ -3,8 +3,13 @@ package mydump
import (
"bytes"
"io"
"time"

"github.com/pkg/errors"

"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/worker"
)

// ChunkParser is a parser of the data files (the file containing only INSERT
@@ -29,6 +34,7 @@ type ChunkParser struct {
// cache
remainBuf *bytes.Buffer
appendBuf *bytes.Buffer
ioWorkers *worker.RestoreWorkerPool
}

// Chunk represents a portion of the data file.
@@ -46,12 +52,13 @@ type Row struct {
}

// NewChunkParser creates a new parser which can read chunks out of a file.
func NewChunkParser(reader io.Reader) *ChunkParser {
func NewChunkParser(reader io.Reader, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) *ChunkParser {
return &ChunkParser{
reader: reader,
blockBuf: make([]byte, 8192),
blockBuf: make([]byte, blockBufSize*config.BufferSizeScale),
remainBuf: &bytes.Buffer{},
appendBuf: &bytes.Buffer{},
ioWorkers: ioWorkers,
}
}

@@ -81,7 +88,13 @@ const (
)

func (parser *ChunkParser) readBlock() error {
n, err := io.ReadFull(parser.reader, parser.blockBuf)
startTime := time.Now()

// limit IO concurrency
w := parser.ioWorkers.Apply()
n, err := parser.reader.Read(parser.blockBuf)
parser.ioWorkers.Recycle(w)

switch err {
case io.ErrUnexpectedEOF, io.EOF:
parser.isLastChunk = true
@@ -95,6 +108,7 @@ func (parser *ChunkParser) readBlock() error {
parser.appendBuf.Write(parser.remainBuf.Bytes())
parser.appendBuf.Write(parser.blockBuf[:n])
parser.buf = parser.appendBuf.Bytes()
metric.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds())
return nil
default:
return errors.Trace(err)
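With the new signature, a caller constructs a parser from a reader, a block buffer size, and a shared io worker pool; the pool caps how many parsers can be inside Read at the same time. A minimal usage sketch, mirroring the updated tests below (the sample INSERT statement is made up; the loop relies on ReadRow returning nil per row and io.EOF at the end of input, as restore.go's read loop does):

```go
package main

import (
	"context"
	"strings"

	"github.com/pingcap/tidb-lightning/lightning/config"
	"github.com/pingcap/tidb-lightning/lightning/mydump"
	"github.com/pingcap/tidb-lightning/lightning/worker"
)

func main() {
	// One pool shared by every parser limits concurrent reads to 5.
	ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "io")
	reader := strings.NewReader("INSERT INTO t VALUES (1, 'a'), (2, 'b');")
	parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

	// ReadRow returns nil for each row and a non-nil error (io.EOF) when the input is exhausted.
	for parser.ReadRow() == nil {
		_ = parser.LastRow()
	}
}
```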
13 changes: 10 additions & 3 deletions lightning/mydump/parser_test.go
@@ -1,11 +1,15 @@
package mydump_test

import (
"context"
"io"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/mydump"
"github.com/pingcap/tidb-lightning/lightning/worker"

"github.com/pkg/errors"
)

@@ -24,7 +28,8 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) {
"insert another_table values (10, 11, 12, '(13)', '(', 14, ')');",
)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -72,7 +77,8 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) {
INSERT foo VALUES (29,30,31,32),(33,34,35,36);
`)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

chunks, err := parser.ReadChunks(32)
c.Assert(err, IsNil)
@@ -118,7 +124,8 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) {
("789",CONVERT("[]" USING UTF8MB4));
`)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
chunks, err := parser.ReadChunks(96)

c.Assert(err, IsNil)
70 changes: 19 additions & 51 deletions lightning/restore/restore.go
@@ -24,6 +24,8 @@ import (
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/mydump"
verify "github.com/pingcap/tidb-lightning/lightning/verification"
"github.com/pingcap/tidb-lightning/lightning/worker"

tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/util/kvencoder"
@@ -94,8 +96,9 @@ type RestoreController struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
dbInfos map[string]*TidbDBInfo
tableWorkers *RestoreWorkerPool
regionWorkers *RestoreWorkerPool
tableWorkers *worker.RestoreWorkerPool
regionWorkers *worker.RestoreWorkerPool
ioWorkers *worker.RestoreWorkerPool
importer *kv.Importer
tidbMgr *TiDBManager
postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
@@ -128,8 +131,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
rc := &RestoreController{
cfg: cfg,
dbMetas: dbMetas,
tableWorkers: NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
tableWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
ioWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
importer: importer,
tidbMgr: tidbMgr,

@@ -438,9 +442,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
// Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about
// the difference between restoring tables concurrently and restoring tables one by one.

worker := rc.tableWorkers.Apply()
restoreWorker := rc.tableWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
defer wg.Done()

closedEngine, err := t.restore(ctx, rc, cp)
@@ -464,7 +468,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

err = t.postProcess(ctx, closedEngine, rc, cp)
}(worker, tr, cp)
}(restoreWorker, tr, cp)
}
}

@@ -547,15 +551,15 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
// 3. load kvs data (into kv deliver server)
// 4. flush kvs data (into tikv node)

cr, err := newChunkRestore(chunkIndex, chunk)
cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
if err != nil {
return nil, errors.Trace(err)
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()

worker := rc.regionWorkers.Apply()
restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, cr *chunkRestore) {
go func(w *worker.RestoreWorker, cr *chunkRestore) {
// Restore a chunk.
defer func() {
cr.close()
@@ -580,7 +584,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T

handled := int(atomic.AddInt32(handledChunksCount, 1))
common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks)))
}(worker, cr)
}(restoreWorker, cr)
}

wg.Wait()
@@ -859,56 +863,18 @@ func (rc *RestoreController) getTables() []string {
return tables
}

////////////////////////////////////////////////////////////////

type RestoreWorkerPool struct {
limit int
workers chan *RestoreWorker
name string
}

type RestoreWorker struct {
ID int64
}

func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
workers := make(chan *RestoreWorker, limit)
for i := 0; i < limit; i++ {
workers <- &RestoreWorker{ID: int64(i + 1)}
}

metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
return &RestoreWorkerPool{
limit: limit,
workers: workers,
name: name,
}
}

func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
worker := <-pool.workers
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
return worker
}
func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
pool.workers <- worker
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
}

////////////////////////////////////////////////////////////////

type chunkRestore struct {
parser *mydump.ChunkParser
index int
chunk *ChunkCheckpoint
}

func newChunkRestore(index int, chunk *ChunkCheckpoint) (*chunkRestore, error) {
func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) (*chunkRestore, error) {
reader, err := os.Open(chunk.Key.Path)
if err != nil {
return nil, errors.Trace(err)
}
parser := mydump.NewChunkParser(reader)
parser := mydump.NewChunkParser(reader, blockBufSize, ioWorkers)

reader.Seek(chunk.Chunk.Offset, io.SeekStart)
parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
@@ -1354,6 +1320,7 @@ func (cr *chunkRestore) restore(
var sep byte = ' '
readLoop:
for cr.parser.Pos() < endOffset {
readRowStartTime := time.Now()
err := cr.parser.ReadRow()
switch errors.Cause(err) {
case nil:
@@ -1368,6 +1335,7 @@
buffer.WriteString(" VALUES ")
sep = ','
}
metric.ChunkParserReadRowSecondsHistogram.Observe(time.Since(readRowStartTime).Seconds())
lastRow := cr.parser.LastRow()
if cr.chunk.ShouldIncludeRowID {
buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
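The readBlock change, the read-row timing above, and the Apply timing in the new worker package all follow the same pattern: record time.Now() before the operation and observe the elapsed seconds into a histogram afterwards. A self-contained sketch of that pattern, using a hypothetical histogram (the metric name is made up for illustration):

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// exampleSecondsHistogram is a hypothetical stand-in for the histograms added in metric.go.
var exampleSecondsHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "lightning",
	Name:      "example_seconds",
	Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
})

// timed records how long the wrapped operation took, in seconds.
func timed(work func()) {
	start := time.Now()
	work()
	exampleSecondsHistogram.Observe(time.Since(start).Seconds())
}

func main() {
	prometheus.MustRegister(exampleSecondsHistogram)
	timed(func() { time.Sleep(10 * time.Millisecond) })
}
```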
49 changes: 49 additions & 0 deletions lightning/worker/worker.go
@@ -0,0 +1,49 @@
package worker

import (
"context"
"time"

"github.com/pingcap/tidb-lightning/lightning/metric"
)

type RestoreWorkerPool struct {
limit int
workers chan *RestoreWorker
name string
}

type RestoreWorker struct {
ID int64
}

func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
workers := make(chan *RestoreWorker, limit)
for i := 0; i < limit; i++ {
workers <- &RestoreWorker{ID: int64(i + 1)}
}

metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
return &RestoreWorkerPool{
limit: limit,
workers: workers,
name: name,
}
}

func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
start := time.Now()
worker := <-pool.workers
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
metric.ApplyWorkerSecondsHistogram.WithLabelValues(pool.name).Observe(time.Since(start).Seconds())
return worker
}

func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
pool.workers <- worker
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
}

func (pool *RestoreWorkerPool) HasWorker() bool {
return len(pool.workers) > 0
}
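The pool is a plain token bucket over a buffered channel: Apply blocks until a worker is free (and now records the wait time in ApplyWorkerSecondsHistogram), Recycle returns the worker, and HasWorker reports whether any worker is currently idle. A short usage sketch against the API shown above:

```go
package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb-lightning/lightning/worker"
)

func main() {
	pool := worker.NewRestoreWorkerPool(context.Background(), 2, "example")

	w := pool.Apply()             // blocks only when all 2 workers are checked out
	fmt.Println(pool.HasWorker()) // true: one worker is still idle
	pool.Recycle(w)               // hand the worker back to the pool
}
```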