diff --git a/cmd/tidb-lightning-ctl/main.go b/cmd/tidb-lightning-ctl/main.go index 836f2695a..f86e5737b 100644 --- a/cmd/tidb-lightning-ctl/main.go +++ b/cmd/tidb-lightning-ctl/main.go @@ -176,8 +176,8 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s for _, table := range targetTables { for engineID := 0; engineID < table.EnginesCount; engineID++ { - fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID) - closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID) + fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, int32(engineID)) + closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, int32(engineID)) if err != nil { fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err) lastErr = err diff --git a/lightning/config/config.go b/lightning/config/config.go index 51a7c3e73..0f52030f3 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -83,6 +83,7 @@ func (c *Config) String() string { type Lightning struct { common.LogConfig TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"` + IndexConcurrency int `toml:"index-concurrency" json:"index-concurrency"` RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"` IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"` ProfilePort int `toml:"pprof-port" json:"pprof-port"` @@ -91,10 +92,10 @@ type Lightning struct { // PostRestore has some options which will be executed after kv restored. type PostRestore struct { - Level1Compact *bool `toml:"level-1-compact" json:"level-1-compact"` - Compact bool `toml:"compact" json:"compact"` - Checksum bool `toml:"checksum" json:"checksum"` - Analyze bool `toml:"analyze" json:"analyze"` + Level1Compact bool `toml:"level-1-compact" json:"level-1-compact"` + Compact bool `toml:"compact" json:"compact"` + Checksum bool `toml:"checksum" json:"checksum"` + Analyze bool `toml:"analyze" json:"analyze"` } type MydumperRuntime struct { @@ -144,6 +145,7 @@ func NewConfig() *Config { App: Lightning{ RegionConcurrency: runtime.NumCPU(), TableConcurrency: 8, + IndexConcurrency: 2, IOConcurrency: 5, CheckRequirements: true, }, @@ -228,12 +230,5 @@ func (cfg *Config) Load() error { cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb" } } - - // If the level 1 compact configuration not found, default to true - if cfg.PostRestore.Level1Compact == nil { - cfg.PostRestore.Level1Compact = new(bool) - *cfg.PostRestore.Level1Compact = true - } - return nil } diff --git a/lightning/kv/importer.go b/lightning/kv/importer.go index 2c92e9221..02064cfac 100644 --- a/lightning/kv/importer.go +++ b/lightning/kv/importer.go @@ -155,7 +155,7 @@ func isIgnorableOpenCloseEngineError(err error) bool { return err == nil || strings.Contains(err.Error(), "FileExists") } -func makeTag(tableName string, engineID int) string { +func makeTag(tableName string, engineID int32) string { return fmt.Sprintf("%s:%d", tableName, engineID) } @@ -166,7 +166,7 @@ var engineNamespace = uuid.Must(uuid.FromString("d68d6abe-c59e-45d6-ade8-e2b0ceb func (importer *Importer) OpenEngine( ctx context.Context, tableName string, - engineID int, + engineID int32, ) (*OpenedEngine, error) { tag := makeTag(tableName, engineID) engineUUID := uuid.NewV5(engineNamespace, tag) @@ -312,7 +312,7 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) { // (Open -> Write -> Close -> Import). 
This method should only be used when one // knows via other ways that the engine has already been opened, e.g. when // resuming from a checkpoint. -func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int) (*ClosedEngine, error) { +func (importer *Importer) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int32) (*ClosedEngine, error) { tag := makeTag(tableName, engineID) engineUUID := uuid.NewV5(engineNamespace, tag) return importer.unsafeCloseEngine(ctx, tag, engineUUID) diff --git a/lightning/mydump/parser_generated.go b/lightning/mydump/parser_generated.go index 0e0ee4d42..f3578797c 100644 --- a/lightning/mydump/parser_generated.go +++ b/lightning/mydump/parser_generated.go @@ -1,6 +1,19 @@ // Code generated by ragel DO NOT EDIT. //.... lightning/mydump/parser.rl:1 +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + // Please edit `parser.rl` if you want to modify this file. To generate // `parser_generated.go`, please execute // @@ -17,21 +30,21 @@ import ( "github.com/pingcap/tidb-lightning/lightning/common" ) -//.... lightning/mydump/parser.rl:79 +//.... lightning/mydump/parser.rl:92 -//.... tmp_parser.go:25 +//.... tmp_parser.go:38 const chunk_parser_start int = 27 const chunk_parser_first_final int = 27 const chunk_parser_error int = 0 const chunk_parser_en_main int = 27 -//.... lightning/mydump/parser.rl:82 +//.... lightning/mydump/parser.rl:95 func (parser *ChunkParser) lex() (token, []byte, error) { var cs, ts, te, act, p int - //.... tmp_parser.go:38 + //.... tmp_parser.go:51 { cs = chunk_parser_start ts = 0 @@ -39,7 +52,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { act = 0 } - //.... lightning/mydump/parser.rl:86 + //.... lightning/mydump/parser.rl:99 for { data := parser.buf @@ -50,7 +63,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { eof = pe } - //.... tmp_parser.go:58 + //.... tmp_parser.go:71 { if p == pe { goto _test_eof @@ -191,7 +204,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto st27 tr9: - //.... lightning/mydump/parser.rl:67 + //.... lightning/mydump/parser.rl:80 te = p + 1 { consumedToken = tokRow @@ -203,7 +216,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } goto st27 tr19: - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 p = (te) - 1 { consumedToken = tokName @@ -215,12 +228,12 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } goto st27 tr21: - //.... lightning/mydump/parser.rl:60 + //.... lightning/mydump/parser.rl:73 te = p + 1 goto st27 tr40: - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 te = p p-- { @@ -233,7 +246,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } goto st27 tr41: - //.... lightning/mydump/parser.rl:60 + //.... lightning/mydump/parser.rl:73 te = p p-- @@ -252,7 +265,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 ts = p - //.... tmp_parser.go:233 + //.... 
tmp_parser.go:246 switch data[p] { case 32: goto tr21 @@ -292,21 +305,21 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st28 tr43: //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:60 + //.... lightning/mydump/parser.rl:73 act = 1 goto st28 tr53: //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:62 + //.... lightning/mydump/parser.rl:75 act = 2 goto st28 st28: @@ -314,7 +327,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof28 } st_case_28: - //.... tmp_parser.go:295 + //.... tmp_parser.go:308 switch data[p] { case 32: goto tr0 @@ -497,7 +510,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st29 st29: @@ -505,7 +518,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof29 } st_case_29: - //.... tmp_parser.go:486 + //.... tmp_parser.go:499 switch data[p] { case 32: goto tr40 @@ -539,7 +552,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof30 } st_case_30: - //.... tmp_parser.go:520 + //.... tmp_parser.go:533 switch data[p] { case 10: goto tr21 @@ -590,7 +603,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:60 + //.... lightning/mydump/parser.rl:73 act = 1 goto st31 st31: @@ -598,7 +611,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof31 } st_case_31: - //.... tmp_parser.go:579 + //.... tmp_parser.go:592 switch data[p] { case 34: goto tr2 @@ -631,7 +644,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:60 + //.... lightning/mydump/parser.rl:73 act = 1 goto st32 st32: @@ -639,7 +652,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof32 } st_case_32: - //.... tmp_parser.go:620 + //.... tmp_parser.go:633 if data[p] == 96 { goto tr2 } @@ -648,7 +661,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st33 st33: @@ -656,7 +669,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof33 } st_case_33: - //.... tmp_parser.go:637 + //.... tmp_parser.go:650 switch data[p] { case 32: goto tr40 @@ -690,7 +703,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof34 } st_case_34: - //.... tmp_parser.go:671 + //.... tmp_parser.go:684 switch data[p] { case 32: goto st20 @@ -784,7 +797,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof35 } st_case_35: - //.... tmp_parser.go:765 + //.... tmp_parser.go:778 switch data[p] { case 32: goto st20 @@ -840,7 +853,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st36 st36: @@ -848,7 +861,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof36 } st_case_36: - //.... tmp_parser.go:829 + //.... tmp_parser.go:842 switch data[p] { case 32: goto tr40 @@ -878,7 +891,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... 
lightning/mydump/parser.rl:85 act = 4 goto st37 st37: @@ -886,7 +899,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof37 } st_case_37: - //.... tmp_parser.go:867 + //.... tmp_parser.go:880 switch data[p] { case 32: goto tr40 @@ -920,7 +933,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st38 st38: @@ -928,7 +941,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof38 } st_case_38: - //.... tmp_parser.go:909 + //.... tmp_parser.go:922 switch data[p] { case 32: goto tr40 @@ -958,7 +971,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st39 st39: @@ -966,7 +979,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof39 } st_case_39: - //.... tmp_parser.go:947 + //.... tmp_parser.go:960 switch data[p] { case 32: goto tr40 @@ -996,7 +1009,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st40 st40: @@ -1004,7 +1017,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof40 } st_case_40: - //.... tmp_parser.go:985 + //.... tmp_parser.go:998 switch data[p] { case 32: goto tr40 @@ -1034,7 +1047,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st41 st41: @@ -1042,7 +1055,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof41 } st_case_41: - //.... tmp_parser.go:1023 + //.... tmp_parser.go:1036 switch data[p] { case 32: goto tr40 @@ -1072,7 +1085,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st42 st42: @@ -1080,7 +1093,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof42 } st_case_42: - //.... tmp_parser.go:1061 + //.... tmp_parser.go:1074 switch data[p] { case 32: goto tr40 @@ -1110,7 +1123,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st43 st43: @@ -1118,7 +1131,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof43 } st_case_43: - //.... tmp_parser.go:1099 + //.... tmp_parser.go:1112 switch data[p] { case 32: goto tr40 @@ -1148,7 +1161,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st44 st44: @@ -1156,7 +1169,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof44 } st_case_44: - //.... tmp_parser.go:1137 + //.... tmp_parser.go:1150 switch data[p] { case 32: goto tr40 @@ -1186,7 +1199,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st45 st45: @@ -1194,7 +1207,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof45 } st_case_45: - //.... tmp_parser.go:1175 + //.... 
tmp_parser.go:1188 switch data[p] { case 32: goto tr40 @@ -1224,7 +1237,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { //.... NONE:1 te = p + 1 - //.... lightning/mydump/parser.rl:72 + //.... lightning/mydump/parser.rl:85 act = 4 goto st46 st46: @@ -1232,7 +1245,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { goto _test_eof46 } st_case_46: - //.... tmp_parser.go:1213 + //.... tmp_parser.go:1226 switch data[p] { case 32: goto tr40 @@ -1477,7 +1490,7 @@ func (parser *ChunkParser) lex() (token, []byte, error) { } } - //.... lightning/mydump/parser.rl:97 + //.... lightning/mydump/parser.rl:110 if cs == 0 { common.AppLogger.Errorf("Syntax error near byte %d, content is «%s»", parser.pos, string(data)) diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go index 29b202fdb..054d48ec4 100644 --- a/lightning/mydump/region.go +++ b/lightning/mydump/region.go @@ -21,7 +21,7 @@ import ( ) type TableRegion struct { - EngineID int + EngineID int32 DB string Table string @@ -77,7 +77,7 @@ func AllocateEngineIDs( return } - curEngineID := 0 + curEngineID := int32(0) curEngineSize := 0.0 curBatchSize := batchSize diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go index 5f9208cd0..82412a99a 100644 --- a/lightning/mydump/region_test.go +++ b/lightning/mydump/region_test.go @@ -123,8 +123,8 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { filesRegions = append(filesRegions, new(TableRegion)) } - checkEngineSizes := func(what string, expected map[int]int) { - actual := make(map[int]int) + checkEngineSizes := func(what string, expected map[int32]int) { + actual := make(map[int32]int) for _, region := range filesRegions { actual[region.EngineID]++ } @@ -133,13 +133,13 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { // Batch size > Total size => Everything in the zero batch. AllocateEngineIDs(filesRegions, dataFileSizes, 1000, 0.5, 1000) - checkEngineSizes("no batching", map[int]int{ + checkEngineSizes("no batching", map[int32]int{ 0: 700, }) // Allocate 3 engines. AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.5, 1000) - checkEngineSizes("batch size = 200", map[int]int{ + checkEngineSizes("batch size = 200", map[int32]int{ 0: 170, 1: 213, 2: 317, @@ -147,7 +147,7 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { // Allocate 3 engines with an alternative ratio AllocateEngineIDs(filesRegions, dataFileSizes, 200, 0.6, 1000) - checkEngineSizes("batch size = 200, ratio = 0.6", map[int]int{ + checkEngineSizes("batch size = 200, ratio = 0.6", map[int32]int{ 0: 160, 1: 208, 2: 332, @@ -155,7 +155,7 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { // Allocate 5 engines. 
AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.5, 1000) - checkEngineSizes("batch size = 100", map[int]int{ + checkEngineSizes("batch size = 100", map[int32]int{ 0: 93, 1: 105, 2: 122, @@ -165,7 +165,7 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { // Number of engines > table concurrency AllocateEngineIDs(filesRegions, dataFileSizes, 50, 0.5, 4) - checkEngineSizes("batch size = 50, limit table conc = 4", map[int]int{ + checkEngineSizes("batch size = 50, limit table conc = 4", map[int32]int{ 0: 50, 1: 59, 2: 73, @@ -183,7 +183,7 @@ func (s *testMydumpRegionSuite) TestAllocateEngineIDs(c *C) { // Zero ratio = Uniform AllocateEngineIDs(filesRegions, dataFileSizes, 100, 0.0, 1000) - checkEngineSizes("batch size = 100, ratio = 0", map[int]int{ + checkEngineSizes("batch size = 100, ratio = 0", map[int32]int{ 0: 100, 1: 100, 2: 100, diff --git a/lightning/restore/checkpoints.go b/lightning/restore/checkpoints.go index 8a706a869..65af26e39 100644 --- a/lightning/restore/checkpoints.go +++ b/lightning/restore/checkpoints.go @@ -41,6 +41,7 @@ const ( CheckpointStatusAllWritten CheckpointStatus = 60 CheckpointStatusClosed CheckpointStatus = 90 CheckpointStatusImported CheckpointStatus = 120 + CheckpointStatusIndexImported CheckpointStatus = 140 CheckpointStatusAlteredAutoInc CheckpointStatus = 150 CheckpointStatusChecksumSkipped CheckpointStatus = 170 CheckpointStatusChecksummed CheckpointStatus = 180 @@ -54,7 +55,7 @@ const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. checkpointTableNameTable = "table_v4" - checkpointTableNameEngine = "engine_v4" + checkpointTableNameEngine = "engine_v5" checkpointTableNameChunk = "chunk_v4" ) @@ -115,7 +116,7 @@ type EngineCheckpoint struct { type TableCheckpoint struct { Status CheckpointStatus AllocBase int64 - Engines []*EngineCheckpoint + Engines map[int32]*EngineCheckpoint } func (cp *TableCheckpoint) CountChunks() int { @@ -143,16 +144,16 @@ type TableCheckpointDiff struct { hasRebase bool status CheckpointStatus allocBase int64 - engines map[int]engineCheckpointDiff + engines map[int32]engineCheckpointDiff } func NewTableCheckpointDiff() *TableCheckpointDiff { return &TableCheckpointDiff{ - engines: make(map[int]engineCheckpointDiff), + engines: make(map[int32]engineCheckpointDiff), } } -func (cpd *TableCheckpointDiff) insertEngineCheckpointDiff(engineID int, newDiff engineCheckpointDiff) { +func (cpd *TableCheckpointDiff) insertEngineCheckpointDiff(engineID int32, newDiff engineCheckpointDiff) { if oldDiff, ok := cpd.engines[engineID]; ok { if newDiff.hasStatus { oldDiff.hasStatus = true @@ -183,7 +184,7 @@ type TableCheckpointMerger interface { } type StatusCheckpointMerger struct { - EngineID int // -1 == apply to whole table. + EngineID int32 // wholeTableEngineID == apply to whole table. 
Status CheckpointStatus } @@ -192,11 +193,11 @@ func (merger *StatusCheckpointMerger) SetInvalid() { } func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { - if merger.EngineID == -1 || merger.Status <= CheckpointStatusMaxInvalid { + if merger.EngineID == wholeTableEngineID || merger.Status <= CheckpointStatusMaxInvalid { cpd.status = merger.Status cpd.hasStatus = true } - if merger.EngineID >= 0 { + if merger.EngineID >= 0 && merger.EngineID != wholeTableEngineID { cpd.insertEngineCheckpointDiff(merger.EngineID, engineCheckpointDiff{ hasStatus: true, status: merger.Status, @@ -206,7 +207,7 @@ func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff) { } type ChunkCheckpointMerger struct { - EngineID int + EngineID int32 Key ChunkCheckpointKey Checksum verify.KVChecksum Pos int64 @@ -243,7 +244,7 @@ type CheckpointsDB interface { Initialize(ctx context.Context, dbInfo map[string]*TidbDBInfo) error Get(ctx context.Context, tableName string) (*TableCheckpoint, error) Close() error - InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints []*EngineCheckpoint) error + InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error Update(checkpointDiffs map[string]*TableCheckpointDiff) RemoveCheckpoint(ctx context.Context, tableName string) error @@ -270,11 +271,12 @@ func (*NullCheckpointsDB) Close() error { func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error) { return &TableCheckpoint{ - Status: CheckpointStatusLoaded, + Status: CheckpointStatusLoaded, + Engines: map[int32]*EngineCheckpoint{}, }, nil } -func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ []*EngineCheckpoint) error { +func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error { return nil } @@ -318,7 +320,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) ( err = common.ExecWithRetry(ctx, db, "(create engine checkpoints table)", fmt.Sprintf(` CREATE TABLE IF NOT EXISTS %s.%s ( table_name varchar(261) NOT NULL, - engine_id int unsigned NOT NULL, + engine_id int NOT NULL, status tinyint unsigned DEFAULT 30, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, @@ -410,7 +412,9 @@ func (cpdb *MySQLCheckpointsDB) Close() error { } func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) { - cp := new(TableCheckpoint) + cp := &TableCheckpoint{ + Engines: map[int32]*EngineCheckpoint{}, + } purpose := "(read checkpoint " + tableName + ")" err := common.TransactWithRetry(ctx, cpdb.db, purpose, func(c context.Context, tx *sql.Tx) error { @@ -426,16 +430,15 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab defer engineRows.Close() for engineRows.Next() { var ( - engineID int + engineID int32 status uint8 ) if err := engineRows.Scan(&engineID, &status); err != nil { return errors.Trace(err) } - for len(cp.Engines) <= engineID { - cp.Engines = append(cp.Engines, new(EngineCheckpoint)) + cp.Engines[engineID] = &EngineCheckpoint{ + Status: CheckpointStatus(status), } - cp.Engines[engineID].Status = CheckpointStatus(status) } if err := engineRows.Err(); err != nil { return errors.Trace(err) @@ -459,7 +462,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab for chunkRows.Next() { var ( value = 
new(ChunkCheckpoint) - engineID int + engineID int32 kvcBytes uint64 kvcKVs uint64 kvcChecksum uint64 @@ -499,7 +502,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab return cp, nil } -func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints []*EngineCheckpoint) error { +func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error { err := common.TransactWithRetry(ctx, cpdb.db, "(update engine checkpoints for "+tableName+")", func(c context.Context, tx *sql.Tx) error { engineStmt, err := tx.PrepareContext(c, fmt.Sprintf(` REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?); @@ -634,7 +637,12 @@ type FileCheckpointsDB struct { } func NewFileCheckpointsDB(path string) *FileCheckpointsDB { - cpdb := &FileCheckpointsDB{path: path} + cpdb := &FileCheckpointsDB{ + path: path, + checkpoints: CheckpointsModel{ + Checkpoints: map[string]*TableCheckpointModel{}, + }, + } // ignore all errors -- file maybe not created yet (and it is fine). content, err := ioutil.ReadFile(path) if err == nil { @@ -669,7 +677,8 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, dbInfo map[string tableName := common.UniqueTable(db.Name, table.Name) if _, ok := cpdb.checkpoints.Checkpoints[tableName]; !ok { cpdb.checkpoints.Checkpoints[tableName] = &TableCheckpointModel{ - Status: uint32(CheckpointStatusLoaded), + Status: uint32(CheckpointStatusLoaded), + Engines: map[int32]*EngineCheckpointModel{}, } } // TODO check if hash matches @@ -695,10 +704,10 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC cp := &TableCheckpoint{ Status: CheckpointStatus(tableModel.Status), AllocBase: tableModel.AllocBase, - Engines: make([]*EngineCheckpoint, 0, len(tableModel.Engines)), + Engines: make(map[int32]*EngineCheckpoint, len(tableModel.Engines)), } - for _, engineModel := range tableModel.Engines { + for engineID, engineModel := range tableModel.Engines { engine := &EngineCheckpoint{ Status: CheckpointStatus(engineModel.Status), Chunks: make([]*ChunkCheckpoint, 0, len(engineModel.Chunks)), @@ -726,26 +735,22 @@ func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableC return engine.Chunks[i].Key.less(&engine.Chunks[j].Key) }) - cp.Engines = append(cp.Engines, engine) + cp.Engines[engineID] = engine } return cp, nil } -func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints []*EngineCheckpoint) error { +func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error { cpdb.lock.Lock() defer cpdb.lock.Unlock() tableModel := cpdb.checkpoints.Checkpoints[tableName] - for len(tableModel.Engines) < len(checkpoints) { - tableModel.Engines = append(tableModel.Engines, &EngineCheckpointModel{ + for engineID, engine := range checkpoints { + engineModel := &EngineCheckpointModel{ Status: uint32(CheckpointStatusLoaded), Chunks: make(map[string]*ChunkCheckpointModel), - }) - } - - for engineID, engine := range checkpoints { - engineModel := tableModel.Engines[engineID] + } for _, value := range engine.Chunks { key := value.Key.String() chunk, ok := engineModel.Chunks[key] @@ -766,6 +771,7 @@ func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableN chunk.KvcKvs = value.Checksum.SumKVS() chunk.KvcChecksum = value.Checksum.Sum() } + 
tableModel.Engines[engineID] = engineModel } return errors.Trace(cpdb.save()) diff --git a/lightning/restore/file_checkpoints.pb.go b/lightning/restore/file_checkpoints.pb.go index a12a7284d..b3b38bd41 100644 --- a/lightning/restore/file_checkpoints.pb.go +++ b/lightning/restore/file_checkpoints.pb.go @@ -3,14 +3,14 @@ package restore -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" -import _ "github.com/gogo/protobuf/gogoproto" - -import encoding_binary "encoding/binary" - -import io "io" +import ( + encoding_binary "encoding/binary" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -25,16 +25,14 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type CheckpointsModel struct { // key is table_name - Checkpoints map[string]*TableCheckpointModel `protobuf:"bytes,1,rep,name=checkpoints" json:"checkpoints,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Checkpoints map[string]*TableCheckpointModel `protobuf:"bytes,1,rep,name=checkpoints,proto3" json:"checkpoints,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *CheckpointsModel) Reset() { *m = CheckpointsModel{} } func (m *CheckpointsModel) String() string { return proto.CompactTextString(m) } func (*CheckpointsModel) ProtoMessage() {} func (*CheckpointsModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_168275cfec5db5bf, []int{0} + return fileDescriptor_c47ec4f2f281cd62, []int{0} } func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -51,8 +49,8 @@ func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, er return b[:n], nil } } -func (dst *CheckpointsModel) XXX_Merge(src proto.Message) { - xxx_messageInfo_CheckpointsModel.Merge(dst, src) +func (m *CheckpointsModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_CheckpointsModel.Merge(m, src) } func (m *CheckpointsModel) XXX_Size() int { return m.Size() @@ -64,19 +62,17 @@ func (m *CheckpointsModel) XXX_DiscardUnknown() { var xxx_messageInfo_CheckpointsModel proto.InternalMessageInfo type TableCheckpointModel struct { - Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` - Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` - AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` - Engines []*EngineCheckpointModel `protobuf:"bytes,6,rep,name=engines" json:"engines,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` + Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` + AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` + Engines map[int32]*EngineCheckpointModel `protobuf:"bytes,8,rep,name=engines,proto3" json:"engines,omitempty" protobuf_key:"zigzag32,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *TableCheckpointModel) Reset() { *m = TableCheckpointModel{} } func (m *TableCheckpointModel) String() string { return proto.CompactTextString(m) } func 
(*TableCheckpointModel) ProtoMessage() {} func (*TableCheckpointModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_168275cfec5db5bf, []int{1} + return fileDescriptor_c47ec4f2f281cd62, []int{1} } func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -93,8 +89,8 @@ func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } -func (dst *TableCheckpointModel) XXX_Merge(src proto.Message) { - xxx_messageInfo_TableCheckpointModel.Merge(dst, src) +func (m *TableCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_TableCheckpointModel.Merge(m, src) } func (m *TableCheckpointModel) XXX_Size() int { return m.Size() @@ -108,16 +104,14 @@ var xxx_messageInfo_TableCheckpointModel proto.InternalMessageInfo type EngineCheckpointModel struct { Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"` // key is "$path:$offset" - Chunks map[string]*ChunkCheckpointModel `protobuf:"bytes,2,rep,name=chunks" json:"chunks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Chunks map[string]*ChunkCheckpointModel `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *EngineCheckpointModel) Reset() { *m = EngineCheckpointModel{} } func (m *EngineCheckpointModel) String() string { return proto.CompactTextString(m) } func (*EngineCheckpointModel) ProtoMessage() {} func (*EngineCheckpointModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_168275cfec5db5bf, []int{2} + return fileDescriptor_c47ec4f2f281cd62, []int{2} } func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -134,8 +128,8 @@ func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byt return b[:n], nil } } -func (dst *EngineCheckpointModel) XXX_Merge(src proto.Message) { - xxx_messageInfo_EngineCheckpointModel.Merge(dst, src) +func (m *EngineCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_EngineCheckpointModel.Merge(m, src) } func (m *EngineCheckpointModel) XXX_Size() int { return m.Size() @@ -147,26 +141,24 @@ func (m *EngineCheckpointModel) XXX_DiscardUnknown() { var xxx_messageInfo_EngineCheckpointModel proto.InternalMessageInfo type ChunkCheckpointModel struct { - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` - Columns []byte `protobuf:"bytes,3,opt,name=columns,proto3" json:"columns,omitempty"` - ShouldIncludeRowId bool `protobuf:"varint,4,opt,name=should_include_row_id,json=shouldIncludeRowId,proto3" json:"should_include_row_id,omitempty"` - EndOffset int64 `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"` - Pos int64 `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"` - PrevRowidMax int64 `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"` - RowidMax int64 `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"` - KvcBytes uint64 `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"` - KvcKvs uint64 `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"` - KvcChecksum uint64 
`protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_sizecache int32 `json:"-"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` + Columns []byte `protobuf:"bytes,3,opt,name=columns,proto3" json:"columns,omitempty"` + ShouldIncludeRowId bool `protobuf:"varint,4,opt,name=should_include_row_id,json=shouldIncludeRowId,proto3" json:"should_include_row_id,omitempty"` + EndOffset int64 `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"` + Pos int64 `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"` + PrevRowidMax int64 `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"` + RowidMax int64 `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"` + KvcBytes uint64 `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"` + KvcKvs uint64 `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"` + KvcChecksum uint64 `protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"` } func (m *ChunkCheckpointModel) Reset() { *m = ChunkCheckpointModel{} } func (m *ChunkCheckpointModel) String() string { return proto.CompactTextString(m) } func (*ChunkCheckpointModel) ProtoMessage() {} func (*ChunkCheckpointModel) Descriptor() ([]byte, []int) { - return fileDescriptor_file_checkpoints_168275cfec5db5bf, []int{3} + return fileDescriptor_c47ec4f2f281cd62, []int{3} } func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -183,8 +175,8 @@ func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte return b[:n], nil } } -func (dst *ChunkCheckpointModel) XXX_Merge(src proto.Message) { - xxx_messageInfo_ChunkCheckpointModel.Merge(dst, src) +func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message) { + xxx_messageInfo_ChunkCheckpointModel.Merge(m, src) } func (m *ChunkCheckpointModel) XXX_Size() int { return m.Size() @@ -199,10 +191,57 @@ func init() { proto.RegisterType((*CheckpointsModel)(nil), "CheckpointsModel") proto.RegisterMapType((map[string]*TableCheckpointModel)(nil), "CheckpointsModel.CheckpointsEntry") proto.RegisterType((*TableCheckpointModel)(nil), "TableCheckpointModel") + proto.RegisterMapType((map[int32]*EngineCheckpointModel)(nil), "TableCheckpointModel.EnginesEntry") proto.RegisterType((*EngineCheckpointModel)(nil), "EngineCheckpointModel") proto.RegisterMapType((map[string]*ChunkCheckpointModel)(nil), "EngineCheckpointModel.ChunksEntry") proto.RegisterType((*ChunkCheckpointModel)(nil), "ChunkCheckpointModel") } + +func init() { + proto.RegisterFile("lightning/restore/file_checkpoints.proto", fileDescriptor_c47ec4f2f281cd62) +} + +var fileDescriptor_c47ec4f2f281cd62 = []byte{ + // 579 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x54, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xce, 0xc6, 0xad, 0x93, 0x8c, 0x03, 0x2a, 0xab, 0xb6, 0xac, 0x82, 0xb0, 0x4c, 0x05, 0x92, + 0x25, 0xc0, 0x11, 0xe5, 0x82, 0x2a, 0x4e, 0x0d, 0x3d, 0x54, 0xa8, 0x02, 0xad, 0xe0, 0xc2, 0xc5, + 0xf2, 0xcf, 0xc6, 0xb6, 0xec, 0x78, 0x23, 0xaf, 0xed, 0x36, 0x6f, 0xc1, 0x9b, 0xf0, 0x04, 0xdc, + 0x7b, 0xec, 0x91, 0x03, 0x07, 0x48, 0x1e, 0x81, 0x17, 0x40, 0x5e, 0xbb, 0x8a, 0x83, 0xa2, 0x8a, + 0xdb, 
0xcc, 0xf7, 0x7d, 0xf3, 0x4d, 0x3e, 0x8d, 0x37, 0x60, 0x26, 0x51, 0x10, 0xe6, 0x69, 0x94, + 0x06, 0xe3, 0x8c, 0x89, 0x9c, 0x67, 0x6c, 0x3c, 0x8d, 0x12, 0x66, 0x7b, 0x21, 0xf3, 0xe2, 0x39, + 0x8f, 0xd2, 0x5c, 0x58, 0xf3, 0x8c, 0xe7, 0x7c, 0xf4, 0x32, 0x88, 0xf2, 0xb0, 0x70, 0x2d, 0x8f, + 0xcf, 0xc6, 0x01, 0x0f, 0xf8, 0x58, 0xc2, 0x6e, 0x31, 0x95, 0x9d, 0x6c, 0x64, 0x55, 0xcb, 0x8f, + 0xbe, 0x21, 0xd8, 0x9b, 0xac, 0x4d, 0x2e, 0xb8, 0xcf, 0x12, 0xfc, 0x0e, 0xb4, 0x96, 0x31, 0x41, + 0x86, 0x62, 0x6a, 0xc7, 0x47, 0xd6, 0xbf, 0xba, 0x36, 0x70, 0x96, 0xe6, 0xd9, 0x82, 0xb6, 0xc7, + 0x46, 0x9f, 0x37, 0x9c, 0xa5, 0x00, 0xef, 0x81, 0x12, 0xb3, 0x05, 0x41, 0x06, 0x32, 0x07, 0xb4, + 0x2a, 0xf1, 0x73, 0xd8, 0x2d, 0x9d, 0xa4, 0x60, 0xa4, 0x6b, 0x20, 0x53, 0x3b, 0x3e, 0xb0, 0x3e, + 0x39, 0x6e, 0xc2, 0xd6, 0x83, 0x72, 0x13, 0xad, 0x35, 0x27, 0xdd, 0x37, 0xe8, 0xe8, 0x0f, 0x82, + 0xfd, 0x6d, 0x1a, 0x8c, 0x61, 0x27, 0x74, 0x44, 0x28, 0xcd, 0x87, 0x54, 0xd6, 0xf8, 0x10, 0x54, + 0x91, 0x3b, 0x79, 0x21, 0x88, 0x62, 0x20, 0xf3, 0x1e, 0x6d, 0x3a, 0xfc, 0x18, 0xc0, 0x49, 0x12, + 0xee, 0xd9, 0xae, 0x23, 0x18, 0xd9, 0x31, 0x90, 0xa9, 0xd0, 0x81, 0x44, 0x4e, 0x1d, 0xc1, 0xf0, + 0x5b, 0xe8, 0xb1, 0x34, 0x88, 0x52, 0x26, 0x48, 0xbf, 0x09, 0xbf, 0x6d, 0xa5, 0x75, 0x56, 0x8b, + 0xea, 0xf0, 0xb7, 0x23, 0x23, 0x0a, 0xc3, 0x36, 0xd1, 0x0e, 0xfd, 0xa0, 0x0e, 0xfd, 0x62, 0x33, + 0xf4, 0x61, 0x63, 0x74, 0x47, 0xea, 0xef, 0x08, 0x0e, 0xb6, 0x8a, 0x5a, 0x11, 0xd1, 0x46, 0xc4, + 0x13, 0x50, 0xbd, 0xb0, 0x48, 0x63, 0x41, 0xba, 0x4d, 0x84, 0xad, 0xf3, 0xd6, 0x44, 0x8a, 0xea, + 0x08, 0xcd, 0xc4, 0xe8, 0x23, 0x68, 0x2d, 0xf8, 0x7f, 0xae, 0x26, 0xe5, 0x77, 0xfc, 0xfe, 0x9f, + 0x5d, 0xd8, 0xdf, 0xa6, 0xa9, 0xae, 0x36, 0x77, 0xf2, 0xb0, 0x31, 0x97, 0x75, 0x15, 0x89, 0x4f, + 0xa7, 0x82, 0xe5, 0xd2, 0x5e, 0xa1, 0x4d, 0x87, 0x09, 0xf4, 0x3c, 0x9e, 0x14, 0xb3, 0xb4, 0x3e, + 0xe7, 0x90, 0xde, 0xb6, 0xf8, 0x15, 0x1c, 0x88, 0x90, 0x17, 0x89, 0x6f, 0x47, 0xa9, 0x97, 0x14, + 0x3e, 0xb3, 0x33, 0x7e, 0x69, 0x47, 0xbe, 0x3c, 0x6d, 0x9f, 0xe2, 0x9a, 0x3c, 0xaf, 0x39, 0xca, + 0x2f, 0xcf, 0xfd, 0xea, 0x13, 0x60, 0xa9, 0x6f, 0x37, 0x8b, 0x76, 0xeb, 0x4f, 0x80, 0xa5, 0xfe, + 0x87, 0x7a, 0xd7, 0x1e, 0x28, 0x73, 0x2e, 0x88, 0x2a, 0xf1, 0xaa, 0xc4, 0x4f, 0xe1, 0xfe, 0x3c, + 0x63, 0x65, 0xe5, 0x1c, 0xf9, 0xf6, 0xcc, 0xb9, 0x22, 0x3d, 0x49, 0x0e, 0x2b, 0x94, 0x56, 0xe0, + 0x85, 0x73, 0x85, 0x1f, 0xc1, 0x60, 0x2d, 0xe8, 0x4b, 0x41, 0x3f, 0x6b, 0x91, 0x71, 0xe9, 0xd9, + 0xee, 0x22, 0x67, 0x82, 0x0c, 0x0c, 0x64, 0xee, 0xd0, 0x7e, 0x5c, 0x7a, 0xa7, 0x55, 0x8f, 0x1f, + 0x42, 0xaf, 0x22, 0xe3, 0x52, 0x10, 0x90, 0x94, 0x1a, 0x97, 0xde, 0xfb, 0x52, 0xe0, 0x27, 0x30, + 0xac, 0x08, 0xf9, 0xb6, 0x44, 0x31, 0x23, 0x9a, 0x81, 0x4c, 0x95, 0x6a, 0x71, 0xe9, 0x4d, 0x1a, + 0xe8, 0xf4, 0xd9, 0xf5, 0x6f, 0xbd, 0x73, 0xbd, 0xd4, 0xd1, 0xcd, 0x52, 0x47, 0xbf, 0x96, 0x3a, + 0xfa, 0xba, 0xd2, 0x3b, 0x37, 0x2b, 0xbd, 0xf3, 0x63, 0xa5, 0x77, 0xbe, 0xf4, 0x9a, 0xff, 0x0c, + 0x57, 0x95, 0x8f, 0xfe, 0xf5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x90, 0x6a, 0x0b, 0x4f, + 0x04, 0x00, 0x00, +} + func (m *CheckpointsModel) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -281,15 +320,30 @@ func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintFileCheckpoints(dAtA, i, uint64(m.AllocBase)) } if len(m.Engines) > 0 { - for _, msg := range m.Engines { - dAtA[i] = 0x32 + for k, _ := range m.Engines { + dAtA[i] = 0x42 i++ - i = encodeVarintFileCheckpoints(dAtA, i, uint64(msg.Size())) - n, err := msg.MarshalTo(dAtA[i:]) - if err 
!= nil { - return 0, err + v := m.Engines[k] + msgSize := 0 + if v != nil { + msgSize = v.Size() + msgSize += 1 + sovFileCheckpoints(uint64(msgSize)) + } + mapSize := 1 + sozFileCheckpoints(uint64(k)) + msgSize + i = encodeVarintFileCheckpoints(dAtA, i, uint64(mapSize)) + dAtA[i] = 0x8 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64((uint32(k)<<1)^uint32((k>>31)))) + if v != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintFileCheckpoints(dAtA, i, uint64(v.Size())) + n2, err := v.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 } - i += n } } return i, nil @@ -335,11 +389,11 @@ func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintFileCheckpoints(dAtA, i, uint64(v.Size())) - n2, err := v.MarshalTo(dAtA[i:]) + n3, err := v.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n2 + i += n3 } } } @@ -437,6 +491,9 @@ func encodeVarintFileCheckpoints(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *CheckpointsModel) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Checkpoints) > 0 { @@ -456,6 +513,9 @@ func (m *CheckpointsModel) Size() (n int) { } func (m *TableCheckpointModel) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Hash) @@ -469,15 +529,25 @@ func (m *TableCheckpointModel) Size() (n int) { n += 1 + sovFileCheckpoints(uint64(m.AllocBase)) } if len(m.Engines) > 0 { - for _, e := range m.Engines { - l = e.Size() - n += 1 + l + sovFileCheckpoints(uint64(l)) + for k, v := range m.Engines { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovFileCheckpoints(uint64(l)) + } + mapEntrySize := 1 + sozFileCheckpoints(uint64(k)) + l + n += mapEntrySize + 1 + sovFileCheckpoints(uint64(mapEntrySize)) } } return n } func (m *EngineCheckpointModel) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Status != 0 { @@ -500,6 +570,9 @@ func (m *EngineCheckpointModel) Size() (n int) { } func (m *ChunkCheckpointModel) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Path) @@ -568,7 +641,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -596,7 +669,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -605,6 +678,9 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -625,7 +701,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -642,7 +718,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLenmapkey |= (uint64(b) & 0x7F) << shift + stringLenmapkey |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -652,6 +728,9 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthFileCheckpoints + } if postStringIndexmapkey > l { return io.ErrUnexpectedEOF } @@ -668,7 +747,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA 
[]byte) error { } b := dAtA[iNdEx] iNdEx++ - mapmsglen |= (int(b) & 0x7F) << shift + mapmsglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -677,7 +756,7 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postmsgIndex := iNdEx + mapmsglen - if mapmsglen < 0 { + if postmsgIndex < 0 { return ErrInvalidLengthFileCheckpoints } if postmsgIndex > l { @@ -714,6 +793,9 @@ func (m *CheckpointsModel) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthFileCheckpoints } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFileCheckpoints + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -741,7 +823,7 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -769,7 +851,7 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -778,6 +860,9 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -800,7 +885,7 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Status |= (uint32(b) & 0x7F) << shift + m.Status |= uint32(b&0x7F) << shift if b < 0x80 { break } @@ -819,12 +904,12 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.AllocBase |= (int64(b) & 0x7F) << shift + m.AllocBase |= int64(b&0x7F) << shift if b < 0x80 { break } } - case 6: + case 8: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Engines", wireType) } @@ -838,7 +923,7 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -847,13 +932,100 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } - m.Engines = append(m.Engines, &EngineCheckpointModel{}) - if err := m.Engines[len(m.Engines)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err + if m.Engines == nil { + m.Engines = make(map[int32]*EngineCheckpointModel) } + var mapkey int32 + var mapvalue *EngineCheckpointModel + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var mapkeytemp int32 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkeytemp |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + mapkeytemp = int32((uint32(mapkeytemp) >> 1) ^ uint32(((mapkeytemp&1)<<31)>>31)) + mapkey = int32(mapkeytemp) + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFileCheckpoints + } + 
if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthFileCheckpoints + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &EngineCheckpointModel{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipFileCheckpoints(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFileCheckpoints + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Engines[mapkey] = mapvalue iNdEx = postIndex default: iNdEx = preIndex @@ -864,6 +1036,9 @@ func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthFileCheckpoints } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFileCheckpoints + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -891,7 +1066,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -919,7 +1094,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Status |= (uint32(b) & 0x7F) << shift + m.Status |= uint32(b&0x7F) << shift if b < 0x80 { break } @@ -938,7 +1113,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + msglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -947,6 +1122,9 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -967,7 +1145,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -984,7 +1162,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLenmapkey |= (uint64(b) & 0x7F) << shift + stringLenmapkey |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -994,6 +1172,9 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthFileCheckpoints + } if postStringIndexmapkey > l { return io.ErrUnexpectedEOF } @@ -1010,7 +1191,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - mapmsglen |= (int(b) & 0x7F) << shift + mapmsglen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1019,7 +1200,7 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postmsgIndex := iNdEx + mapmsglen - if mapmsglen < 0 { + if postmsgIndex < 0 { return ErrInvalidLengthFileCheckpoints } if postmsgIndex > l { @@ -1056,6 +1237,9 @@ func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthFileCheckpoints } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFileCheckpoints + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1083,7 +1267,7 @@ func 
(m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - wire |= (uint64(b) & 0x7F) << shift + wire |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1111,7 +1295,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift + stringLen |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1121,6 +1305,9 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1140,7 +1327,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Offset |= (int64(b) & 0x7F) << shift + m.Offset |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1159,7 +1346,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - byteLen |= (int(b) & 0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1168,6 +1355,9 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { return ErrInvalidLengthFileCheckpoints } postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthFileCheckpoints + } if postIndex > l { return io.ErrUnexpectedEOF } @@ -1190,7 +1380,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= (int(b) & 0x7F) << shift + v |= int(b&0x7F) << shift if b < 0x80 { break } @@ -1210,7 +1400,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.EndOffset |= (int64(b) & 0x7F) << shift + m.EndOffset |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1229,7 +1419,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Pos |= (int64(b) & 0x7F) << shift + m.Pos |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1248,7 +1438,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.PrevRowidMax |= (int64(b) & 0x7F) << shift + m.PrevRowidMax |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1267,7 +1457,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.RowidMax |= (int64(b) & 0x7F) << shift + m.RowidMax |= int64(b&0x7F) << shift if b < 0x80 { break } @@ -1286,7 +1476,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.KvcBytes |= (uint64(b) & 0x7F) << shift + m.KvcBytes |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1305,7 +1495,7 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.KvcKvs |= (uint64(b) & 0x7F) << shift + m.KvcKvs |= uint64(b&0x7F) << shift if b < 0x80 { break } @@ -1329,6 +1519,9 @@ func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error { if skippy < 0 { return ErrInvalidLengthFileCheckpoints } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthFileCheckpoints + } if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } @@ -1395,10 +1588,13 @@ func skipFileCheckpoints(dAtA []byte) (n int, err error) { break } } - iNdEx += length if length < 0 { return 0, ErrInvalidLengthFileCheckpoints } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthFileCheckpoints + } return iNdEx, nil case 3: for { @@ -1427,6 +1623,9 @@ func skipFileCheckpoints(dAtA []byte) (n int, err error) { return 0, err } iNdEx = start + next + if iNdEx < 0 { + return 0, 
ErrInvalidLengthFileCheckpoints + } } return iNdEx, nil case 4: @@ -1445,46 +1644,3 @@ var ( ErrInvalidLengthFileCheckpoints = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowFileCheckpoints = fmt.Errorf("proto: integer overflow") ) - -func init() { - proto.RegisterFile("lightning/restore/file_checkpoints.proto", fileDescriptor_file_checkpoints_168275cfec5db5bf) -} - -var fileDescriptor_file_checkpoints_168275cfec5db5bf = []byte{ - // 550 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcf, 0x6e, 0xd3, 0x3e, - 0x1c, 0x9f, 0x9b, 0x2d, 0x6d, 0xbf, 0xe9, 0xef, 0xa7, 0xca, 0x5a, 0x87, 0x55, 0xb4, 0x2a, 0x54, - 0x1c, 0x22, 0x21, 0x52, 0x18, 0x17, 0xb4, 0x63, 0xcb, 0x0e, 0x13, 0x9a, 0x40, 0x16, 0x5c, 0xb8, - 0x44, 0xf9, 0xe3, 0x26, 0x51, 0xd2, 0xb8, 0x8a, 0x93, 0x6c, 0x7d, 0x0b, 0x24, 0x1e, 0x84, 0x27, - 0xe0, 0xbe, 0x23, 0x0f, 0xc0, 0x01, 0xca, 0x8b, 0x20, 0x3b, 0x99, 0x9a, 0x4d, 0x15, 0xe2, 0xf6, - 0xfd, 0xfc, 0xf1, 0xc7, 0xce, 0xc7, 0x31, 0x58, 0x69, 0x1c, 0x46, 0x45, 0x16, 0x67, 0xe1, 0x2c, - 0x67, 0xa2, 0xe0, 0x39, 0x9b, 0x2d, 0xe3, 0x94, 0x39, 0x7e, 0xc4, 0xfc, 0x64, 0xcd, 0xe3, 0xac, - 0x10, 0xf6, 0x3a, 0xe7, 0x05, 0x1f, 0x3f, 0x0f, 0xe3, 0x22, 0x2a, 0x3d, 0xdb, 0xe7, 0xab, 0x59, - 0xc8, 0x43, 0x3e, 0x53, 0xb4, 0x57, 0x2e, 0x15, 0x52, 0x40, 0x4d, 0xb5, 0x7d, 0xfa, 0x15, 0xc1, - 0x70, 0xb1, 0x0b, 0xb9, 0xe2, 0x01, 0x4b, 0xf1, 0x1b, 0x30, 0x5a, 0xc1, 0x04, 0x99, 0x9a, 0x65, - 0x9c, 0x4d, 0xed, 0x87, 0xbe, 0x36, 0x71, 0x91, 0x15, 0xf9, 0x86, 0xb6, 0x97, 0x8d, 0x3f, 0xde, - 0x4b, 0x56, 0x06, 0x3c, 0x04, 0x2d, 0x61, 0x1b, 0x82, 0x4c, 0x64, 0xf5, 0xa9, 0x1c, 0xf1, 0x33, - 0x38, 0xaa, 0xdc, 0xb4, 0x64, 0xa4, 0x63, 0x22, 0xcb, 0x38, 0x1b, 0xd9, 0x1f, 0x5c, 0x2f, 0x65, - 0xbb, 0x85, 0x6a, 0x27, 0x5a, 0x7b, 0xce, 0x3b, 0xaf, 0xd1, 0xf4, 0x0b, 0x82, 0xe3, 0x7d, 0x1e, - 0x8c, 0xe1, 0x30, 0x72, 0x45, 0xa4, 0xc2, 0x07, 0x54, 0xcd, 0xf8, 0x04, 0x74, 0x51, 0xb8, 0x45, - 0x29, 0x88, 0x66, 0x22, 0xeb, 0x3f, 0xda, 0x20, 0x7c, 0x0a, 0xe0, 0xa6, 0x29, 0xf7, 0x1d, 0xcf, - 0x15, 0x8c, 0x1c, 0x9a, 0xc8, 0xd2, 0x68, 0x5f, 0x31, 0x73, 0x57, 0x30, 0xfc, 0x02, 0xba, 0x2c, - 0x0b, 0xe3, 0x8c, 0x09, 0xa2, 0xab, 0x8f, 0x3f, 0xb1, 0x2f, 0x14, 0x7e, 0x78, 0xae, 0x3b, 0xdb, - 0xf4, 0x1b, 0x82, 0xd1, 0x5e, 0x4b, 0xeb, 0x08, 0xe8, 0xde, 0x11, 0xce, 0x41, 0xf7, 0xa3, 0x32, - 0x4b, 0x04, 0xe9, 0x34, 0xfd, 0xee, 0x5d, 0x6f, 0x2f, 0x94, 0xa9, 0xee, 0xb7, 0x59, 0x31, 0x7e, - 0x0f, 0x46, 0x8b, 0xfe, 0x97, 0x56, 0x95, 0xfd, 0x2f, 0xad, 0xfe, 0xe8, 0xc0, 0xf1, 0x3e, 0x8f, - 0x6c, 0x75, 0xed, 0x16, 0x51, 0x13, 0xae, 0x66, 0xf9, 0x49, 0x7c, 0xb9, 0x14, 0xac, 0x50, 0xf1, - 0x1a, 0x6d, 0x10, 0x26, 0xd0, 0xf5, 0x79, 0x5a, 0xae, 0xb2, 0xba, 0xee, 0x01, 0xbd, 0x83, 0xf8, - 0x25, 0x8c, 0x44, 0xc4, 0xcb, 0x34, 0x70, 0xe2, 0xcc, 0x4f, 0xcb, 0x80, 0x39, 0x39, 0xbf, 0x76, - 0xe2, 0x40, 0x55, 0xdf, 0xa3, 0xb8, 0x16, 0x2f, 0x6b, 0x8d, 0xf2, 0xeb, 0xcb, 0x40, 0x5e, 0x11, - 0xcb, 0x02, 0xa7, 0xd9, 0xe8, 0xa8, 0xbe, 0x22, 0x96, 0x05, 0xef, 0xea, 0xbd, 0x86, 0xa0, 0xad, - 0xb9, 0xbc, 0x1e, 0xc9, 0xcb, 0x11, 0x3f, 0x85, 0xff, 0xd7, 0x39, 0xab, 0x64, 0x72, 0x1c, 0x38, - 0x2b, 0xf7, 0x86, 0x74, 0x95, 0x38, 0x90, 0x2c, 0x95, 0xe4, 0x95, 0x7b, 0x83, 0x1f, 0x43, 0x7f, - 0x67, 0xe8, 0x29, 0x43, 0x2f, 0x6f, 0x89, 0x49, 0xe5, 0x3b, 0xde, 0xa6, 0x60, 0x82, 0xf4, 0x4d, - 0x64, 0x1d, 0xd2, 0x5e, 0x52, 0xf9, 0x73, 0x89, 0xf1, 0x23, 0xe8, 0x4a, 0x31, 0xa9, 0x04, 0x01, - 0x25, 0xe9, 0x49, 0xe5, 0xbf, 0xad, 0x04, 0x7e, 0x02, 0x03, 0x29, 0xa8, 0x7f, 0x5f, 0x94, 0x2b, - 
0x62, 0x98, 0xc8, 0xd2, 0xa9, 0x91, 0x54, 0xfe, 0xa2, 0xa1, 0xe6, 0xa7, 0xb7, 0xbf, 0x26, 0x07, - 0xb7, 0xdb, 0x09, 0xfa, 0xbe, 0x9d, 0xa0, 0x9f, 0xdb, 0x09, 0xfa, 0xfc, 0x7b, 0x72, 0xf0, 0xa9, - 0xdb, 0xbc, 0x65, 0x4f, 0x57, 0x8f, 0xf1, 0xd5, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5b, 0x0d, - 0x43, 0xc6, 0xe7, 0x03, 0x00, 0x00, -} diff --git a/lightning/restore/file_checkpoints.proto b/lightning/restore/file_checkpoints.proto index c15e7689a..3980e5ac0 100644 --- a/lightning/restore/file_checkpoints.proto +++ b/lightning/restore/file_checkpoints.proto @@ -27,7 +27,7 @@ message TableCheckpointModel { bytes hash = 1; uint32 status = 3; int64 alloc_base = 4; - repeated EngineCheckpointModel engines = 6; + map<int32, EngineCheckpointModel> engines = 8; } message EngineCheckpointModel { diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go index e203e6042..b43f62946 100644 --- a/lightning/restore/restore.go +++ b/lightning/restore/restore.go @@ -19,6 +19,7 @@ import ( "database/sql" "fmt" "io" + "math" "net/http" "os" "regexp" @@ -29,8 +30,14 @@ import ( "github.com/coreos/go-semver/semver" "github.com/cznic/mathutil" + "github.com/pingcap/errors" sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/parser/model" + tidbcfg "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/kvencoder" + "github.com/pingcap/tidb-lightning/lightning/common" "github.com/pingcap/tidb-lightning/lightning/config" "github.com/pingcap/tidb-lightning/lightning/kv" @@ -38,11 +45,6 @@ import ( "github.com/pingcap/tidb-lightning/lightning/mydump" verify "github.com/pingcap/tidb-lightning/lightning/verification" "github.com/pingcap/tidb-lightning/lightning/worker" - - "github.com/pingcap/errors" - tidbcfg "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/util/kvencoder" ) const ( @@ -54,6 +56,11 @@ const ( defaultGCLifeTime = 100 * time.Hour ) +const ( + indexEngineID = -1 + wholeTableEngineID = math.MaxInt32 +) + const ( compactStateIdle int32 = iota compactStateDoing @@ -110,6 +117,7 @@ type RestoreController struct { dbMetas []*mydump.MDDatabaseMeta dbInfos map[string]*TidbDBInfo tableWorkers *worker.Pool + indexWorkers *worker.Pool regionWorkers *worker.Pool ioWorkers *worker.Pool importer *kv.Importer @@ -147,6 +155,7 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg: cfg, dbMetas: dbMetas, tableWorkers: worker.NewPool(ctx, cfg.App.TableConcurrency, "table"), + indexWorkers: worker.NewPool(ctx, cfg.App.IndexConcurrency, "index"), regionWorkers: worker.NewPool(ctx, cfg.App.RegionConcurrency, "region"), ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"), importer: importer, @@ -285,7 +294,7 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics() { metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(float64(estimatedChunkCount)) } -func (rc *RestoreController) saveStatusCheckpoint(tableName string, engineID int, err error, statusIfSucceed CheckpointStatus) { +func (rc *RestoreController) saveStatusCheckpoint(tableName string, engineID int32, err error, statusIfSucceed CheckpointStatus) { merger := &StatusCheckpointMerger{Status: statusIfSucceed, EngineID: engineID} switch { @@ -344,14 +353,27 @@ func (rc *RestoreController) listenCheckpointUpdates(wg *sync.WaitGroup) { // wg.Wait() // panic("forcing failure due to FailIfImportedChunk") // } - // continue + // goto RETURN1 + + // gofail: RETURN1: // 
gofail: var FailIfStatusBecomes int // if merger, ok := scp.merger.(*StatusCheckpointMerger); ok && merger.EngineID >= 0 && int(merger.Status) == FailIfStatusBecomes { // wg.Wait() // panic("forcing failure due to FailIfStatusBecomes") // } - // continue + // goto RETURN2 + + // gofail: RETURN2: + + // gofail: var FailIfIndexEngineImported int + // if merger, ok := scp.merger.(*StatusCheckpointMerger); ok && merger.EngineID == wholeTableEngineID && merger.Status == CheckpointStatusIndexImported && FailIfIndexEngineImported > 0 { + // wg.Wait() + // panic("forcing failure due to FailIfIndexEngineImported") + // } + // goto RETURN3 + + // gofail: RETURN3: } } @@ -501,10 +523,38 @@ func (t *TableRestore) restoreTable( } // 2. Restore engines (if still needed) + indexEngineCp := cp.Engines[indexEngineID] + if indexEngineCp == nil { + return errors.Errorf("table %v index engine checkpoint not found", t.tableName) + } + + // The table checkpoint status is set to `CheckpointStatusIndexImported` only after + // all data engines and the index engine have been imported into TiKV. + // But persisting the index engine checkpoint status and the table checkpoint status + // is not an atomic operation, so `cp.Status < CheckpointStatusIndexImported` + // together with `indexEngineCp.Status == CheckpointStatusImported` can happen + // if lightning is killed after saving the index engine checkpoint status but before + // saving the table checkpoint status. + var indexEngine *kv.OpenedEngine + var closedIndexEngine *kv.ClosedEngine + if indexEngineCp.Status < CheckpointStatusImported && cp.Status < CheckpointStatusIndexImported { + indexWorker := rc.indexWorkers.Apply() + defer rc.indexWorkers.Recycle(indexWorker) + var err error + indexEngine, err = rc.importer.OpenEngine(ctx, t.tableName, indexEngineID) + if err != nil { + return errors.Trace(err) + } - if cp.Status < CheckpointStatusImported { - timer := time.Now() + // A table checkpoint status below `CheckpointStatusIndexImported` implies + // an index engine checkpoint status below `CheckpointStatusImported`, + // so the index engine must have been opened by the branch above. + if indexEngine == nil { + return errors.Errorf("table checkpoint status %v incompatible with index engine checkpoint status %v", + cp.Status, indexEngineCp.Status) + } + timer := time.Now() var wg sync.WaitGroup var engineErr common.OnceError @@ -518,6 +568,11 @@ func (t *TableRestore) restoreTable( break } + // Skip the index engine; it is handled separately + if engineID < 0 { + continue + } + wg.Add(1) // Note: We still need tableWorkers to control the concurrency of tables. @@ -525,19 +580,19 @@ // the difference between restoring tables concurrently and restoring tables one by one. 
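For readers tracing the checkpoint comment above, here is a standalone Go sketch (not part of the patch; the concrete status values are assumptions, loosely mirroring the 120/140 injected by the FailIfStatusBecomes and FailIfIndexEngineImported failpoints in the tests) of why the index engine gate checks both statuses:

package main

import "fmt"

// CheckpointStatus mirrors the ordering used by the patch; the numeric
// values below are assumptions, not the real constants.
type CheckpointStatus int

const (
	CheckpointStatusLoaded        CheckpointStatus = 30
	CheckpointStatusClosed        CheckpointStatus = 90
	CheckpointStatusImported      CheckpointStatus = 120
	CheckpointStatusIndexImported CheckpointStatus = 140
)

// needIndexEngine reports whether the index engine still needs to be opened.
// The engine status and the table status are persisted by two separate
// writes, so a crash between them can leave the engine at Imported while the
// table is still below IndexImported; checking both avoids redoing that work.
func needIndexEngine(tableStatus, indexEngineStatus CheckpointStatus) bool {
	return indexEngineStatus < CheckpointStatusImported &&
		tableStatus < CheckpointStatusIndexImported
}

func main() {
	// Crash window: index engine already imported, table status not yet bumped.
	fmt.Println(needIndexEngine(CheckpointStatusClosed, CheckpointStatusImported)) // false
	// Fresh run: everything still pending, the index engine must be opened.
	fmt.Println(needIndexEngine(CheckpointStatusLoaded, CheckpointStatusLoaded)) // true
}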
restoreWorker := rc.tableWorkers.Apply() - go func(w *worker.Worker, eid int, ecp *EngineCheckpoint) { + go func(w *worker.Worker, eid int32, ecp *EngineCheckpoint) { defer wg.Done() tag := fmt.Sprintf("%s:%d", t.tableName, eid) - closedEngine, closedEngineWorker, err := t.restoreEngine(ctx, rc, eid, ecp) + dataClosedEngine, dataWorker, err := t.restoreEngine(ctx, rc, indexEngine, eid, ecp) rc.tableWorkers.Recycle(w) if err != nil { engineErr.Set(tag, err) return } - defer rc.closedEngineLimit.Recycle(closedEngineWorker) - if err := t.importEngine(ctx, closedEngine, rc, eid, ecp); err != nil { + defer rc.closedEngineLimit.Recycle(dataWorker) + if err := t.importEngine(ctx, dataClosedEngine, rc, eid, ecp); err != nil { engineErr.Set(tag, err) } }(restoreWorker, engineID, engine) @@ -546,22 +601,34 @@ wg.Wait() common.AppLogger.Infof("[%s] import whole table takes %v", t.tableName, time.Since(timer)) - err := engineErr.Get() - rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusImported) + err = engineErr.Get() + if err != nil { + return errors.Trace(err) + } + + // The index engine can be closed but not yet imported only if the context was + // canceled during `importKV()`, so `UnsafeCloseEngine` it and continue importing it. + if indexEngineCp.Status == CheckpointStatusClosed { + closedIndexEngine, err = rc.importer.UnsafeCloseEngine(ctx, t.tableName, indexEngineID) + } else { + closedIndexEngine, err = indexEngine.Close(ctx) + rc.saveStatusCheckpoint(t.tableName, indexEngineID, err, CheckpointStatusClosed) + } if err != nil { + common.AppLogger.Errorf("[%s] [kv-deliver] index engine closed error: %s", t.tableName, errors.ErrorStack(err)) return errors.Trace(err) } } // 3. Post-process - - return errors.Trace(t.postProcess(ctx, rc, cp)) + return errors.Trace(t.postProcess(ctx, rc, cp, indexEngineCp, closedIndexEngine)) } func (t *TableRestore) restoreEngine( ctx context.Context, rc *RestoreController, - engineID int, + indexEngine *kv.OpenedEngine, + engineID int32, cp *EngineCheckpoint, ) (*kv.ClosedEngine, *worker.Worker, error) { if cp.Status >= CheckpointStatusClosed { @@ -577,7 +644,7 @@ func (t *TableRestore) restoreEngine( timer := time.Now() - engine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID) + dataEngine, err := rc.importer.OpenEngine(ctx, t.tableName, engineID) if err != nil { return nil, nil, errors.Trace(err) } @@ -623,7 +690,7 @@ func (t *TableRestore) restoreEngine( rc.regionWorkers.Recycle(w) }() metric.ChunkCounter.WithLabelValues(metric.ChunkStateRunning).Inc() - err := cr.restore(ctx, t, engineID, engine, rc) + err := cr.restore(ctx, t, engineID, dataEngine, indexEngine, rc) if err == nil { metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished).Inc() return @@ -652,23 +719,23 @@ func (t *TableRestore) restoreEngine( return nil, nil, errors.Trace(err) } - w := rc.closedEngineLimit.Apply() - closedEngine, err := engine.Close(ctx) + dataWorker := rc.closedEngineLimit.Apply() + closedDataEngine, err := dataEngine.Close(ctx) rc.saveStatusCheckpoint(t.tableName, engineID, err, CheckpointStatusClosed) if err != nil { common.AppLogger.Errorf("[kv-deliver] flush stage with error (step = close) : %s", errors.ErrorStack(err)) // If any error occurred, recycle worker immediately - rc.closedEngineLimit.Recycle(w) + rc.closedEngineLimit.Recycle(dataWorker) return nil, nil, errors.Trace(err) } - return closedEngine, w, nil + return closedDataEngine, dataWorker, nil } func (t *TableRestore) importEngine( ctx 
context.Context, closedEngine *kv.ClosedEngine, rc *RestoreController, - engineID int, + engineID int32, cp *EngineCheckpoint, ) error { if cp.Status >= CheckpointStatusImported { @@ -689,7 +756,7 @@ func (t *TableRestore) importEngine( } // 2. perform a level-1 compact if idling. - if *rc.cfg.PostRestore.Level1Compact && + if rc.cfg.PostRestore.Level1Compact && atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) { go func() { err := rc.doCompact(ctx, Level1Compact) @@ -704,7 +771,28 @@ func (t *TableRestore) importEngine( return nil } -func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint) error { +func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, cp *TableCheckpoint, + indexEngineCp *EngineCheckpoint, indexEngine *kv.ClosedEngine) error { + if cp.Status < CheckpointStatusIndexImported { + var err error + if indexEngineCp.Status < CheckpointStatusImported { + // the lock ensures the import() step will not be concurrent. + rc.postProcessLock.Lock() + err = t.importKV(ctx, indexEngine) + rc.postProcessLock.Unlock() + rc.saveStatusCheckpoint(t.tableName, indexEngineID, err, CheckpointStatusImported) + } + + // gofail: var FailBeforeIndexEngineImported struct{} + // panic("forcing failure due to FailBeforeIndexEngineImported") + + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, err, CheckpointStatusIndexImported) + if err != nil { + common.AppLogger.Errorf("[%[1]s] failed to import index engine: %v", t.tableName, err.Error()) + return errors.Trace(err) + } + } + setSessionConcurrencyVars(ctx, rc.tidbMgr.db, rc.cfg.TiDB) // 3. alter table set auto_increment @@ -712,7 +800,7 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c rc.alterTableLock.Lock() err := t.restoreTableMeta(ctx, rc.tidbMgr.db) rc.alterTableLock.Unlock() - rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusAlteredAutoInc) + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, err, CheckpointStatusAlteredAutoInc) if err != nil { common.AppLogger.Errorf( "[%[1]s] failed to AUTO TABLE %[1]s SET AUTO_INCREMENT=%[2]d : %[3]v", @@ -726,10 +814,10 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c if cp.Status < CheckpointStatusChecksummed { if !rc.cfg.PostRestore.Checksum { common.AppLogger.Infof("[%s] Skip checksum.", t.tableName) - rc.saveStatusCheckpoint(t.tableName, -1, nil, CheckpointStatusChecksumSkipped) + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, nil, CheckpointStatusChecksumSkipped) } else { err := t.compareChecksum(ctx, rc.tidbMgr.db, cp) - rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusChecksummed) + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, err, CheckpointStatusChecksummed) if err != nil { common.AppLogger.Errorf("[%s] checksum failed: %v", t.tableName, err.Error()) return errors.Trace(err) @@ -741,10 +829,10 @@ func (t *TableRestore) postProcess(ctx context.Context, rc *RestoreController, c if cp.Status < CheckpointStatusAnalyzed { if !rc.cfg.PostRestore.Analyze { common.AppLogger.Infof("[%s] Skip analyze.", t.tableName) - rc.saveStatusCheckpoint(t.tableName, -1, nil, CheckpointStatusAnalyzeSkipped) + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, nil, CheckpointStatusAnalyzeSkipped) } else { err := t.analyzeTable(ctx, rc.tidbMgr.db) - rc.saveStatusCheckpoint(t.tableName, -1, err, CheckpointStatusAnalyzed) + rc.saveStatusCheckpoint(t.tableName, wholeTableEngineID, err, 
CheckpointStatusAnalyzed) if err != nil { common.AppLogger.Errorf("[%s] analyze failed: %v", t.tableName, err.Error()) return errors.Trace(err) @@ -1001,10 +1089,14 @@ func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) e } for _, chunk := range chunks { - for chunk.EngineID >= len(cp.Engines) { - cp.Engines = append(cp.Engines, &EngineCheckpoint{Status: CheckpointStatusLoaded}) + engine, found := cp.Engines[chunk.EngineID] + if !found { + engine = &EngineCheckpoint{ + Status: CheckpointStatusLoaded, + } + cp.Engines[chunk.EngineID] = engine } - cp.Engines[chunk.EngineID].Chunks = append(cp.Engines[chunk.EngineID].Chunks, &ChunkCheckpoint{ + engine.Chunks = append(engine.Chunks, &ChunkCheckpoint{ Key: ChunkCheckpointKey{ Path: chunk.File, Offset: chunk.Chunk.Offset, @@ -1014,6 +1106,9 @@ func (t *TableRestore) populateChunks(cfg *config.Config, cp *TableCheckpoint) e }) } + // Add index engine checkpoint + cp.Engines[indexEngineID] = &EngineCheckpoint{Status: CheckpointStatusLoaded} + common.AppLogger.Infof("[%s] load %d engines and %d chunks takes %v", t.tableName, len(cp.Engines), len(chunks), time.Since(timer)) return nil } @@ -1057,11 +1152,9 @@ func (tr *TableRestore) restoreTableMeta(ctx context.Context, db *sql.DB) error func (tr *TableRestore) importKV(ctx context.Context, closedEngine *kv.ClosedEngine) error { common.AppLogger.Infof("[%s] flush kv deliver ...", tr.tableName) - start := time.Now() - err := closedEngine.Import(ctx) - if err != nil { + if err := closedEngine.Import(ctx); err != nil { if !common.IsContextCanceledError(err) { common.AppLogger.Errorf("[%s] failed to flush kvs : %s", tr.tableName, err.Error()) } @@ -1240,8 +1333,8 @@ func splitIntoDeliveryStreams(totalKVs []kvenc.KvPair, splitSize int) [][]kvenc. func (cr *chunkRestore) restore( ctx context.Context, t *TableRestore, - engineID int, - engine *kv.OpenedEngine, + engineID int32, + dataEngine, indexEngine *kv.OpenedEngine, rc *RestoreController, ) error { // Create the encoder. 
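The chunkRestore.restore hunk that follows routes each encoded KV pair to either the data engine or the index engine by inspecting TiDB's key layout: record keys have the shape t{tableID}_r{handle} while index keys have the shape t{tableID}_i{indexID}..., so the byte at tablecodec.TableSplitKeyLen+1 is 'r' exactly for row data. A minimal sketch of that classification, with a local stand-in for the tablecodec constant and types:

package main

import "fmt"

// tableSplitKeyLen stands in for tablecodec.TableSplitKeyLen:
// the 't' prefix plus an 8-byte encoded table ID.
const tableSplitKeyLen = 1 + 8

type kvPair struct{ Key, Val []byte }

// classify splits encoded pairs into row KVs ("_r" keys) and index KVs ("_i" keys).
func classify(pairs []kvPair) (dataKVs, indexKVs []kvPair) {
	for _, p := range pairs {
		if p.Key[tableSplitKeyLen+1] == 'r' {
			dataKVs = append(dataKVs, p)
		} else {
			indexKVs = append(indexKVs, p)
		}
	}
	return
}

func main() {
	tablePrefix := append([]byte("t"), make([]byte, 8)...) // fake 8-byte table ID
	rowKey := append(append([]byte{}, tablePrefix...), "_r\x00\x01"...)
	indexKey := append(append([]byte{}, tablePrefix...), "_i\x00\x01"...)

	data, index := classify([]kvPair{{Key: rowKey}, {Key: indexKey}})
	fmt.Println(len(data), len(index)) // 1 1
}

Writing the two halves to separate engines is what lets the index SSTs be closed and imported once per table instead of once per data engine.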
@@ -1280,6 +1373,7 @@ func (cr *chunkRestore) restore( deliverCompleteCh := make(chan error, 1) go func() { + var dataKVs, indexKVs []kvenc.KvPair for { block.cond.L.Lock() for !block.encodeCompleted && len(block.totalKVs) == 0 { @@ -1297,14 +1391,28 @@ func (cr *chunkRestore) restore( // kv -> deliver ( -> tikv ) start := time.Now() - stream, err := engine.NewWriteStream(ctx) + dataStream, err := dataEngine.NewWriteStream(ctx) + if err != nil { + deliverCompleteCh <- errors.Trace(err) + return + } + indexStream, err := indexEngine.NewWriteStream(ctx) if err != nil { deliverCompleteCh <- errors.Trace(err) return } + // classify the KV pairs into data KVs and index KVs + for _, k := range b.totalKVs { + if k.Key[tablecodec.TableSplitKeyLen+1] == 'r' { + dataKVs = append(dataKVs, k) + } else { + indexKVs = append(indexKVs, k) + } + } + b.totalKVs = nil - for _, kvs := range splitIntoDeliveryStreams(b.totalKVs, maxDeliverBytes) { - if e := stream.Put(kvs); e != nil { + for _, kvs := range splitIntoDeliveryStreams(dataKVs, maxDeliverBytes) { + if e := dataStream.Put(kvs); e != nil { if err != nil { common.AppLogger.Warnf("failed to put write stream: %s", e.Error()) } else { @@ -1312,10 +1420,27 @@ } } } - b.totalKVs = nil + for _, kvs := range splitIntoDeliveryStreams(indexKVs, maxDeliverBytes) { + if e := indexStream.Put(kvs); e != nil { + if err != nil { + common.AppLogger.Warnf("failed to put write stream: %s", e.Error()) + } else { + err = e + } + } + } + dataKVs = dataKVs[:0] + indexKVs = indexKVs[:0] block.cond.Signal() - if e := stream.Close(); e != nil { + if e := dataStream.Close(); e != nil { + if err != nil { + common.AppLogger.Warnf("[%s:%d] failed to close write stream: %s", t.tableName, engineID, e.Error()) + } else { + err = e + } + } + if e := indexStream.Close(); e != nil { if err != nil { common.AppLogger.Warnf("[%s:%d] failed to close write stream: %s", t.tableName, engineID, e.Error()) } else { diff --git a/tests/checkpoint/run.sh b/tests/checkpoint/run.sh index cc24a0146..d018ba83a 100755 --- a/tests/checkpoint/run.sh +++ b/tests/checkpoint/run.sh @@ -57,12 +57,35 @@ PARTIAL_IMPORT_QUERY="$PARTIAL_IMPORT_QUERY AS s;" # Set the failpoint to kill the lightning instance as soon as one table is imported # If checkpoint does work, this should only kill 9 instances of lightnings. -export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)' +export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailBeforeIndexEngineImported=return' # Start importing the tables. run_sql 'DROP DATABASE IF EXISTS cppk_tsr' run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cppk' +# Panic after saving the index engine checkpoint status but before saving the table checkpoint status +set +e +for i in $(seq "$TABLE_COUNT"); do + echo "******** Importing Table Now (step $i/$TABLE_COUNT) ********" + run_lightning 2> /dev/null + [ $? -ne 0 ] || exit 1 +done +set -e + +export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500)' +set +e +for i in $(seq "$TABLE_COUNT"); do + echo "******** Importing Table Now (step $i/$TABLE_COUNT) ********" + run_lightning 2> /dev/null +done +set -e + +# Start importing the tables. 
+run_sql 'DROP DATABASE IF EXISTS cppk_tsr' +run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_test_cppk' + +export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfIndexEngineImported=return(1)' + set +e for i in $(seq "$TABLE_COUNT"); do echo "******** Importing Table Now (step $i/$TABLE_COUNT) ********" diff --git a/tests/checkpoint_engines/run.sh b/tests/checkpoint_engines/run.sh index 605f1aab3..672d779bd 100755 --- a/tests/checkpoint_engines/run.sh +++ b/tests/checkpoint_engines/run.sh @@ -18,14 +18,18 @@ set -eu # First, verify that a normal operation is fine. rm -f "$TEST_DIR/lightning-checkpoint-engines.log" +rm -f "/tmp/tidb_lightning_checkpoint.pb" run_sql 'DROP DATABASE IF EXISTS cpeng;' run_lightning -# Check that we have indeed opened 4 engines +# Check that we have indeed opened 6 engines (4 data engines + 2 index engines) +DATA_ENGINE_COUNT=4 +INDEX_ENGINE_COUNT=2 +ENGINE_COUNT=6 OPEN_ENGINES_COUNT=$(grep 'open engine' "$TEST_DIR/lightning-checkpoint-engines.log" | wc -l) echo "Number of open engines: $OPEN_ENGINES_COUNT" -[ "$OPEN_ENGINES_COUNT" -eq 4 ] +[ "$OPEN_ENGINES_COUNT" -eq $ENGINE_COUNT ] # Check that everything is correctly imported run_sql 'SELECT count(*), sum(c) FROM cpeng.a' @@ -39,11 +43,13 @@ check_contains 'sum(c): 46' # Now, verify it works with checkpoints as well. run_sql 'DROP DATABASE cpeng;' +rm -f "/tmp/tidb_lightning_checkpoint.pb" -export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120)' +# Data engine part +export GOFAIL_FAILPOINTS='github.com/pingcap/tidb-lightning/lightning/restore/SlowDownImport=sleep(500);github.com/pingcap/tidb-lightning/lightning/restore/FailIfStatusBecomes=return(120);github.com/pingcap/tidb-lightning/lightning/restore/FailIfIndexEngineImported=return(140)' set +e -for i in $(seq "$OPEN_ENGINES_COUNT"); do - echo "******** Importing Table Now (step $i/4) ********" +for i in $(seq "$ENGINE_COUNT"); do + echo "******** Importing Table Now (step $i/$ENGINE_COUNT) ********" run_lightning 2> /dev/null [ $? -ne 0 ] || exit 1 done @@ -65,8 +71,8 @@ check_contains 'sum(c): 46' run_sql 'DROP DATABASE cpeng;' set +e -for i in $(seq "$OPEN_ENGINES_COUNT"); do - echo "******** Importing Table Now (step $i/4) ********" +for i in $(seq "$ENGINE_COUNT"); do + echo "******** Importing Table Now (step $i/$ENGINE_COUNT) ********" run_lightning mysql 2> /dev/null [ $? -ne 0 ] || exit 1 done diff --git a/tidb-lightning.toml b/tidb-lightning.toml index d8d965378..8cd9cadd9 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -7,6 +7,8 @@ pprof-port = 8289 # check if the cluster satisfies the minimum requirement before starting # check-requirements = true +# index-concurrency controls the maximum number of index engines handled concurrently while reading Mydumper SQL files. It can affect the disk usage of tikv-importer. +index-concurrency = 2 # table-concurrency controls the maximum handled tables concurrently while reading Mydumper SQL files. It can affect the tikv-importer memory usage. table-concurrency = 8 # region-concurrency changes the concurrency number of data. It is set to the number of logical CPU cores by default and needs no configuration.
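The new index-concurrency knob is enforced the same way as table-concurrency: a table acquires a worker from rc.indexWorkers.Apply() before opening its index engine and recycles it afterwards. A rough Go stand-in for the lightning/worker pool (the real Pool also carries a context and named workers; this sketch only shows the bounding behaviour):

package main

import (
	"fmt"
	"sync"
)

// Pool is a counting semaphore handing out at most `limit` workers at a time,
// a simplified stand-in for the lightning/worker package.
type Pool struct{ tokens chan struct{} }

func NewPool(limit int) *Pool {
	p := &Pool{tokens: make(chan struct{}, limit)}
	for i := 0; i < limit; i++ {
		p.tokens <- struct{}{}
	}
	return p
}

func (p *Pool) Apply()   { <-p.tokens }          // blocks once `limit` workers are out
func (p *Pool) Recycle() { p.tokens <- struct{}{} }

func main() {
	indexWorkers := NewPool(2) // index-concurrency = 2
	var wg sync.WaitGroup
	for table := 0; table < 5; table++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			indexWorkers.Apply() // at most two index engines are open at once
			defer indexWorkers.Recycle()
			fmt.Printf("table %d: index engine open\n", id)
		}(table)
	}
	wg.Wait()
}

Keeping the default index-concurrency (2) well below table-concurrency (8) limits how many long-lived index engines are open simultaneously, which matters because, as the toml comment above notes, each open engine adds to tikv-importer's disk usage.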