diff --git a/cmd/tidb-lightning-ctl/main.go b/cmd/tidb-lightning-ctl/main.go index e67747876..b547122e5 100644 --- a/cmd/tidb-lightning-ctl/main.go +++ b/cmd/tidb-lightning-ctl/main.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/tidb-lightning/lightning/kv" "github.com/pingcap/tidb-lightning/lightning/restore" "github.com/pkg/errors" - "github.com/satori/go.uuid" ) func main() { @@ -163,12 +162,15 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s } for _, table := range targetTables { - if table.Engine == uuid.Nil { - continue - } - if closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, table.Engine); err == nil { - fmt.Fprintln(os.Stderr, "Cleaning up engine:", table.TableName, table.Engine) - closedEngine.Cleanup(ctx) + for engineID := 0; engineID < table.EnginesCount; engineID++ { + fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID) + closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID) + if err != nil { + fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err) + lastErr = err + } else { + closedEngine.Cleanup(ctx) + } } } @@ -193,6 +195,13 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) } defer tablesFile.Close() + enginesFileName := path.Join(dumpFolder, "engines.csv") + enginesFile, err := os.Create(tablesFileName) + if err != nil { + return errors.Annotatef(err, "failed to create %s", enginesFileName) + } + defer enginesFile.Close() + chunksFileName := path.Join(dumpFolder, "chunks.csv") chunksFile, err := os.Create(chunksFileName) if err != nil { @@ -203,6 +212,9 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) if err := cpdb.DumpTables(ctx, tablesFile); err != nil { return errors.Trace(err) } + if err := cpdb.DumpEngines(ctx, enginesFile); err != nil { + return errors.Trace(err) + } if err := cpdb.DumpChunks(ctx, chunksFile); err != nil { return errors.Trace(err) } diff --git a/lightning/common/once_error.go b/lightning/common/once_error.go new file mode 100644 index 000000000..50930f8ce --- /dev/null +++ b/lightning/common/once_error.go @@ -0,0 +1,36 @@ +package common + +import ( + "sync" +) + +// OnceError is an error value which will can be assigned once. +// +// The zero value is ready for use. +type OnceError struct { + lock sync.Mutex + err error +} + +// Set assigns an error to this instance, if `e != nil`. +// +// If this method is called multiple times, only the first call is effective. +func (oe *OnceError) Set(tag string, e error) { + if e != nil { + oe.lock.Lock() + if oe.err == nil { + oe.err = e + } + oe.lock.Unlock() + if !IsContextCanceledError(e) { + AppLogger.Errorf("[%s] error %v", tag, e) + } + } +} + +// Get returns the first error value stored in this instance. +func (oe *OnceError) Get() error { + oe.lock.Lock() + defer oe.lock.Unlock() + return oe.err +} diff --git a/lightning/common/once_error_test.go b/lightning/common/once_error_test.go new file mode 100644 index 000000000..c5c48a405 --- /dev/null +++ b/lightning/common/once_error_test.go @@ -0,0 +1,45 @@ +package common_test + +import ( + "errors" + "testing" + + . 
"github.com/pingcap/check" + "github.com/pingcap/tidb-lightning/lightning/common" +) + +func TestCommon(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&onceErrorSuite{}) + +type onceErrorSuite struct{} + +func (s *onceErrorSuite) TestOnceError(c *C) { + var err common.OnceError + + c.Assert(err.Get(), IsNil) + + err.Set("tag", nil) + c.Assert(err.Get(), IsNil) + + e := errors.New("1") + err.Set("tag", e) + c.Assert(err.Get(), Equals, e) + + e2 := errors.New("2") + err.Set("tag", e2) + c.Assert(err.Get(), Equals, e) // e, not e2. + + err.Set("tag", nil) + c.Assert(err.Get(), Equals, e) + + ch := make(chan struct{}) + go func() { + err.Set("tag", nil) + ch <- struct{}{} + }() + <-ch + c.Assert(err.Get(), Equals, e) +} diff --git a/lightning/config/config.go b/lightning/config/config.go index ba1bd4447..ca4fbba5e 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -84,16 +84,16 @@ type PostRestore struct { } type MydumperRuntime struct { - ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"` - MinRegionSize int64 `toml:"region-min-size" json:"region-min-size"` - SourceDir string `toml:"data-source-dir" json:"data-source-dir"` - NoSchema bool `toml:"no-schema" json:"no-schema"` - CharacterSet string `toml:"character-set" json:"character-set"` + ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"` + BatchSize int64 `toml:"batch-size" json:"batch-size"` + BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"` + SourceDir string `toml:"data-source-dir" json:"data-source-dir"` + NoSchema bool `toml:"no-schema" json:"no-schema"` + CharacterSet string `toml:"character-set" json:"character-set"` } type TikvImporter struct { - Addr string `toml:"addr" json:"addr"` - BatchSize int64 `toml:"batch-size" json:"batch-size"` + Addr string `toml:"addr" json:"addr"` } type Checkpoint struct { @@ -187,8 +187,11 @@ func (cfg *Config) Load() error { } // handle mydumper - if cfg.Mydumper.MinRegionSize <= 0 { - cfg.Mydumper.MinRegionSize = MinRegionSize + if cfg.Mydumper.BatchSize <= 0 { + cfg.Mydumper.BatchSize = 100 * _G + } + if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 { + cfg.Mydumper.BatchImportRatio = 0.75 } if cfg.Mydumper.ReadBlockSize <= 0 { cfg.Mydumper.ReadBlockSize = ReadBlockSize @@ -197,11 +200,6 @@ func (cfg *Config) Load() error { cfg.Mydumper.CharacterSet = "auto" } - // hendle kv import - if cfg.TikvImporter.BatchSize <= 0 { - cfg.TikvImporter.BatchSize = KVMaxBatchSize - } - if len(cfg.Checkpoint.Schema) == 0 { cfg.Checkpoint.Schema = "tidb_lightning_checkpoint" } diff --git a/lightning/config/const.go b/lightning/config/const.go index f7769a771..48592282c 100644 --- a/lightning/config/const.go +++ b/lightning/config/const.go @@ -10,7 +10,4 @@ const ( MinRegionSize int64 = 256 * _M BufferSizeScale = 5 - - // kv import - KVMaxBatchSize int64 = 200 * _G ) diff --git a/lightning/kv/importer.go b/lightning/kv/importer.go index dbf25b31d..460596e8c 100644 --- a/lightning/kv/importer.go +++ b/lightning/kv/importer.go @@ -31,19 +31,23 @@ Usual workflow: 2. For each table, - a. Create an `OpenedEngine` via `importer.OpenEngine()` + i. Split into multiple "batches" consisting of data files with roughly equal total size. - b. For each chunk, + ii. For each batch, - i. Create a `WriteStream` via `engine.NewWriteStream()` - ii. Deliver data into the stream via `stream.Put()` - iii. Close the stream via `stream.Close()` + a. Create an `OpenedEngine` via `importer.OpenEngine()` - c. 
When all chunks are written, obtain a `ClosedEngine` via `engine.CloseEngine()` + b. For each chunk, - d. Import data via `engine.Import()` + i. Create a `WriteStream` via `engine.NewWriteStream()` + ii. Deliver data into the stream via `stream.Put()` + iii. Close the stream via `stream.Close()` - e. Cleanup via `engine.Cleanup()` + c. When all chunks are written, obtain a `ClosedEngine` via `engine.CloseEngine()` + + d. Import data via `engine.Import()` + + e. Cleanup via `engine.Cleanup()` 3. Close the connection via `importer.Close()` @@ -120,10 +124,10 @@ func (importer *Importer) Compact(ctx context.Context, level int32) error { // OpenedEngine is an opened importer engine file, allowing data to be written // to it via WriteStream instances. type OpenedEngine struct { - importer *Importer - tableName string - uuid uuid.UUID - ts uint64 + importer *Importer + tag string + uuid uuid.UUID + ts uint64 } // isIgnorableOpenCloseEngineError checks if the error from @@ -138,13 +142,21 @@ func isIgnorableOpenCloseEngineError(err error) bool { return err == nil || strings.Contains(err.Error(), "FileExists") } -// OpenEngine opens an engine with the given UUID. This type is goroutine safe: -// you can share this instance and execute any method anywhere. +func makeTag(tableName string, engineID int) string { + return fmt.Sprintf("%s:%d", tableName, engineID) +} + +var engineNamespace = uuid.Must(uuid.FromString("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")) + +// OpenEngine opens an engine with the given table name and engine ID. This type +// is goroutine safe: you can share this instance and execute any method anywhere. func (importer *Importer) OpenEngine( ctx context.Context, tableName string, - engineUUID uuid.UUID, + engineID int, ) (*OpenedEngine, error) { + tag := makeTag(tableName, engineID) + engineUUID := uuid.NewV5(engineNamespace, tag) req := &kv.OpenEngineRequest{ Uuid: engineUUID.Bytes(), } @@ -155,7 +167,7 @@ func (importer *Importer) OpenEngine( openCounter := metric.EngineCounter.WithLabelValues("open") openCounter.Inc() - common.AppLogger.Infof("[%s] open engine %s", tableName, engineUUID) + common.AppLogger.Infof("[%s] open engine %s", tag, engineUUID) // gofail: var FailIfEngineCountExceeds int // { @@ -171,10 +183,10 @@ func (importer *Importer) OpenEngine( // gofail: RETURN: return &OpenedEngine{ - importer: importer, - tableName: tableName, - ts: uint64(time.Now().Unix()), // TODO ... set outside ? from pd ? - uuid: engineUUID, + importer: importer, + tag: tag, + ts: uint64(time.Now().Unix()), // TODO ... set outside ? from pd ? 
+ uuid: engineUUID, }, nil } @@ -204,7 +216,7 @@ func (engine *OpenedEngine) NewWriteStream(ctx context.Context) (*WriteStream, e if err = wstream.Send(req); err != nil { if _, closeErr := wstream.CloseAndRecv(); closeErr != nil { // just log the close error, we need to propagate the send error instead - common.AppLogger.Warnf("[%s] close write stream cause failed : %v", engine.tableName, closeErr) + common.AppLogger.Warnf("[%s] close write stream cause failed : %v", engine.tag, closeErr) } return nil, errors.Trace(err) } @@ -242,7 +254,7 @@ func (stream *WriteStream) Put(kvs []kvec.KvPair) error { if !common.IsRetryableError(sendErr) { break } - common.AppLogger.Errorf("[%s] write stream failed to send: %s", stream.engine.tableName, sendErr.Error()) + common.AppLogger.Errorf("[%s] write stream failed to send: %s", stream.engine.tag, sendErr.Error()) time.Sleep(retryBackoffTime) } return errors.Trace(sendErr) @@ -252,7 +264,7 @@ func (stream *WriteStream) Put(kvs []kvec.KvPair) error { func (stream *WriteStream) Close() error { if _, err := stream.wstream.CloseAndRecv(); err != nil { if !common.IsContextCanceledError(err) { - common.AppLogger.Errorf("[%s] close write stream cause failed : %v", stream.engine.tableName, err) + common.AppLogger.Errorf("[%s] close write stream cause failed : %v", stream.engine.tag, err) } return errors.Trace(err) } @@ -263,21 +275,21 @@ func (stream *WriteStream) Close() error { // This type is goroutine safe: you can share this instance and execute any // method anywhere. type ClosedEngine struct { - importer *Importer - tableName string - uuid uuid.UUID + importer *Importer + tag string + uuid uuid.UUID } // Close the opened engine to prepare it for importing. This method will return // error if any associated WriteStream is still not closed. func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) { - common.AppLogger.Infof("[%s] [%s] engine close", engine.tableName, engine.uuid) + common.AppLogger.Infof("[%s] [%s] engine close", engine.tag, engine.uuid) timer := time.Now() - closedEngine, err := engine.importer.UnsafeCloseEngine(ctx, engine.tableName, engine.uuid) + closedEngine, err := engine.importer.unsafeCloseEngine(ctx, engine.tag, engine.uuid) if err != nil { return nil, errors.Trace(err) } - common.AppLogger.Infof("[%s] [%s] engine close takes %v", engine.tableName, engine.uuid, time.Since(timer)) + common.AppLogger.Infof("[%s] [%s] engine close takes %v", engine.tag, engine.uuid, time.Since(timer)) metric.EngineCounter.WithLabelValues("closed").Inc() return closedEngine, nil } @@ -287,7 +299,13 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) { // (Open -> Write -> Close -> Import). This method should only be used when one // knows via other ways that the engine has already been opened, e.g. when // resuming from a checkpoint. 
-func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineUUID uuid.UUID) (*ClosedEngine, error) { +func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int) (*ClosedEngine, error) { + tag := makeTag(tableName, engineID) + engineUUID := uuid.NewV5(engineNamespace, tag) + return importer.unsafeCloseEngine(ctx, tag, engineUUID) +} + +func (importer *Importer) unsafeCloseEngine(ctx context.Context, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) { req := &kv.CloseEngineRequest{ Uuid: engineUUID.Bytes(), } @@ -297,9 +315,9 @@ func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName strin } return &ClosedEngine{ - importer: importer, - tableName: tableName, - uuid: engineUUID, + importer: importer, + tag: tag, + uuid: engineUUID, }, nil } @@ -308,7 +326,7 @@ func (engine *ClosedEngine) Import(ctx context.Context) error { var err error for i := 0; i < maxRetryTimes; i++ { - common.AppLogger.Infof("[%s] [%s] import", engine.tableName, engine.uuid) + common.AppLogger.Infof("[%s] [%s] import", engine.tag, engine.uuid) req := &kv.ImportEngineRequest{ Uuid: engine.uuid.Bytes(), PdAddr: engine.importer.pdAddr, @@ -316,24 +334,28 @@ func (engine *ClosedEngine) Import(ctx context.Context) error { timer := time.Now() _, err = engine.importer.cli.ImportEngine(ctx, req) if !common.IsRetryableError(err) { - common.AppLogger.Infof("[%s] [%s] import takes %v", engine.tableName, engine.uuid, time.Since(timer)) + if err == nil { + common.AppLogger.Infof("[%s] [%s] import takes %v", engine.tag, engine.uuid, time.Since(timer)) + } else if !common.IsContextCanceledError(err) { + common.AppLogger.Errorf("[%s] [%s] import failed and cannot retry, err %v", engine.tag, engine.uuid, err) + } return errors.Trace(err) } - common.AppLogger.Warnf("[%s] [%s] import failed and retry %d time, err %v", engine.tableName, engine.uuid, i+1, err) + common.AppLogger.Warnf("[%s] [%s] import failed and retry %d time, err %v", engine.tag, engine.uuid, i+1, err) time.Sleep(retryBackoffTime) } - return errors.Annotatef(err, "[%s] [%s] import reach max retry %d and still failed", engine.tableName, engine.uuid, maxRetryTimes) + return errors.Annotatef(err, "[%s] [%s] import reach max retry %d and still failed", engine.tag, engine.uuid, maxRetryTimes) } // Cleanup deletes the imported data from importer. 
func (engine *ClosedEngine) Cleanup(ctx context.Context) error { - common.AppLogger.Infof("[%s] [%s] cleanup ", engine.tableName, engine.uuid) + common.AppLogger.Infof("[%s] [%s] cleanup ", engine.tag, engine.uuid) req := &kv.CleanupEngineRequest{ Uuid: engine.uuid.Bytes(), } timer := time.Now() _, err := engine.importer.cli.CleanupEngine(ctx, req) - common.AppLogger.Infof("[%s] [%s] cleanup takes %v", engine.tableName, engine.uuid, time.Since(timer)) + common.AppLogger.Infof("[%s] [%s] cleanup takes %v", engine.tag, engine.uuid, time.Since(timer)) return errors.Trace(err) } diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go index 7e0139065..78aba94d6 100644 --- a/lightning/mydump/region.go +++ b/lightning/mydump/region.go @@ -1,14 +1,14 @@ package mydump import ( - "fmt" + "math" "os" "github.com/pkg/errors" ) type TableRegion struct { - ID int + EngineID int DB string Table string @@ -17,11 +17,6 @@ type TableRegion struct { Chunk Chunk } -func (reg *TableRegion) Name() string { - return fmt.Sprintf("%s|%s|%d|%d", - reg.DB, reg.Table, reg.ID, reg.Chunk.Offset) -} - func (reg *TableRegion) RowIDMin() int64 { return reg.Chunk.PrevRowIDMax + 1 } @@ -52,12 +47,90 @@ func (rs regionSlice) Less(i, j int) bool { //////////////////////////////////////////////////////////////// -func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) { +func AllocateEngineIDs( + filesRegions []*TableRegion, + dataFileSizes []float64, + batchSize float64, + batchImportRatio float64, + tableConcurrency float64, +) { + totalDataFileSize := 0.0 + for _, dataFileSize := range dataFileSizes { + totalDataFileSize += dataFileSize + } + + // No need to batch if the size is too small :) + if totalDataFileSize <= batchSize { + return + } + + curEngineID := 0 + curEngineSize := 0.0 + curBatchSize := batchSize + + // import() step will not be concurrent. + // If multiple Batch end times are close, it will result in multiple + // Batch import serials. We need use a non-uniform batch size to create a pipeline effect. + // Here we calculate the total number of engines, which is needed to compute the scale up + // + // Total/B1 = 1/(1-R) * (N - 1/beta(N, R)) + // ≲ N/(1-R) + // + // We use a simple brute force search since the search space is extremely small. + ratio := totalDataFileSize * (1 - batchImportRatio) / batchSize + n := math.Ceil(ratio) + logGammaNPlusR, _ := math.Lgamma(n + batchImportRatio) + logGammaN, _ := math.Lgamma(n) + logGammaR, _ := math.Lgamma(batchImportRatio) + invBetaNR := math.Exp(logGammaNPlusR - logGammaN - logGammaR) // 1/B(N, R) = Γ(N+R)/Γ(N)Γ(R) + for { + if n <= 0 || n > tableConcurrency { + n = tableConcurrency + break + } + realRatio := n - invBetaNR + if realRatio >= ratio { + // we don't have enough engines. reduce the batch size to keep the pipeline smooth. 
+ curBatchSize = totalDataFileSize * (1 - batchImportRatio) / realRatio + break + } + invBetaNR *= 1 + batchImportRatio/n // Γ(X+1) = X * Γ(X) + n += 1.0 + } + + for i, dataFileSize := range dataFileSizes { + filesRegions[i].EngineID = curEngineID + curEngineSize += dataFileSize + + if curEngineSize >= curBatchSize { + curEngineSize = 0 + curEngineID++ + + i := float64(curEngineID) + // calculate the non-uniform batch size + if i >= n { + curBatchSize = batchSize + } else { + // B_(i+1) = B_i * (I/W/(N-i) + 1) + curBatchSize *= batchImportRatio/(n-i) + 1.0 + } + } + } +} + +func MakeTableRegions( + meta *MDTableMeta, + columns int, + batchSize int64, + batchImportRatio float64, + tableConcurrency int, +) ([]*TableRegion, error) { // Split files into regions filesRegions := make(regionSlice, 0, len(meta.DataFiles)) + dataFileSizes := make([]float64, 0, len(meta.DataFiles)) prevRowIDMax := int64(0) - for i, dataFile := range meta.DataFiles { + for _, dataFile := range meta.DataFiles { dataFileInfo, err := os.Stat(dataFile) if err != nil { return nil, errors.Annotatef(err, "cannot stat %s", dataFile) @@ -65,7 +138,6 @@ func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) { dataFileSize := dataFileInfo.Size() rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2) filesRegions = append(filesRegions, &TableRegion{ - ID: i, DB: meta.DB, Table: meta.Name, File: dataFile, @@ -77,7 +149,9 @@ func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) { }, }) prevRowIDMax = rowIDMax + dataFileSizes = append(dataFileSizes, float64(dataFileSize)) } + AllocateEngineIDs(filesRegions, dataFileSizes, float64(batchSize), batchImportRatio, float64(tableConcurrency)) return filesRegions, nil } diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go index a90b8a1da..274c68001 100644 --- a/lightning/mydump/region_test.go +++ b/lightning/mydump/region_test.go @@ -38,7 +38,7 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) { dbMeta := loader.GetDatabases()[0] for _, meta := range dbMeta.Tables { - regions, err := MakeTableRegions(meta, 1) + regions, err := MakeTableRegions(meta, 1, 1, 0, 1) c.Assert(err, IsNil) table := meta.Name @@ -98,7 +98,7 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) { dbMeta := loader.GetDatabases()[0] for _, meta := range dbMeta.Tables { - regions, err := MakeTableRegions(meta, 1) + regions, err := MakeTableRegions(meta, 1, 1, 0, 1) c.Assert(err, IsNil) tolValTuples := 0 @@ -116,3 +116,84 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) { return } + +func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { + dataFileSizes := make([]float64, 700) + for i := range dataFileSizes { + dataFileSizes[i] = 1.0 + } + filesRegions := make([]*TableRegion, 0, len(dataFileSizes)) + for range dataFileSizes { + filesRegions = append(filesRegions, new(TableRegion)) + } + + checkEngineSizes := func(what string, expected map[int]int) { + actual := make(map[int]int) + for _, region := range filesRegions { + actual[region.EngineID]++ + } + c.Assert(actual, DeepEquals, expected, Commentf("%s", what)) + } + + // Batch size > Total size => Everything in the zero batch. + AllocateEngineIDs(filesRegions, dataFileSizes, 1000, 0.5, 1000) + checkEngineSizes("no batching", map[int]int{ + 0: 700, + }) + + // Allocate 3 engines. 
+ AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.5, 1000) + checkEngineSizes("batch size = 200", map[int]int{ + 0: 170, + 1: 213, + 2: 317, + }) + + // Allocate 3 engines with an alternative ratio + AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.6, 1000) + checkEngineSizes("batch size = 200, ratio = 0.6", map[int]int{ + 0: 160, + 1: 208, + 2: 332, + }) + + // Allocate 5 engines. + AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000) + checkEngineSizes("batch size = 100", map[int]int{ + 0: 93, + 1: 105, + 2: 122, + 3: 153, + 4: 227, + }) + + // Number of engines > table concurrency + AllocateEngineIDs(filesRegions, dataFileSizes, 50, 0.5, 4) + checkEngineSizes("batch size = 50, limit table conc = 4", map[int]int{ + 0: 50, + 1: 59, + 2: 73, + 3: 110, + 4: 50, + 5: 50, + 6: 50, + 7: 50, + 8: 50, + 9: 50, + 10: 50, + 11: 50, + 12: 8, + }) + + // Zero ratio = Uniform + AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.0, 1000) + checkEngineSizes("batch size = 100, ratio = 0", map[int]int{ + 0: 100, + 1: 100, + 2: 100, + 3: 100, + 4: 100, + 5: 100, + 6: 100, + }) +} diff --git a/lightning/restore/checkpoints.go b/lightning/restore/checkpoints.go index 82dc8572d..ac0d8190b 100644 --- a/lightning/restore/checkpoints.go +++ b/lightning/restore/checkpoints.go @@ -14,7 +14,6 @@ import ( "github.com/cznic/mathutil" "github.com/joho/sqltocsv" "github.com/pkg/errors" - "github.com/satori/go.uuid" "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/mydump" @@ -43,8 +42,9 @@ const nodeID = 0 const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. - checkpointTableNameTable = "table_v1" - checkpointTableNameChunk = "chunk_v3" + checkpointTableNameTable = "table_v4" + checkpointTableNameEngine = "engine_v4" + checkpointTableNameChunk = "chunk_v4" ) func (status CheckpointStatus) MetricName() string { @@ -96,15 +96,23 @@ type ChunkCheckpoint struct { Checksum verify.KVChecksum } +type EngineCheckpoint struct { + Status CheckpointStatus + Chunks []*ChunkCheckpoint // a sorted array +} + type TableCheckpoint struct { Status CheckpointStatus - Engine uuid.UUID AllocBase int64 - Chunks []*ChunkCheckpoint // a sorted array + Engines []*EngineCheckpoint } -func (cp *TableCheckpoint) resetChunks() { - cp.Chunks = nil +func (cp *TableCheckpoint) CountChunks() int { + result := 0 + for _, engine := range cp.Engines { + result += len(engine.Chunks) + } + return result } type chunkCheckpointDiff struct { @@ -115,25 +123,44 @@ type chunkCheckpointDiff struct { checksum verify.KVChecksum } +type engineCheckpointDiff struct { + hasStatus bool + status CheckpointStatus + chunks map[ChunkCheckpointKey]chunkCheckpointDiff +} + type TableCheckpointDiff struct { hasStatus bool hasRebase bool status CheckpointStatus allocBase int64 - chunks map[ChunkCheckpointKey]chunkCheckpointDiff + engines map[int]engineCheckpointDiff } func NewTableCheckpointDiff() *TableCheckpointDiff { return &TableCheckpointDiff{ - status: CheckpointStatusMaxInvalid + 1, - chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff), + engines: make(map[int]engineCheckpointDiff), + } +} + +func (cpd *TableCheckpointDiff) insertEngineCheckpointDiff(engineID int, newDiff engineCheckpointDiff) { + if oldDiff, ok := cpd.engines[engineID]; ok { + if newDiff.hasStatus { + oldDiff.hasStatus = true + oldDiff.status = newDiff.status + } + for key, chunkDiff := range newDiff.chunks { + 
oldDiff.chunks[key] = chunkDiff + } + newDiff = oldDiff } + cpd.engines[engineID] = newDiff } func (cpd *TableCheckpointDiff) String() string { return fmt.Sprintf( - "{hasStatus:%v, hasRebase:%v, status:%d, allocBase:%d, chunks:[%d]}", - cpd.hasStatus, cpd.hasRebase, cpd.status, cpd.allocBase, len(cpd.chunks), + "{hasStatus:%v, hasRebase:%v, status:%d, allocBase:%d, engines:[%d]}", + cpd.hasStatus, cpd.hasRebase, cpd.status, cpd.allocBase, len(cpd.engines), ) } @@ -147,7 +174,8 @@ type TableCheckpointMerger interface { } type StatusCheckpointMerger struct { - Status CheckpointStatus + EngineID int // -1 == apply to whole table. + Status CheckpointStatus } func (merger *StatusCheckpointMerger) SetInvalid() { @@ -155,11 +183,21 @@ func (merger *StatusCheckpointMerger) SetInvalid() { } func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { - cpd.status = merger.Status - cpd.hasStatus = true + if merger.EngineID == -1 || merger.Status <= CheckpointStatusMaxInvalid { + cpd.status = merger.Status + cpd.hasStatus = true + } + if merger.EngineID >= 0 { + cpd.insertEngineCheckpointDiff(merger.EngineID, engineCheckpointDiff{ + hasStatus: true, + status: merger.Status, + chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff), + }) + } } type ChunkCheckpointMerger struct { + EngineID int Key ChunkCheckpointKey Checksum verify.KVChecksum Pos int64 @@ -167,11 +205,15 @@ type ChunkCheckpointMerger struct { } func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { - cpd.chunks[merger.Key] = chunkCheckpointDiff{ - pos: merger.Pos, - rowID: merger.RowID, - checksum: merger.Checksum, - } + cpd.insertEngineCheckpointDiff(merger.EngineID, engineCheckpointDiff{ + chunks: map[ChunkCheckpointKey]chunkCheckpointDiff{ + merger.Key: { + pos: merger.Pos, + rowID: merger.RowID, + checksum: merger.Checksum, + }, + }, + }) } type RebaseCheckpointMerger struct { @@ -184,21 +226,22 @@ func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { } type DestroyedTableCheckpoint struct { - TableName string - Engine uuid.UUID + TableName string + EnginesCount int } type CheckpointsDB interface { Initialize(ctx context.Context, dbInfo map[string]*TidbDBInfo) error Get(ctx context.Context, tableName string) (*TableCheckpoint, error) Close() error - InsertChunkCheckpoints(ctx context.Context, tableName string, checkpoints []*ChunkCheckpoint) error + InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints []*EngineCheckpoint) error Update(checkpointDiffs map[string]*TableCheckpointDiff) RemoveCheckpoint(ctx context.Context, tableName string) error IgnoreErrorCheckpoint(ctx context.Context, tableName string) error DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) DumpTables(ctx context.Context, csv io.Writer) error + DumpEngines(ctx context.Context, csv io.Writer) error DumpChunks(ctx context.Context, csv io.Writer) error } @@ -216,14 +259,13 @@ func (*NullCheckpointsDB) Close() error { return nil } -func (*NullCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error) { +func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error) { return &TableCheckpoint{ Status: CheckpointStatusLoaded, - Engine: uuid.NewV4(), }, nil } -func (*NullCheckpointsDB) InsertChunkCheckpoints(_ context.Context, _ string, _ []*ChunkCheckpoint) error { +func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ []*EngineCheckpoint) error { return nil } @@ -253,7 
+295,6 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( session bigint unsigned NOT NULL, table_name varchar(261) NOT NULL PRIMARY KEY, hash binary(32) NOT NULL, - engine binary(16) NOT NULL, status tinyint unsigned DEFAULT 30, alloc_base bigint NOT NULL DEFAULT 0, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -265,9 +306,24 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( return nil, errors.Trace(err) } + err = common.ExecWithRetry(ctx, db, "(create engine checkpoints table)", fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.%s ( + table_name varchar(261) NOT NULL, + engine_id int unsigned NOT NULL, + status tinyint unsigned DEFAULT 30, + create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY(table_name, engine_id DESC) + ); + `, schema, checkpointTableNameEngine)) + if err != nil { + return nil, errors.Trace(err) + } + err = common.ExecWithRetry(ctx, db, "(create chunks checkpoints table)", fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s.%s ( table_name varchar(261) NOT NULL, + engine_id int unsigned NOT NULL, path varchar(2048) NOT NULL, offset bigint NOT NULL, columns text NULL, @@ -281,7 +337,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( kvc_checksum bigint unsigned NOT NULL DEFAULT 0, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY(table_name, path(500), offset) + PRIMARY KEY(table_name, engine_id, path(500), offset) ); `, schema, checkpointTableNameChunk)) if err != nil { @@ -310,7 +366,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin // We do need to capture the error is display a user friendly message // (multiple nodes cannot import the same table) though. stmt, err := tx.PrepareContext(c, fmt.Sprintf(` - INSERT INTO %s.%s (node_id, session, table_name, hash, engine) VALUES (?, ?, ?, ?, ?) + INSERT INTO %s.%s (node_id, session, table_name, hash) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE session = CASE WHEN node_id = VALUES(node_id) AND hash = VALUES(hash) THEN VALUES(session) @@ -324,7 +380,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, dbInfo map[strin for _, db := range dbInfo { for _, table := range db.Tables { tableName := common.UniqueTable(db.Name, table.Name) - _, err = stmt.ExecContext(c, nodeID, cpdb.session, tableName, 0, uuid.NewV4().Bytes()) + _, err = stmt.ExecContext(c, nodeID, cpdb.session, tableName, 0) if err != nil { return errors.Trace(err) } @@ -346,60 +402,82 @@ func (cpdb *MySQLCheckpointsDB) Close() error { func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) { cp := new(TableCheckpoint) - cp.resetChunks() purpose := "(read checkpoint " + tableName + ")" err := common.TransactWithRetry(ctx, cpdb.db, purpose, func(c context.Context, tx *sql.Tx) error { - query := fmt.Sprintf(` + // 1. Populate the engines. + + engineQuery := fmt.Sprintf(` + SELECT engine_id, status FROM %s.%s WHERE table_name = ? 
ORDER BY engine_id DESC; + `, cpdb.schema, checkpointTableNameEngine) + engineRows, err := tx.QueryContext(c, engineQuery, tableName) + if err != nil { + return errors.Trace(err) + } + defer engineRows.Close() + for engineRows.Next() { + var ( + engineID int + status uint8 + ) + if err := engineRows.Scan(&engineID, &status); err != nil { + return errors.Trace(err) + } + for len(cp.Engines) <= engineID { + cp.Engines = append(cp.Engines, new(EngineCheckpoint)) + } + cp.Engines[engineID].Status = CheckpointStatus(status) + } + if err := engineRows.Err(); err != nil { + return errors.Trace(err) + } + + // 2. Populate the chunks. + + chunkQuery := fmt.Sprintf(` SELECT - path, offset, columns, should_include_row_id, + engine_id, path, offset, columns, should_include_row_id, pos, end_offset, prev_rowid_max, rowid_max, kvc_bytes, kvc_kvs, kvc_checksum FROM %s.%s WHERE table_name = ? - ORDER BY path, offset; + ORDER BY engine_id, path, offset; `, cpdb.schema, checkpointTableNameChunk) - rows, err := tx.QueryContext(c, query, tableName) + chunkRows, err := tx.QueryContext(c, chunkQuery, tableName) if err != nil { return errors.Trace(err) } - defer rows.Close() - for rows.Next() { + defer chunkRows.Close() + for chunkRows.Next() { var ( value = new(ChunkCheckpoint) + engineID int kvcBytes uint64 kvcKVs uint64 kvcChecksum uint64 ) - if err := rows.Scan( - &value.Key.Path, &value.Key.Offset, &value.Columns, &value.ShouldIncludeRowID, + if err := chunkRows.Scan( + &engineID, &value.Key.Path, &value.Key.Offset, &value.Columns, &value.ShouldIncludeRowID, &value.Chunk.Offset, &value.Chunk.EndOffset, &value.Chunk.PrevRowIDMax, &value.Chunk.RowIDMax, &kvcBytes, &kvcKVs, &kvcChecksum, ); err != nil { return errors.Trace(err) } value.Checksum = verify.MakeKVChecksum(kvcBytes, kvcKVs, kvcChecksum) - cp.Chunks = append(cp.Chunks, value) + cp.Engines[engineID].Chunks = append(cp.Engines[engineID].Chunks, value) } - if err := rows.Err(); err != nil { + if err := chunkRows.Err(); err != nil { return errors.Trace(err) } - query = fmt.Sprintf(` - SELECT status, engine, alloc_base FROM %s.%s WHERE table_name = ? + // 3. Fill in the remaining table info + + tableQuery := fmt.Sprintf(` + SELECT status, alloc_base FROM %s.%s WHERE table_name = ? 
`, cpdb.schema, checkpointTableNameTable) - row := tx.QueryRowContext(c, query, tableName) - - var ( - status uint8 - engine []byte - ) - if err := row.Scan(&status, &engine, &cp.AllocBase); err != nil { - cp.resetChunks() - return errors.Trace(err) - } - cp.Engine, err = uuid.FromBytes(engine) - if err != nil { - cp.resetChunks() + tableRow := tx.QueryRowContext(c, tableQuery, tableName) + + var status uint8 + if err := tableRow.Scan(&status, &cp.AllocBase); err != nil { return errors.Trace(err) } cp.Status = CheckpointStatus(status) @@ -409,22 +487,28 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab return nil, errors.Trace(err) } - if cp.Status <= CheckpointStatusMaxInvalid { - return nil, errors.Errorf("Checkpoint for %s has invalid status: %d", tableName, cp.Status) - } - return cp, nil } -func (cpdb *MySQLCheckpointsDB) InsertChunkCheckpoints(ctx context.Context, tableName string, checkpoints []*ChunkCheckpoint) error { - err := common.TransactWithRetry(ctx, cpdb.db, "(update chunk checkpoints for "+tableName+")", func(c context.Context, tx *sql.Tx) error { - stmt, err := tx.PrepareContext(c, fmt.Sprintf(` +func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints []*EngineCheckpoint) error { + err := common.TransactWithRetry(ctx, cpdb.db, "(update engine checkpoints for "+tableName+")", func(c context.Context, tx *sql.Tx) error { + engineStmt, err := tx.PrepareContext(c, fmt.Sprintf(` + REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?); + `, cpdb.schema, checkpointTableNameEngine)) + if err != nil { + return errors.Trace(err) + } + defer engineStmt.Close() + + chunkStmt, err := tx.PrepareContext(c, fmt.Sprintf(` REPLACE INTO %s.%s ( - table_name, path, offset, columns, should_include_row_id, + table_name, engine_id, + path, offset, columns, should_include_row_id, pos, end_offset, prev_rowid_max, rowid_max, kvc_bytes, kvc_kvs, kvc_checksum ) VALUES ( - ?, ?, ?, ?, ?, + ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ); @@ -432,18 +516,24 @@ func (cpdb *MySQLCheckpointsDB) InsertChunkCheckpoints(ctx context.Context, tabl if err != nil { return errors.Trace(err) } - defer stmt.Close() + defer chunkStmt.Close() - for _, value := range checkpoints { - _, err = stmt.ExecContext( - c, - tableName, value.Key.Path, value.Key.Offset, value.Columns, value.ShouldIncludeRowID, - value.Chunk.Offset, value.Chunk.EndOffset, value.Chunk.PrevRowIDMax, value.Chunk.RowIDMax, - value.Checksum.SumSize(), value.Checksum.SumKVS(), value.Checksum.Sum(), - ) + for engineID, engine := range checkpoints { + _, err = engineStmt.ExecContext(c, tableName, engineID, engine.Status) if err != nil { return errors.Trace(err) } + for _, value := range engine.Chunks { + _, err = chunkStmt.ExecContext( + c, tableName, engineID, + value.Key.Path, value.Key.Offset, value.Columns, value.ShouldIncludeRowID, + value.Chunk.Offset, value.Chunk.EndOffset, value.Chunk.PrevRowIDMax, value.Chunk.RowIDMax, + value.Checksum.SumSize(), value.Checksum.SumKVS(), value.Checksum.Sum(), + ) + if err != nil { + return errors.Trace(err) + } + } } return nil @@ -458,14 +548,17 @@ func (cpdb *MySQLCheckpointsDB) InsertChunkCheckpoints(ctx context.Context, tabl func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) { chunkQuery := fmt.Sprintf(` UPDATE %s.%s SET pos = ?, prev_rowid_max = ?, kvc_bytes = ?, kvc_kvs = ?, kvc_checksum = ? - WHERE table_name = ? AND path = ? 
AND offset = ?; + WHERE (table_name, engine_id, path, offset) = (?, ?, ?, ?); `, cpdb.schema, checkpointTableNameChunk) checksumQuery := fmt.Sprintf(` UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?; `, cpdb.schema, checkpointTableNameTable) - statusQuery := fmt.Sprintf(` + tableStatusQuery := fmt.Sprintf(` UPDATE %s.%s SET status = ? WHERE table_name = ?; `, cpdb.schema, checkpointTableNameTable) + engineStatusQuery := fmt.Sprintf(` + UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?); + `, cpdb.schema, checkpointTableNameEngine) err := common.TransactWithRetry(context.Background(), cpdb.db, "(update checkpoints)", func(c context.Context, tx *sql.Tx) error { chunkStmt, e := tx.PrepareContext(c, chunkQuery) @@ -478,15 +571,20 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi return errors.Trace(e) } defer checksumStmt.Close() - statusStmt, e := tx.PrepareContext(c, statusQuery) + tableStatusStmt, e := tx.PrepareContext(c, tableStatusQuery) + if e != nil { + return errors.Trace(e) + } + defer tableStatusStmt.Close() + engineStatusStmt, e := tx.PrepareContext(c, engineStatusQuery) if e != nil { return errors.Trace(e) } - defer statusStmt.Close() + defer engineStatusStmt.Close() for tableName, cpd := range checkpointDiffs { if cpd.hasStatus { - if _, e := statusStmt.ExecContext(c, cpd.status, tableName); e != nil { + if _, e := tableStatusStmt.ExecContext(c, cpd.status, tableName); e != nil { return errors.Trace(e) } } @@ -495,13 +593,20 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi return errors.Trace(e) } } - for key, diff := range cpd.chunks { - if _, e := chunkStmt.ExecContext( - c, - diff.pos, diff.rowID, diff.checksum.SumSize(), diff.checksum.SumKVS(), diff.checksum.Sum(), - tableName, key.Path, key.Offset, - ); e != nil { - return errors.Trace(e) + for engineID, engineDiff := range cpd.engines { + if engineDiff.hasStatus { + if _, e := engineStatusStmt.ExecContext(c, engineDiff.status, tableName, engineID); e != nil { + return errors.Trace(e) + } + } + for key, diff := range engineDiff.chunks { + if _, e := chunkStmt.ExecContext( + c, + diff.pos, diff.rowID, diff.checksum.SumSize(), diff.checksum.SumKVS(), diff.checksum.Sum(), + tableName, engineID, key.Path, key.Offset, + ); e != nil { + return errors.Trace(e) + } } } } @@ -556,7 +661,6 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, dbInfo map[string if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok { cpdb.checkpoints.Checkpoints[tableName] = &TableCheckpointModel{ Status: uint32(CheckpointStatusLoaded), - Engine: uuid.NewV4().Bytes(), } } // TODO check if hash matches @@ -579,70 +683,80 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC tableModel := cpdb.checkpoints.Checkpoints[tableName] - engine, err := uuid.FromBytes(tableModel.Engine) - if err != nil { - return nil, errors.Trace(err) - } - cp := &TableCheckpoint{ Status: CheckpointStatus(tableModel.Status), - Engine: engine, AllocBase: tableModel.AllocBase, - Chunks: make([]*ChunkCheckpoint, 0, len(tableModel.Chunks)), + Engines: make([]*EngineCheckpoint, 0, len(tableModel.Engines)), } - for _, chunkModel := range tableModel.Chunks { - cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{ - Key: ChunkCheckpointKey{ - Path: chunkModel.Path, - Offset: chunkModel.Offset, - }, - Columns: chunkModel.Columns, - ShouldIncludeRowID: chunkModel.ShouldIncludeRowId, - Chunk: mydump.Chunk{ - Offset: chunkModel.Pos, - 
EndOffset: chunkModel.EndOffset, - PrevRowIDMax: chunkModel.PrevRowidMax, - RowIDMax: chunkModel.RowidMax, - }, - Checksum: verify.MakeKVChecksum(chunkModel.KvcBytes, chunkModel.KvcKvs, chunkModel.KvcChecksum), + for _, engineModel := range tableModel.Engines { + engine := &EngineCheckpoint{ + Status: CheckpointStatus(engineModel.Status), + Chunks: make([]*ChunkCheckpoint, 0, len(engineModel.Chunks)), + } + + for _, chunkModel := range engineModel.Chunks { + engine.Chunks = append(engine.Chunks, &ChunkCheckpoint{ + Key: ChunkCheckpointKey{ + Path: chunkModel.Path, + Offset: chunkModel.Offset, + }, + Columns: chunkModel.Columns, + ShouldIncludeRowID: chunkModel.ShouldIncludeRowId, + Chunk: mydump.Chunk{ + Offset: chunkModel.Pos, + EndOffset: chunkModel.EndOffset, + PrevRowIDMax: chunkModel.PrevRowidMax, + RowIDMax: chunkModel.RowidMax, + }, + Checksum: verify.MakeKVChecksum(chunkModel.KvcBytes, chunkModel.KvcKvs, chunkModel.KvcChecksum), + }) + } + + sort.Slice(engine.Chunks, func(i, j int) bool { + return engine.Chunks[i].Key.less(&engine.Chunks[j].Key) }) + + cp.Engines = append(cp.Engines, engine) } - sort.Slice(cp.Chunks, func(i, j int) bool { - return cp.Chunks[i].Key.less(&cp.Chunks[j].Key) - }) return cp, nil } -func (cpdb *FileCheckpointsDB) InsertChunkCheckpoints(_ context.Context, tableName string, checkpoints []*ChunkCheckpoint) error { +func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints []*EngineCheckpoint) error { cpdb.lock.Lock() defer cpdb.lock.Unlock() tableModel := cpdb.checkpoints.Checkpoints[tableName] - if tableModel.Chunks == nil { - tableModel.Chunks = make(map[string]*ChunkCheckpointModel) - } - - for _, value := range checkpoints { - key := value.Key.String() - chunk, ok := tableModel.Chunks[key] - if !ok { - chunk = &ChunkCheckpointModel{ - Path: value.Key.Path, - Offset: value.Key.Offset, - Columns: value.Columns, - ShouldIncludeRowId: value.ShouldIncludeRowID, + for len(tableModel.Engines) < len(checkpoints) { + tableModel.Engines = append(tableModel.Engines, &EngineCheckpointModel{ + Status: uint32(CheckpointStatusLoaded), + Chunks: make(map[string]*ChunkCheckpointModel), + }) + } + + for engineID, engine := range checkpoints { + engineModel := tableModel.Engines[engineID] + for _, value := range engine.Chunks { + key := value.Key.String() + chunk, ok := engineModel.Chunks[key] + if !ok { + chunk = &ChunkCheckpointModel{ + Path: value.Key.Path, + Offset: value.Key.Offset, + Columns: value.Columns, + ShouldIncludeRowId: value.ShouldIncludeRowID, + } + engineModel.Chunks[key] = chunk } - tableModel.Chunks[key] = chunk + chunk.Pos = value.Chunk.Offset + chunk.EndOffset = value.Chunk.EndOffset + chunk.PrevRowidMax = value.Chunk.PrevRowIDMax + chunk.RowidMax = value.Chunk.RowIDMax + chunk.KvcBytes = value.Checksum.SumSize() + chunk.KvcKvs = value.Checksum.SumKVS() + chunk.KvcChecksum = value.Checksum.Sum() } - chunk.Pos = value.Chunk.Offset - chunk.EndOffset = value.Chunk.EndOffset - chunk.PrevRowidMax = value.Chunk.PrevRowIDMax - chunk.RowidMax = value.Chunk.RowIDMax - chunk.KvcBytes = value.Checksum.SumSize() - chunk.KvcKvs = value.Checksum.SumKVS() - chunk.KvcChecksum = value.Checksum.Sum() } return errors.Trace(cpdb.save()) @@ -660,13 +774,20 @@ func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoin if cpd.hasRebase { tableModel.AllocBase = cpd.allocBase } - for key, diff := range cpd.chunks { - chunkModel := tableModel.Chunks[key.String()] - chunkModel.Pos = diff.pos - 
chunkModel.PrevRowidMax = diff.rowID - chunkModel.KvcBytes = diff.checksum.SumSize() - chunkModel.KvcKvs = diff.checksum.SumKVS() - chunkModel.KvcChecksum = diff.checksum.Sum() + for engineID, engineDiff := range cpd.engines { + engineModel := tableModel.Engines[engineID] + if engineDiff.hasStatus { + engineModel.Status = uint32(engineDiff.status) + } + + for key, diff := range engineDiff.chunks { + chunkModel := engineModel.Chunks[key.String()] + chunkModel.Pos = diff.pos + chunkModel.PrevRowidMax = diff.rowID + chunkModel.KvcBytes = diff.checksum.SumSize() + chunkModel.KvcKvs = diff.checksum.SumKVS() + chunkModel.KvcChecksum = diff.checksum.Sum() + } } } @@ -691,33 +812,43 @@ func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]Des func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error { return errors.Trace(cannotManageNullDB) } +func (*NullCheckpointsDB) DumpEngines(context.Context, io.Writer) error { + return errors.Trace(cannotManageNullDB) +} func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error { return errors.Trace(cannotManageNullDB) } func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error { var ( - deleteChunkFmt string - deleteTableFmt string - arg interface{} + deleteChunkFmt string + deleteEngineFmt string + deleteTableFmt string + arg interface{} ) if tableName == "all" { deleteChunkFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)" + deleteEngineFmt = "DELETE FROM %[1]s.%[2]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[3]s WHERE node_id = ?)" deleteTableFmt = "DELETE FROM %s.%s WHERE node_id = ?" arg = nodeID } else { deleteChunkFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s" // the %.0s is to consume the third parameter. + deleteEngineFmt = "DELETE FROM %s.%s WHERE table_name = ?%.0s" deleteTableFmt = "DELETE FROM %s.%s WHERE table_name = ?" arg = tableName } deleteChunkQuery := fmt.Sprintf(deleteChunkFmt, cpdb.schema, checkpointTableNameChunk, checkpointTableNameTable) + deleteEngineQuery := fmt.Sprintf(deleteEngineFmt, cpdb.schema, checkpointTableNameEngine, checkpointTableNameTable) deleteTableQuery := fmt.Sprintf(deleteTableFmt, cpdb.schema, checkpointTableNameTable) err := common.TransactWithRetry(ctx, cpdb.db, fmt.Sprintf("(remove checkpoints of %s)", tableName), func(c context.Context, tx *sql.Tx) error { if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil { return errors.Trace(e) } + if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil { + return errors.Trace(e) + } if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil { return errors.Trace(e) } @@ -736,11 +867,22 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table } else { colName, arg = "table_name", tableName } - query := fmt.Sprintf(` + engineQuery := fmt.Sprintf(` + UPDATE %s.%s SET status = %d WHERE %s = ? AND status <= %d; + `, cpdb.schema, checkpointTableNameEngine, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid) + tableQuery := fmt.Sprintf(` UPDATE %s.%s SET status = %d WHERE %s = ? 
AND status <= %d; `, cpdb.schema, checkpointTableNameTable, CheckpointStatusLoaded, colName, CheckpointStatusMaxInvalid) - err := common.ExecWithRetry(ctx, cpdb.db, fmt.Sprintf("(ignore error checkpoints for %s)", tableName), query, arg) + err := common.TransactWithRetry(ctx, cpdb.db, fmt.Sprintf("(ignore error checkpoints for %s)", tableName), func(c context.Context, tx *sql.Tx) error { + if _, e := tx.ExecContext(c, engineQuery, arg); e != nil { + return errors.Trace(e) + } + if _, e := tx.ExecContext(c, tableQuery, arg); e != nil { + return errors.Trace(e) + } + return nil + }) return errors.Trace(err) } @@ -759,11 +901,20 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl } selectQuery := fmt.Sprintf(` - SELECT table_name, engine FROM %s.%s WHERE %s = ? AND status <= %d; - `, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid) + SELECT + t.table_name, + COALESCE(MAX(e.engine_id) + 1, 0) + FROM %[1]s.%[4]s t + LEFT JOIN %[1]s.%[5]s e ON t.table_name = e.table_name + WHERE t.%[2]s = ? AND t.status <= %[3]d + GROUP BY t.table_name; + `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameTable, checkpointTableNameEngine) deleteChunkQuery := fmt.Sprintf(` DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d) `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameChunk, checkpointTableNameTable) + deleteEngineQuery := fmt.Sprintf(` + DELETE FROM %[1]s.%[4]s WHERE table_name IN (SELECT table_name FROM %[1]s.%[5]s WHERE %[2]s = ? AND status <= %[3]d) + `, cpdb.schema, conditionColumn, CheckpointStatusMaxInvalid, checkpointTableNameEngine, checkpointTableNameTable) deleteTableQuery := fmt.Sprintf(` DELETE FROM %s.%s WHERE %s = ? 
AND status <= %d `, cpdb.schema, checkpointTableNameTable, conditionColumn, CheckpointStatusMaxInvalid) @@ -779,17 +930,11 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl } defer rows.Close() for rows.Next() { - var ( - matchedTableName string - matchedEngine []byte - ) - if e := rows.Scan(&matchedTableName, &matchedEngine); e != nil { + var dtc DestroyedTableCheckpoint + if e := rows.Scan(&dtc.TableName, &dtc.EnginesCount); e != nil { return errors.Trace(e) } - targetTables = append(targetTables, DestroyedTableCheckpoint{ - TableName: matchedTableName, - Engine: uuid.FromBytesOrNil(matchedEngine), - }) + targetTables = append(targetTables, dtc) } if e := rows.Err(); e != nil { return errors.Trace(e) @@ -799,6 +944,9 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl if _, e := tx.ExecContext(c, deleteChunkQuery, arg); e != nil { return errors.Trace(e) } + if _, e := tx.ExecContext(c, deleteEngineQuery, arg); e != nil { + return errors.Trace(e) + } if _, e := tx.ExecContext(c, deleteTableQuery, arg); e != nil { return errors.Trace(e) } @@ -818,7 +966,6 @@ func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer session, table_name, hex(hash) AS hash, - hex(engine) AS engine, status, alloc_base, create_time, @@ -833,6 +980,24 @@ func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer return errors.Trace(sqltocsv.Write(writer, rows)) } +func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error { + rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(` + SELECT + table_name, + engine_id, + status, + create_time, + update_time + FROM %s.%s; + `, cpdb.schema, checkpointTableNameEngine)) + if err != nil { + return errors.Trace(err) + } + defer rows.Close() + + return errors.Trace(sqltocsv.Write(writer, rows)) +} + func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error { rows, err := cpdb.db.QueryContext(ctx, fmt.Sprintf(` SELECT @@ -882,6 +1047,11 @@ func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTa if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) { tableModel.Status = uint32(CheckpointStatusLoaded) } + for _, engineModel := range tableModel.Engines { + if engineModel.Status <= uint32(CheckpointStatusMaxInvalid) { + engineModel.Status = uint32(CheckpointStatusLoaded) + } + } } return errors.Trace(cpdb.save()) } @@ -899,8 +1069,8 @@ func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetT } if tableModel.Status <= uint32(CheckpointStatusMaxInvalid) { targetTables = append(targetTables, DestroyedTableCheckpoint{ - TableName: tableName, - Engine: uuid.FromBytesOrNil(tableModel.Engine), + TableName: tableName, + EnginesCount: len(tableModel.Engines), }) } } @@ -920,6 +1090,10 @@ func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error { return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path) } +func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error { + return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path) +} + func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error { return errors.Errorf("dumping file checkpoint into CSV not unsupported, you may copy %s instead", cpdb.path) } diff --git a/lightning/restore/file_checkpoints.pb.go b/lightning/restore/file_checkpoints.pb.go index 
579f41148..49e39828f 100644 --- a/lightning/restore/file_checkpoints.pb.go +++ b/lightning/restore/file_checkpoints.pb.go @@ -33,7 +33,7 @@ func (m *CheckpointsModel) Reset() { *m = CheckpointsModel{} } func (m *CheckpointsModel) String() string { return proto.CompactTextString(m) } func (*CheckpointsModel) ProtoMessage() {} func (*CheckpointsModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{0} + return fileDescriptor_file_checkpoints_e3dc65d7af07b7eb, []int{0} } func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -70,21 +70,19 @@ func (m *CheckpointsModel) GetCheckpoints() map[string]*TableCheckpointModel { } type TableCheckpointModel struct { - Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` - Engine []byte `protobuf:"bytes,2,opt,name=engine,proto3" json:"engine,omitempty"` - Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` - AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` - // key is "$path:$offset" - Chunks map[string]*ChunkCheckpointModel `protobuf:"bytes,5,rep,name=chunks" json:"chunks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` + Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` + AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` + Engines []*EngineCheckpointModel `protobuf:"bytes,6,rep,name=engines" json:"engines,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *TableCheckpointModel) Reset() { *m = TableCheckpointModel{} } func (m *TableCheckpointModel) String() string { return proto.CompactTextString(m) } func (*TableCheckpointModel) ProtoMessage() {} func (*TableCheckpointModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{1} + return fileDescriptor_file_checkpoints_e3dc65d7af07b7eb, []int{1} } func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -120,13 +118,6 @@ func (m *TableCheckpointModel) GetHash() []byte { return nil } -func (m *TableCheckpointModel) GetEngine() []byte { - if m != nil { - return m.Engine - } - return nil -} - func (m *TableCheckpointModel) GetStatus() uint32 { if m != nil { return m.Status @@ -141,7 +132,62 @@ func (m *TableCheckpointModel) GetAllocBase() int64 { return 0 } -func (m *TableCheckpointModel) GetChunks() map[string]*ChunkCheckpointModel { +func (m *TableCheckpointModel) GetEngines() []*EngineCheckpointModel { + if m != nil { + return m.Engines + } + return nil +} + +type EngineCheckpointModel struct { + Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"` + // key is "$path:$offset" + Chunks map[string]*ChunkCheckpointModel `protobuf:"bytes,2,rep,name=chunks" json:"chunks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *EngineCheckpointModel) Reset() { *m = EngineCheckpointModel{} } +func (m *EngineCheckpointModel) String() string { return proto.CompactTextString(m) } +func (*EngineCheckpointModel) ProtoMessage() {} +func (*EngineCheckpointModel) 
Descriptor() ([]byte, []int) { + return fileDescriptor_file_checkpoints_e3dc65d7af07b7eb, []int{2} +} +func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_EngineCheckpointModel.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *EngineCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_EngineCheckpointModel.Merge(dst, src) +} +func (m *EngineCheckpointModel) XXX_Size() int { + return m.Size() +} +func (m *EngineCheckpointModel) XXX_DiscardUnknown() { + xxx_messageInfo_EngineCheckpointModel.DiscardUnknown(m) +} + +var xxx_messageInfo_EngineCheckpointModel proto.InternalMessageInfo + +func (m *EngineCheckpointModel) GetStatus() uint32 { + if m != nil { + return m.Status + } + return 0 +} + +func (m *EngineCheckpointModel) GetChunks() map[string]*ChunkCheckpointModel { if m != nil { return m.Chunks } @@ -168,7 +214,7 @@ func (m *ChunkCheckpointModel) Reset() { *m = ChunkCheckpointModel{} } func (m *ChunkCheckpointModel) String() string { return proto.CompactTextString(m) } func (*ChunkCheckpointModel) ProtoMessage() {} func (*ChunkCheckpointModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8, []int{2} + return fileDescriptor_file_checkpoints_e3dc65d7af07b7eb, []int{3} } func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -278,7 +324,8 @@ func init() { proto.RegisterType((*CheckpointsModel)(nil), "CheckpointsModel") proto.RegisterMapType((map[string]*TableCheckpointModel)(nil), "CheckpointsModel.CheckpointsEntry") proto.RegisterType((*TableCheckpointModel)(nil), "TableCheckpointModel") - proto.RegisterMapType((map[string]*ChunkCheckpointModel)(nil), "TableCheckpointModel.ChunksEntry") + proto.RegisterType((*EngineCheckpointModel)(nil), "EngineCheckpointModel") + proto.RegisterMapType((map[string]*ChunkCheckpointModel)(nil), "EngineCheckpointModel.ChunksEntry") proto.RegisterType((*ChunkCheckpointModel)(nil), "ChunkCheckpointModel") } func (m *CheckpointsModel) Marshal() (dAtA []byte, err error) { @@ -348,12 +395,6 @@ func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Hash))) i += copy(dAtA[i:], m.Hash) } - if len(m.Engine) > 0 { - dAtA[i] = 0x12 - i++ - i = encodeVarintFileCheckpoints(dAtA, i, uint64(len(m.Engine))) - i += copy(dAtA[i:], m.Engine) - } if m.Status != 0 { dAtA[i] = 0x18 i++ @@ -364,9 +405,44 @@ func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.AllocBase)) } + if len(m.Engines) > 0 { + for _, msg := range m.Engines { + dAtA[i] = 0x32 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Status != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.Status)) + } if len(m.Chunks) > 0 { 
for k, _ := range m.Chunks { - dAtA[i] = 0x2a + dAtA[i] = 0x12 i++ v := m.Chunks[k] msgSize := 0 @@ -511,16 +587,27 @@ func (m *TableCheckpointModel) Size() (n int) { if l > 0 { n += 1 + l + sovFileCheckpoints(uint64(l)) } - l = len(m.Engine) - if l > 0 { - n += 1 + l + sovFileCheckpoints(uint64(l)) - } if m.Status != 0 { n += 1 + sovFileCheckpoints(uint64(m.Status)) } if m.AllocBase != 0 { n += 1 + sovFileCheckpoints(uint64(m.AllocBase)) } + if len(m.Engines) > 0 { + for _, e := range m.Engines { + l = e.Size() + n += 1 + l + sovFileCheckpoints(uint64(l)) + } + } + return n +} + +func (m *EngineCheckpointModel) Size() (n int) { + var l int + _ = l + if m.Status != 0 { + n += 1 + sovFileCheckpoints(uint64(m.Status)) + } if len(m.Chunks) > 0 { for k, v := range m.Chunks { _ = k @@ -824,11 +911,11 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { m.Hash = []byte{} } iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Engine", wireType) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) } - var byteLen int + m.Status = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowFileCheckpoints @@ -838,28 +925,35 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + m.Status |= (uint32(b) & 0x7F) << shift if b < 0x80 { break } } - if byteLen < 0 { - return ErrInvalidLengthFileCheckpoints - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AllocBase", wireType) } - m.Engine = append(m.Engine[:0], dAtA[iNdEx:postIndex]...) 
- if m.Engine == nil { - m.Engine = []byte{} + m.AllocBase = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AllocBase |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } } - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Engines", wireType) } - m.Status = 0 + var msglen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowFileCheckpoints @@ -869,16 +963,78 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Status |= (uint32(b) & 0x7F) << shift + msglen |= (int(b) & 0x7F) << shift if b < 0x80 { break } } - case 4: + if msglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Engines = append(m.Engines, &EngineCheckpointModel{}) + if err := m.Engines[len(m.Engines)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: EngineCheckpointModel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: EngineCheckpointModel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field AllocBase", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) } - m.AllocBase = 0 + m.Status = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowFileCheckpoints @@ -888,12 +1044,12 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.AllocBase |= (int64(b) & 0x7F) << shift + m.Status |= (uint32(b) & 0x7F) << shift if b < 0x80 { break } } - case 5: + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) } @@ -1416,41 +1572,42 @@ var ( ) func init() { - proto.RegisterFile("lightning/restore/file_checkpoints.proto", fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8) + proto.RegisterFile("lightning/restore/file_checkpoints.proto", fileDescriptor_file_checkpoints_e3dc65d7af07b7eb) } -var fileDescriptor_file_checkpoints_e8be6a4c2b3dc1c8 = []byte{ - // 499 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x8e, 0xd3, 0x3c, - 0x14, 0xfd, 0xdc, 0xb4, 0x69, 0x7b, 0xd3, 0x0f, 0x55, 0xd6, 0x0c, 0x98, 0x41, 0x54, 0x99, 0x8a, - 0x45, 0x24, 0xa4, 0x8e, 0x18, 
0x36, 0xc0, 0xb2, 0x85, 0xc5, 0x08, 0x8d, 0x40, 0x16, 0x6c, 0xd8, - 0x44, 0x69, 0xe2, 0x36, 0x51, 0x5c, 0xbb, 0x8a, 0x9d, 0xcc, 0xf4, 0x05, 0x58, 0xf3, 0x26, 0xbc, - 0x06, 0x4b, 0x1e, 0x80, 0x05, 0x2a, 0x2f, 0x82, 0xec, 0x04, 0x35, 0xa0, 0x0a, 0xb1, 0xbb, 0xe7, - 0xe7, 0x9e, 0xdb, 0x1e, 0xc5, 0x10, 0xf0, 0x6c, 0x9d, 0x6a, 0x91, 0x89, 0xf5, 0x45, 0xc1, 0x94, - 0x96, 0x05, 0xbb, 0x58, 0x65, 0x9c, 0x85, 0x71, 0xca, 0xe2, 0x7c, 0x2b, 0x33, 0xa1, 0xd5, 0x6c, - 0x5b, 0x48, 0x2d, 0xa7, 0x9f, 0x11, 0x8c, 0x17, 0x07, 0xf6, 0x5a, 0x26, 0x8c, 0xe3, 0x97, 0xe0, - 0xb5, 0x9c, 0x04, 0xf9, 0x4e, 0xe0, 0x5d, 0x4e, 0x67, 0x7f, 0xfa, 0xda, 0xc4, 0x2b, 0xa1, 0x8b, - 0x1d, 0x6d, 0xaf, 0x9d, 0xbd, 0xff, 0x2d, 0xd9, 0x1a, 0xf0, 0x18, 0x9c, 0x9c, 0xed, 0x08, 0xf2, - 0x51, 0x30, 0xa4, 0x66, 0xc4, 0x8f, 0xa1, 0x57, 0x45, 0xbc, 0x64, 0xa4, 0xe3, 0xa3, 0xc0, 0xbb, - 0x3c, 0x9d, 0xbd, 0x8b, 0x96, 0x9c, 0x1d, 0x16, 0xed, 0x25, 0x5a, 0x7b, 0x5e, 0x74, 0x9e, 0xa1, - 0xe9, 0xc7, 0x0e, 0x9c, 0x1c, 0xf3, 0x60, 0x0c, 0xdd, 0x34, 0x52, 0xa9, 0x0d, 0x1f, 0x51, 0x3b, - 0xe3, 0xbb, 0xe0, 0x32, 0xb1, 0xce, 0x44, 0x1d, 0x3f, 0xa2, 0x0d, 0x32, 0xbc, 0xd2, 0x91, 0x2e, - 0x15, 0x71, 0x7c, 0x14, 0xfc, 0x4f, 0x1b, 0x84, 0x1f, 0x02, 0x44, 0x9c, 0xcb, 0x38, 0x5c, 0x46, - 0x8a, 0x91, 0xae, 0x8f, 0x02, 0x87, 0x0e, 0x2d, 0x33, 0x8f, 0x14, 0xc3, 0xcf, 0xc1, 0x8d, 0xd3, - 0x52, 0xe4, 0x8a, 0xf4, 0x6c, 0x27, 0xe7, 0x47, 0x7f, 0xed, 0x6c, 0x61, 0x3d, 0x75, 0x25, 0xcd, - 0xc2, 0xd9, 0x5b, 0xf0, 0x5a, 0xf4, 0xbf, 0x14, 0x61, 0xed, 0x7f, 0x29, 0xe2, 0x5b, 0x07, 0x4e, - 0x8e, 0x79, 0x4c, 0x11, 0xdb, 0x48, 0xa7, 0x4d, 0xb8, 0x9d, 0xcd, 0x1f, 0x96, 0xab, 0x95, 0x62, - 0xda, 0xc6, 0x3b, 0xb4, 0x41, 0x98, 0x40, 0x3f, 0x96, 0xbc, 0xdc, 0x88, 0xba, 0x89, 0x11, 0xfd, - 0x05, 0xf1, 0x13, 0x38, 0x55, 0xa9, 0x2c, 0x79, 0x12, 0x66, 0x22, 0xe6, 0x65, 0xc2, 0xc2, 0x42, - 0xde, 0x84, 0x59, 0x62, 0x5b, 0x19, 0x50, 0x5c, 0x8b, 0x57, 0xb5, 0x46, 0xe5, 0xcd, 0x55, 0x62, - 0xda, 0x63, 0x22, 0x09, 0x9b, 0x43, 0xbd, 0xba, 0x3d, 0x26, 0x92, 0x37, 0xf5, 0xad, 0x31, 0x38, - 0x5b, 0xa9, 0x88, 0x6b, 0x79, 0x33, 0xe2, 0x47, 0x70, 0x67, 0x5b, 0xb0, 0xca, 0x24, 0x67, 0x49, - 0xb8, 0x89, 0x6e, 0x49, 0xdf, 0x8a, 0x23, 0xc3, 0x52, 0x43, 0x5e, 0x47, 0xb7, 0xf8, 0x01, 0x0c, - 0x0f, 0x86, 0x81, 0x35, 0x0c, 0x8a, 0x96, 0x98, 0x57, 0x71, 0xb8, 0xdc, 0x69, 0xa6, 0xc8, 0xd0, - 0x47, 0x41, 0x97, 0x0e, 0xf2, 0x2a, 0x9e, 0x1b, 0x8c, 0xef, 0x41, 0xdf, 0x88, 0x79, 0xa5, 0x08, - 0x58, 0xc9, 0xcd, 0xab, 0xf8, 0x75, 0xa5, 0xf0, 0x39, 0x8c, 0x8c, 0x60, 0x3f, 0x57, 0x55, 0x6e, - 0x88, 0xe7, 0xa3, 0xc0, 0xa5, 0x5e, 0x5e, 0xc5, 0x8b, 0x86, 0x9a, 0xdf, 0xff, 0xb2, 0x9f, 0xa0, - 0xaf, 0xfb, 0x09, 0xfa, 0xbe, 0x9f, 0xa0, 0x4f, 0x3f, 0x26, 0xff, 0x7d, 0xe8, 0x37, 0x6f, 0x69, - 0xe9, 0xda, 0xb7, 0xf3, 0xf4, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x11, 0x26, 0x76, 0xf9, 0x67, - 0x03, 0x00, 0x00, +var fileDescriptor_file_checkpoints_e3dc65d7af07b7eb = []byte{ + // 520 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x4d, 0x6e, 0xd3, 0x40, + 0x14, 0x66, 0xe2, 0xd4, 0x49, 0x9e, 0x03, 0x8a, 0x46, 0x4d, 0x19, 0x8a, 0x88, 0x4c, 0xc4, 0xc2, + 0x12, 0x52, 0x0a, 0x65, 0x83, 0xba, 0x4c, 0xe8, 0xa2, 0x42, 0x15, 0x68, 0x04, 0x1b, 0x36, 0x96, + 0x63, 0x4f, 0x62, 0xcb, 0xce, 0x4c, 0xe4, 0x19, 0xbb, 0xcd, 0x2d, 0x90, 0x38, 0x08, 0x27, 0x60, + 0xcf, 0x92, 0x03, 0xb0, 0x40, 0xe1, 0x22, 0x68, 0xc6, 0xae, 0xe2, 0x56, 0x11, 0xea, 0xee, 0xbd, + 0xef, 0xfb, 0xde, 0x8f, 0xbf, 0xe7, 0x01, 0x2f, 0x4b, 0x96, 0xb1, 0xe2, 0x09, 0x5f, 0x9e, 0xe4, + 0x4c, 0x2a, 0x91, 
0xb3, 0x93, 0x45, 0x92, 0x31, 0x3f, 0x8c, 0x59, 0x98, 0xae, 0x45, 0xc2, 0x95, + 0x9c, 0xac, 0x73, 0xa1, 0xc4, 0xf8, 0x3b, 0x82, 0xc1, 0x6c, 0x87, 0x5e, 0x8a, 0x88, 0x65, 0xf8, + 0x1d, 0x38, 0x0d, 0x25, 0x41, 0xae, 0xe5, 0x39, 0xa7, 0xe3, 0xc9, 0x5d, 0x5d, 0x13, 0x38, 0xe7, + 0x2a, 0xdf, 0xd0, 0x66, 0xd9, 0xf1, 0xe7, 0x5b, 0x9d, 0x8d, 0x00, 0x0f, 0xc0, 0x4a, 0xd9, 0x86, + 0x20, 0x17, 0x79, 0x3d, 0xaa, 0x43, 0xfc, 0x12, 0x0e, 0xca, 0x20, 0x2b, 0x18, 0x69, 0xb9, 0xc8, + 0x73, 0x4e, 0x87, 0x93, 0x4f, 0xc1, 0x3c, 0x63, 0xbb, 0x42, 0x33, 0x89, 0x56, 0x9a, 0xb3, 0xd6, + 0x5b, 0x34, 0xfe, 0x86, 0xe0, 0x70, 0x9f, 0x06, 0x63, 0x68, 0xc7, 0x81, 0x8c, 0x4d, 0xf3, 0x3e, + 0x35, 0x31, 0x3e, 0x02, 0x5b, 0xaa, 0x40, 0x15, 0x92, 0x58, 0x2e, 0xf2, 0x1e, 0xd2, 0x3a, 0xc3, + 0xcf, 0x00, 0x82, 0x2c, 0x13, 0xa1, 0x3f, 0x0f, 0x24, 0x23, 0x6d, 0x17, 0x79, 0x16, 0xed, 0x19, + 0x64, 0x1a, 0x48, 0x86, 0x5f, 0x41, 0x87, 0xf1, 0x65, 0xc2, 0x99, 0x24, 0xb6, 0xf9, 0xf8, 0xa3, + 0xc9, 0xb9, 0xc9, 0xef, 0xee, 0x75, 0x23, 0x1b, 0xff, 0x40, 0x30, 0xdc, 0x2b, 0x69, 0xac, 0x80, + 0x6e, 0xad, 0x70, 0x06, 0x76, 0x18, 0x17, 0x3c, 0x95, 0xa4, 0x55, 0xfb, 0xbb, 0xb7, 0x7e, 0x32, + 0x33, 0xa2, 0xca, 0xdf, 0xba, 0xe2, 0xf8, 0x23, 0x38, 0x0d, 0xf8, 0x3e, 0xae, 0x1a, 0xf9, 0x7f, + 0x5c, 0xfd, 0xdd, 0x82, 0xc3, 0x7d, 0x1a, 0xed, 0xea, 0x3a, 0x50, 0x71, 0xdd, 0xdc, 0xc4, 0xfa, + 0x93, 0xc4, 0x62, 0x21, 0x99, 0x32, 0xed, 0x2d, 0x5a, 0x67, 0x98, 0x40, 0x27, 0x14, 0x59, 0xb1, + 0xe2, 0x95, 0xdd, 0x7d, 0x7a, 0x93, 0xe2, 0xd7, 0x30, 0x94, 0xb1, 0x28, 0xb2, 0xc8, 0x4f, 0x78, + 0x98, 0x15, 0x11, 0xf3, 0x73, 0x71, 0xe5, 0x27, 0x91, 0xb1, 0xbe, 0x4b, 0x71, 0x45, 0x5e, 0x54, + 0x1c, 0x15, 0x57, 0x17, 0x91, 0x3e, 0x11, 0xe3, 0x91, 0x5f, 0x0f, 0x3a, 0xa8, 0x4e, 0xc4, 0x78, + 0xf4, 0xa1, 0x9a, 0x35, 0x00, 0x6b, 0x2d, 0xf4, 0x79, 0x34, 0xae, 0x43, 0xfc, 0x02, 0x1e, 0xad, + 0x73, 0x56, 0xea, 0xce, 0x49, 0xe4, 0xaf, 0x82, 0x6b, 0xd2, 0x31, 0x64, 0x5f, 0xa3, 0x54, 0x83, + 0x97, 0xc1, 0x35, 0x7e, 0x0a, 0xbd, 0x9d, 0xa0, 0x6b, 0x04, 0xdd, 0xbc, 0x41, 0xa6, 0x65, 0xe8, + 0xcf, 0x37, 0x8a, 0x49, 0xd2, 0x73, 0x91, 0xd7, 0xa6, 0xdd, 0xb4, 0x0c, 0xa7, 0x3a, 0xc7, 0x8f, + 0xa1, 0xa3, 0xc9, 0xb4, 0x94, 0x04, 0x0c, 0x65, 0xa7, 0x65, 0xf8, 0xbe, 0x94, 0xf8, 0x39, 0xf4, + 0x35, 0x61, 0xfe, 0x7d, 0x59, 0xac, 0x88, 0xe3, 0x22, 0xcf, 0xa6, 0x4e, 0x5a, 0x86, 0xb3, 0x1a, + 0x9a, 0x3e, 0xf9, 0xb9, 0x1d, 0xa1, 0x5f, 0xdb, 0x11, 0xfa, 0xb3, 0x1d, 0xa1, 0xaf, 0x7f, 0x47, + 0x0f, 0xbe, 0x74, 0xea, 0x87, 0x39, 0xb7, 0xcd, 0x43, 0x7c, 0xf3, 0x2f, 0x00, 0x00, 0xff, 0xff, + 0x03, 0x70, 0xa5, 0x5f, 0xb4, 0x03, 0x00, 0x00, } diff --git a/lightning/restore/file_checkpoints.proto b/lightning/restore/file_checkpoints.proto index 9d1236080..00bdaa138 100644 --- a/lightning/restore/file_checkpoints.proto +++ b/lightning/restore/file_checkpoints.proto @@ -8,11 +8,15 @@ message CheckpointsModel { message TableCheckpointModel { bytes hash = 1; - bytes engine = 2; uint32 status = 3; int64 alloc_base = 4; + repeated EngineCheckpointModel engines = 6; +} + +message EngineCheckpointModel { + uint32 status = 1; // key is "$path:$offset" - map chunks = 5; + map chunks = 2; } message ChunkCheckpointModel { diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index 363c76b0a..10493d4bf 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -260,23 +260,17 @@ func (rc *RestoreController) restoreSchema(ctx context.Context) error { } func (rc *RestoreController) estimateChunkCountIntoMetrics() { - estimatedChunkCount := int64(0) - minRegionSize := 
rc.cfg.Mydumper.MinRegionSize + estimatedChunkCount := 0 for _, dbMeta := range rc.dbMetas { for _, tableMeta := range dbMeta.Tables { - for _, dataFile := range tableMeta.DataFiles { - info, err := os.Stat(dataFile) - if err == nil { - estimatedChunkCount += (info.Size() + minRegionSize - 1) / minRegionSize - } - } + estimatedChunkCount += len(tableMeta.DataFiles) } } metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(float64(estimatedChunkCount)) } -func (rc *RestoreController) saveStatusCheckpoint(tableName string, err error, statusIfSucceed CheckpointStatus) { - merger := &StatusCheckpointMerger{Status: statusIfSucceed} +func (rc *RestoreController) saveStatusCheckpoint(tableName string, engineID int, err error, statusIfSucceed CheckpointStatus) { + merger := &StatusCheckpointMerger{Status: statusIfSucceed, EngineID: engineID} switch { case err == nil: @@ -337,7 +331,7 @@ func (rc *RestoreController) listenCheckpointUpdates(wg *sync.WaitGroup) { // continue // gofail: var FailIfStatusBecomes int - // if merger, ok := scp.merger.(*StatusCheckpointMerger); ok && int(merger.Status) == FailIfStatusBecomes { + // if merger, ok := scp.merger.(*StatusCheckpointMerger); ok && merger.EngineID >= 0 && int(merger.Status) == FailIfStatusBecomes { // wg.Wait() // panic("forcing failure due to FailIfStatusBecomes") // } @@ -403,10 +397,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { timer := time.Now() var wg sync.WaitGroup - var ( - restoreErrLock sync.Mutex - restoreErr error - ) + var restoreErr common.OnceError stopPeriodicActions := make(chan struct{}, 1) go rc.runPeriodicActions(ctx, stopPeriodicActions) @@ -431,6 +422,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { tableName := common.UniqueTable(dbInfo.Name, tableInfo.Name) cp, err := rc.checkpointsDB.Get(ctx, tableName) + if cp.Status <= CheckpointStatusMaxInvalid { + return errors.Errorf("Checkpoint for %s has invalid status: %d", tableName, cp.Status) + } if err != nil { return errors.Trace(err) } @@ -439,36 +433,13 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { return errors.Trace(err) } - // Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about - // the difference between restoring tables concurrently and restoring tables one by one. 
- - restoreWorker := rc.tableWorkers.Apply() wg.Add(1) - go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) { + go func(t *TableRestore, cp *TableCheckpoint) { defer wg.Done() - - closedEngine, err := t.restore(ctx, rc, cp) - defer func() { - metric.RecordTableCount("completed", err) - if err != nil { - restoreErrLock.Lock() - if restoreErr == nil { - restoreErr = err - } - restoreErrLock.Unlock() - } - }() - t.Close() - rc.tableWorkers.Recycle(w) - if err != nil { - if !common.IsContextCanceledError(err) { - common.AppLogger.Errorf("[%s] restore error %v", t.tableName, errors.ErrorStack(err)) - } - return - } - - err = t.postProcess(ctx, closedEngine, rc, cp) - }(restoreWorker, tr, cp) + err := t.restoreTable(ctx, rc, cp) + metric.RecordTableCount("completed", err) + restoreErr.Set(t.tableName, err) + }(tr, cp) } } @@ -476,37 +447,33 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error { stopPeriodicActions <- struct{}{} common.AppLogger.Infof("restore all tables data takes %v", time.Since(timer)) - restoreErrLock.Lock() - defer restoreErrLock.Unlock() - return errors.Trace(restoreErr) + return errors.Trace(restoreErr.Get()) } -func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) (*kv.ClosedEngine, error) { - if cp.Status >= CheckpointStatusClosed { - closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, cp.Engine) - return closedEngine, errors.Trace(err) - } - - engine, err := rc.importer.OpenEngine(ctx, t.tableName, cp.Engine) - if err != nil { - return nil, errors.Trace(err) - } +func (t *TableRestore) restoreTable( + ctx context.Context, + rc *RestoreController, + cp *TableCheckpoint, +) error { + // 1. Load the table info. // no need to do anything if the chunks are already populated - if len(cp.Chunks) > 0 { - common.AppLogger.Infof("[%s] reusing %d chunks from checkpoint", t.tableName, len(cp.Chunks)) + if len(cp.Engines) > 0 { + common.AppLogger.Infof("[%s] reusing %d engines and %d chunks from checkpoint", t.tableName, len(cp.Engines), cp.CountChunks()) } else if cp.Status < CheckpointStatusAllWritten { - if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp); err != nil { - return nil, errors.Trace(err) + if err := t.populateChunks(rc.cfg, cp); err != nil { + return errors.Trace(err) } - if err := rc.checkpointsDB.InsertChunkCheckpoints(ctx, t.tableName, cp.Chunks); err != nil { - return nil, errors.Trace(err) + if err := rc.checkpointsDB.InsertEngineCheckpoints(ctx, t.tableName, cp.Engines); err != nil { + return errors.Trace(err) } // rebase the allocator so it exceeds the number of rows. cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, t.tableInfo.core.AutoIncID) - for _, chunk := range cp.Chunks { - cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, chunk.Chunk.RowIDMax) + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, chunk.Chunk.RowIDMax) + } } t.alloc.Rebase(t.tableInfo.ID, cp.AllocBase, false) rc.saveCpCh <- saveCp{ @@ -517,14 +484,82 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T } } - var wg sync.WaitGroup - var ( - chunkErrMutex sync.Mutex - chunkErr error - ) + // 2. 
Restore engines (if still needed) + + if cp.Status < CheckpointStatusImported { + timer := time.Now() + + var wg sync.WaitGroup + var engineErr common.OnceError + + for engineID, engine := range cp.Engines { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if engineErr.Get() != nil { + break + } + + wg.Add(1) + + // Note: We still need tableWorkers to control the concurrency of tables. + // In the future, we will investigate more about + // the difference between restoring tables concurrently and restoring tables one by one. + restoreWorker := rc.tableWorkers.Apply() + + go func(w *worker.RestoreWorker, eid int, ecp *EngineCheckpoint) { + defer wg.Done() + tag := fmt.Sprintf("%s:%d", t.tableName, eid) + + closedEngine, err := t.restoreEngine(ctx, rc, eid, ecp) + rc.tableWorkers.Recycle(w) + if err != nil { + engineErr.Set(tag, err) + return + } + if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil { + engineErr.Set(tag, err) + } + }(restoreWorker, engineID, engine) + } + + wg.Wait() + + common.AppLogger.Infof("[%s] import whole table takes %v", t.tableName, time.Since(timer)) + err := engineErr.Get() + rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusImported) + if err != nil { + return errors.Trace(err) + } + } + + // 3. Post-process + + return errors.Trace(t.postProcess(ctx, rc, cp)) +} + +func (t *TableRestore) restoreEngine( + ctx context.Context, + rc *RestoreController, + engineID int, + cp *EngineCheckpoint, +) (*kv.ClosedEngine, error) { + if cp.Status >= CheckpointStatusClosed { + closedEngine, err := rc.importer.UnsafeCloseEngine(ctx, t.tableName, engineID) + return closedEngine, errors.Trace(err) + } timer := time.Now() - handledChunksCount := new(int32) + + engine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID) + if err != nil { + return nil, errors.Trace(err) + } + + var wg sync.WaitGroup + var chunkErr common.OnceError // Restore table data for chunkIndex, chunk := range cp.Chunks { @@ -538,10 +573,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T default: } - chunkErrMutex.Lock() - err := chunkErr - chunkErrMutex.Unlock() - if err != nil { + if chunkErr.Get() != nil { break } @@ -567,38 +599,37 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T rc.regionWorkers.Recycle(w) }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Inc() - err := cr.restore(ctx, t, engine, rc) - if err != nil { - metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Inc() - if !common.IsContextCanceledError(err) { - common.AppLogger.Errorf("[%s] chunk #%d (%s) run task error %s", t.tableName, cr.index, &cr.chunk.Key, errors.ErrorStack(err)) - } - chunkErrMutex.Lock() - if chunkErr == nil { - chunkErr = err - } - chunkErrMutex.Unlock() + err := cr.restore(ctx, t, engineID, engine, rc) + if err == nil { + metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() return } - metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() - - handled := int(atomic.AddInt32(handledChunksCount, 1)) - common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks))) + metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Inc() + tag := fmt.Sprintf("%s:%d] [%s", t.tableName, engineID, &cr.chunk.Key) + chunkErr.Set(tag, err) }(restoreWorker, cr) } wg.Wait() - common.AppLogger.Infof("[%s] encode kv data and write takes %v", t.tableName, time.Since(timer)) - chunkErrMutex.Lock() - 
err = chunkErr - chunkErrMutex.Unlock() - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusAllWritten) + dur := time.Since(timer) + + // Report some statistics into the log for debugging. + totalKVSize := uint64(0) + totalSQLSize := int64(0) + for _, chunk := range cp.Chunks { + totalKVSize += chunk.Checksum.SumSize() + totalSQLSize += chunk.Chunk.EndOffset + } + + common.AppLogger.Infof("[%s:%d] encode kv data and write takes %v (read %d, written %d)", t.tableName, engineID, dur, totalSQLSize, totalKVSize) + err = chunkErr.Get() + rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusAllWritten) if err != nil { return nil, errors.Trace(err) } closedEngine, err := engine.Close(ctx) - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusClosed) + rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed) if err != nil { common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err)) return nil, errors.Trace(err) @@ -606,33 +637,46 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T return closedEngine, nil } -func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedEngine, rc *RestoreController, cp *TableCheckpoint) error { +func (t *TableRestore) importEngine( + ctx context.Context, + closedEngine *kv.ClosedEngine, + rc *RestoreController, + engineID int, + cp *EngineCheckpoint, +) error { + if cp.Status >= CheckpointStatusImported { + return nil + } + // 1. close engine, then calling import // FIXME: flush is an asynchronous operation, what if flush failed? - if cp.Status < CheckpointStatusImported { - // the lock ensures the import() step will not be concurrent. - rc.postProcessLock.Lock() - err := t.importKV(ctx, closedEngine) - // gofail: var SlowDownImport struct{} - rc.postProcessLock.Unlock() - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusImported) - if err != nil { - return errors.Trace(err) - } - // 2. perform a level-1 compact if idling. - if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) { - go func() { - err := rc.doCompact(ctx, Level1Compact) - if err != nil { - // log it and continue - common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err) - } - atomic.StoreInt32(&rc.compactState, compactStateIdle) - }() - } + // the lock ensures the import() step will not be concurrent. + rc.postProcessLock.Lock() + err := t.importKV(ctx, closedEngine) + // gofail: var SlowDownImport struct{} + rc.postProcessLock.Unlock() + rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusImported) + if err != nil { + return errors.Trace(err) } + // 2. perform a level-1 compact if idling. + if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) { + go func() { + err := rc.doCompact(ctx, Level1Compact) + if err != nil { + // log it and continue + common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err) + } + atomic.StoreInt32(&rc.compactState, compactStateIdle) + }() + } + + return nil +} + +func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error { setSessionConcurrencyVars(ctx, rc.tidbMgr.db, rc.cfg.TiDB) // 3. 
alter table set auto_increment @@ -640,7 +684,7 @@ func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedE rc.alterTableLock.Lock() err := t.restoreTableMeta(ctx, rc.tidbMgr.db) rc.alterTableLock.Unlock() - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusAlteredAutoInc) + rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusAlteredAutoInc) if err != nil { common.AppLogger.Errorf( "[%[1]s] failed to AUTO TABLE %[1]s SET AUTO_INCREMENT=%[2]d : %[3]v", @@ -654,10 +698,10 @@ func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedE if cp.Status < CheckpointStatusChecksummed { if !rc.cfg.PostRestore.Checksum { common.AppLogger.Infof("[%s] Skip checksum.", t.tableName) - rc.saveStatusCheckpoint(t.tableName, nil, CheckpointStatusChecksumSkipped) + rc.saveStatusCheckpoint(t.tableName, -1, nil, CheckpointStatusChecksumSkipped) } else { err := t.compareChecksum(ctx, rc.tidbMgr.db, cp) - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusChecksummed) + rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusChecksummed) if err != nil { common.AppLogger.Errorf("[%s] checksum failed: %v", t.tableName, err.Error()) return errors.Trace(err) @@ -669,10 +713,10 @@ func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedE if cp.Status < CheckpointStatusAnalyzed { if !rc.cfg.PostRestore.Analyze { common.AppLogger.Infof("[%s] Skip analyze.", t.tableName) - rc.saveStatusCheckpoint(t.tableName, nil, CheckpointStatusAnalyzeSkipped) + rc.saveStatusCheckpoint(t.tableName, -1, nil, CheckpointStatusAnalyzeSkipped) } else { err := t.analyzeTable(ctx, rc.tidbMgr.db) - rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusAnalyzed) + rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusAnalyzed) if err != nil { common.AppLogger.Errorf("[%s] analyze failed: %v", t.tableName, err.Error()) return errors.Trace(err) @@ -938,19 +982,20 @@ func (tr *TableRestore) Close() { var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName)) -func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint) error { +func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) error { common.AppLogger.Infof("[%s] load chunks", t.tableName) timer := time.Now() - chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns) + chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns, cfg.Mydumper.BatchSize, cfg.Mydumper.BatchImportRatio, cfg.App.TableConcurrency) if err != nil { return errors.Trace(err) } - cp.Chunks = make([]*ChunkCheckpoint, 0, len(chunks)) - for _, chunk := range chunks { - cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{ + for chunk.EngineID >= len(cp.Engines) { + cp.Engines = append(cp.Engines, &EngineCheckpoint{Status: CheckpointStatusLoaded}) + } + cp.Engines[chunk.EngineID].Chunks = append(cp.Engines[chunk.EngineID].Chunks, &ChunkCheckpoint{ Key: ChunkCheckpointKey{ Path: chunk.File, Offset: chunk.Chunk.Offset, @@ -960,7 +1005,7 @@ func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint) e }) } - common.AppLogger.Infof("[%s] load %d chunks takes %v", t.tableName, len(chunks), time.Since(timer)) + common.AppLogger.Infof("[%s] load %d engines and %d chunks takes %v", t.tableName, len(cp.Engines), len(chunks), time.Since(timer)) return nil } @@ -1025,8 +1070,10 @@ func (tr *TableRestore) importKV(ctx context.Context, closedEngine *kv.ClosedEng // do checksum for each table. 
func (tr *TableRestore) compareChecksum(ctx context.Context, db *sql.DB, cp *TableCheckpoint) error { var localChecksum verify.KVChecksum - for _, chunk := range cp.Chunks { - localChecksum.Add(&chunk.Checksum) + for _, engine := range cp.Engines { + for _, chunk := range engine.Chunks { + localChecksum.Add(&chunk.Checksum) + } } start := time.Now() @@ -1184,6 +1231,7 @@ func splitIntoDeliveryStreams(totalKVs []kvenc.KvPair, splitSize int) [][]kvenc. func (cr *chunkRestore) restore( ctx context.Context, t *TableRestore, + engineID int, engine *kv.OpenedEngine, rc *RestoreController, ) error { @@ -1261,7 +1309,7 @@ func (cr *chunkRestore) restore( block.cond.Signal() if e := stream.Close(); e != nil { if err != nil { - common.AppLogger.Warnf("failed to close write stream: %s", e.Error()) + common.AppLogger.Warnf("[%s:%d] failed to close write stream: %s", t.tableName, engineID, e.Error()) } else { err = e } @@ -1272,8 +1320,10 @@ func (cr *chunkRestore) restore( metric.BlockDeliverBytesHistogram.Observe(float64(b.localChecksum.SumSize())) if err != nil { + if !common.IsContextCanceledError(err) { + common.AppLogger.Errorf("[%s:%d] kv deliver failed = %v", t.tableName, engineID, err) + } // TODO : retry ~ - common.AppLogger.Errorf("kv deliver failed = %s\n", err.Error()) deliverCompleteCh <- errors.Trace(err) return } @@ -1292,6 +1342,7 @@ func (cr *chunkRestore) restore( rc.saveCpCh <- saveCp{ tableName: t.tableName, merger: &ChunkCheckpointMerger{ + EngineID: engineID, Key: cr.chunk.Key, Checksum: cr.chunk.Checksum, Pos: cr.chunk.Chunk.Offset, @@ -1398,8 +1449,8 @@ func (cr *chunkRestore) restore( case err := <-deliverCompleteCh: if err == nil { common.AppLogger.Infof( - "[%s] restore chunk #%d (%s) takes %v (read: %v, encode: %v, deliver: %v)", - t.tableName, cr.index, &cr.chunk.Key, time.Since(timer), + "[%s:%d] restore chunk #%d (%s) takes %v (read: %v, encode: %v, deliver: %v)", + t.tableName, engineID, cr.index, &cr.chunk.Key, time.Since(timer), readTotalDur, encodeTotalDur, deliverTotalDur, ) } diff --git a/tests/checkpoint/run.sh b/tests/checkpoint/run.sh index 52d4e1b38..72dde9df2 100755 --- a/tests/checkpoint/run.sh +++ b/tests/checkpoint/run.sh @@ -64,7 +64,7 @@ echo "******** Verify checkpoint no-op ********" run_lightning run_sql "$PARTIAL_IMPORT_QUERY" check_contains "s: $(( (1000 * $CHUNK_COUNT + 1001) * $CHUNK_COUNT * $TABLE_COUNT ))" -run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cppk.table_v1 WHERE status >= 200" +run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cppk.table_v4 WHERE status >= 200" check_contains "count(*): $TABLE_COUNT" # Ensure there is no dangling open engines diff --git a/tests/checkpoint_chunks/run.sh b/tests/checkpoint_chunks/run.sh index 725fb11a8..9fd6bd83d 100755 --- a/tests/checkpoint_chunks/run.sh +++ b/tests/checkpoint_chunks/run.sh @@ -41,7 +41,7 @@ run_lightning run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;' check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))" check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))" -run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status >= 200" +run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v4 WHERE status >= 200" check_contains "count(*): 1" # Repeat, but using the file checkpoint diff --git a/tests/checkpoint_engines/config.toml b/tests/checkpoint_engines/config.toml new file mode 100644 index 000000000..1b90360c5 --- /dev/null +++ b/tests/checkpoint_engines/config.toml @@ -0,0 
+1,29 @@ +[lightning] +table-concurrency = 1 +check-requirements = false +file = "/tmp/lightning_test_result/lightning-checkpoint-engines.log" +level = "info" + +[checkpoint] +enable = true +driver = "file" + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_engines/data" +batch-size = 50 # force splitting the data into 4 batches + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git a/tests/checkpoint_engines/data/cpeng-schema-create.sql b/tests/checkpoint_engines/data/cpeng-schema-create.sql new file mode 100644 index 000000000..1e23466ee --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng-schema-create.sql @@ -0,0 +1 @@ +create database cpeng; diff --git a/tests/checkpoint_engines/data/cpeng.a-schema.sql b/tests/checkpoint_engines/data/cpeng.a-schema.sql new file mode 100644 index 000000000..fe3f493b6 --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.a-schema.sql @@ -0,0 +1 @@ +create table a (c int); diff --git a/tests/checkpoint_engines/data/cpeng.a.1.sql b/tests/checkpoint_engines/data/cpeng.a.1.sql new file mode 100644 index 000000000..58829b7d8 --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.a.1.sql @@ -0,0 +1 @@ +insert into a values (1); diff --git a/tests/checkpoint_engines/data/cpeng.a.2.sql b/tests/checkpoint_engines/data/cpeng.a.2.sql new file mode 100644 index 000000000..ccbcb5801 --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.a.2.sql @@ -0,0 +1 @@ +insert into a values (2); diff --git a/tests/checkpoint_engines/data/cpeng.a.3.sql b/tests/checkpoint_engines/data/cpeng.a.3.sql new file mode 100644 index 000000000..effdc8f3e --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.a.3.sql @@ -0,0 +1 @@ +insert into a values (3),(4); diff --git a/tests/checkpoint_engines/data/cpeng.b-schema.sql b/tests/checkpoint_engines/data/cpeng.b-schema.sql new file mode 100644 index 000000000..4a3c844ef --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.b-schema.sql @@ -0,0 +1 @@ +create table b (c int); diff --git a/tests/checkpoint_engines/data/cpeng.b.1.sql b/tests/checkpoint_engines/data/cpeng.b.1.sql new file mode 100644 index 000000000..342c150d9 --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.b.1.sql @@ -0,0 +1,4 @@ +insert into b values (10),(11),(12); +/* +padding to make the data file > 50 bytes +*/ \ No newline at end of file diff --git a/tests/checkpoint_engines/data/cpeng.b.2.sql b/tests/checkpoint_engines/data/cpeng.b.2.sql new file mode 100644 index 000000000..83045aee9 --- /dev/null +++ b/tests/checkpoint_engines/data/cpeng.b.2.sql @@ -0,0 +1 @@ +insert into b values (13); diff --git a/tests/checkpoint_engines/mysql.toml b/tests/checkpoint_engines/mysql.toml new file mode 100644 index 000000000..ff00fad82 --- /dev/null +++ b/tests/checkpoint_engines/mysql.toml @@ -0,0 +1,29 @@ +[lightning] +table-concurrency = 1 +check-requirements = false +file = "/tmp/lightning_test_result/lightning-checkpoint-engines.log" +level = "info" + +[checkpoint] +enable = true +driver = "mysql" + +[tikv-importer] +addr = "127.0.0.1:8808" + +[mydumper] +data-source-dir = "tests/checkpoint_engines/data" +batch-size = 50 # force splitting the data into 4 batches + +[tidb] +host = "127.0.0.1" +port = 4000 +user = "root" +status-port = 10080 +pd-addr = "127.0.0.1:2379" +log-level = "error" + +[post-restore] +checksum = true +compact = false +analyze = false diff --git 
a/tests/checkpoint_engines/run.sh b/tests/checkpoint_engines/run.sh new file mode 100755 index 000000000..aabeae4b7 --- /dev/null +++ b/tests/checkpoint_engines/run.sh @@ -0,0 +1,71 @@ +#!/bin/sh + +set -eu + +# First, verify that a normal operation is fine. + +rm -f "$TEST_DIR/lightning-checkpoint-engines.log" +run_sql 'DROP DATABASE IF EXISTS cpeng;' + +run_lightning + +# Check that we have indeed opened 4 engines +OPEN_ENGINES_COUNT=$(grep 'open engine' "$TEST_DIR/lightning-checkpoint-engines.log" | wc -l) +echo "Number of open engines: $OPEN_ENGINES_COUNT" +[ "$OPEN_ENGINES_COUNT" -eq 4 ] + +# Check that everything is correctly imported +run_sql 'SELECT count(*), sum(c) FROM cpeng.a' +check_contains 'count(*): 4' +check_contains 'sum(c): 10' + +run_sql 'SELECT count(*), sum(c) FROM cpeng.b' +check_contains 'count(*): 4' +check_contains 'sum(c): 46' + +# Now, verify it works with checkpoints as well. + +run_sql 'DROP DATABASE cpeng;' + +export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)' +set +e +for i in $(seq "$OPEN_ENGINES_COUNT"); do + echo "******** Importing Table Now (step $i/4) ********" + run_lightning 2> /dev/null + [ $? -ne 0 ] || exit 1 +done +set -e + +echo "******** Verify checkpoint no-op ********" +run_lightning + +run_sql 'SELECT count(*), sum(c) FROM cpeng.a' +check_contains 'count(*): 4' +check_contains 'sum(c): 10' + +run_sql 'SELECT count(*), sum(c) FROM cpeng.b' +check_contains 'count(*): 4' +check_contains 'sum(c): 46' + +# Now, try again with MySQL checkpoints + +run_sql 'DROP DATABASE cpeng;' + +set +e +for i in $(seq "$OPEN_ENGINES_COUNT"); do + echo "******** Importing Table Now (step $i/4) ********" + run_lightning mysql 2> /dev/null + [ $? -ne 0 ] || exit 1 +done +set -e + +echo "******** Verify checkpoint no-op ********" +run_lightning mysql + +run_sql 'SELECT count(*), sum(c) FROM cpeng.a' +check_contains 'count(*): 4' +check_contains 'sum(c): 10' + +run_sql 'SELECT count(*), sum(c) FROM cpeng.b' +check_contains 'count(*): 4' +check_contains 'sum(c): 46' \ No newline at end of file diff --git a/tests/error_summary/run.sh b/tests/error_summary/run.sh index 3f92e6d20..c4de9d44f 100755 --- a/tests/error_summary/run.sh +++ b/tests/error_summary/run.sh @@ -1,6 +1,6 @@ #!/bin/sh -set -eu +set -eux # Check that error summary are written at the bottom of import. diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 9d15074be..3f4fcf8b6 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -50,14 +50,24 @@ driver = "file" [tikv-importer] addr = "127.0.0.1:8808" -# size of batch to import kv data into TiKV: xxx (GB) -batch-size = 500 # GB [mydumper] # block size of file reading read-block-size = 65536 # Byte (default = 64 KB) -# split source data file into multiple region/chunk to execute restoring in parallel -region-min-size = 268435456 # Byte (default = 256 MB) +# minimum size (in terms of source data file) of each batch of import. +# Lightning will split a large table into multiple engine files according to this size. +batch-size = 107_374_182_400 # Byte (default = 100 GiB) + +# Engine file needs to be imported sequentially. Due to table-concurrency, multiple engines will be +# imported nearly the same time, and this will create a queue and this wastes resources. Therefore, +# Lightning will slightly increase the size of the first few batches to properly distribute +# resources. 
The scale-up is controlled by this parameter, which expresses the ratio of the duration +# of the "import" step to the duration of the "write" step when running at full concurrency. It can be measured as +# (import duration / write duration) for a single table of around 1 GB; the exact timings can be +# found in the log. The faster "import" is relative to "write", the smaller this ratio and the less the +# first batches need to be enlarged; a ratio of zero means uniform batch sizes. This value should be in the range 0 <= batch-import-ratio < 1. +batch-import-ratio = 0.75 + # mydumper local source data directory data-source-dir = "/tmp/export-20180328-200751" # if no-schema is set true, lightning will get schema information from tidb-server directly without creating them.
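
To make the effect of batch-size and batch-import-ratio above more concrete, the following is a small, self-contained Go sketch. It is only a toy model: the scaleBatches helper and its ramp-up formula are assumptions invented for illustration, not Lightning's actual splitting logic, which lives in mydump.MakeTableRegions.

// Hypothetical illustration of the batch-import-ratio idea: the first few
// batches of a table are made slightly larger so that, with several engines
// in flight, their sequential "import" steps do not all queue up at once.
package main

import "fmt"

// scaleBatches splits totalSize bytes into batches of roughly batchSize bytes,
// enlarging the first `concurrency` batches by a factor that shrinks from
// (1 + ratio) toward 1. The exact formula here is an assumption for the example.
func scaleBatches(totalSize, batchSize int64, ratio float64, concurrency int) []int64 {
	var sizes []int64
	remaining := totalSize
	for i := 0; remaining > 0; i++ {
		factor := 1.0
		if i < concurrency {
			factor += ratio * float64(concurrency-i) / float64(concurrency)
		}
		size := int64(float64(batchSize) * factor)
		if size > remaining {
			size = remaining
		}
		sizes = append(sizes, size)
		remaining -= size
	}
	return sizes
}

func main() {
	const gib int64 = 1 << 30
	// Using the defaults documented above: batch-size = 100 GiB, ratio = 0.75,
	// for a hypothetical 500 GiB table with table-concurrency = 4.
	for i, s := range scaleBatches(500*gib, 100*gib, 0.75, 4) {
		fmt.Printf("batch %d: %.2f GiB\n", i, float64(s)/float64(gib))
	}
}

With ratio = 0 the sketch degenerates to uniform 100 GiB batches, matching the "uniform batch sizes" case described in the comment above.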