From 05ad9cf3c2d7e25ec4438bb74b85e8fd9ed7420b Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 29 Mar 2018 17:57:02 +0800 Subject: [PATCH] skip table on error and fix golint (#21) --- ingest/common/metrics.go | 2 +- ingest/kv/kv-deliver.go | 6 +-- ingest/kv/sql2kv.go | 4 +- ingest/log/log.go | 4 ++ ingest/mydump/loader.go | 6 +-- ingest/mydump/reader.go | 12 +++--- ingest/mydump/region.go | 5 +-- ingest/restore/restore.go | 66 +++++++++++++++++++-------------- ingest/sql/parser.go | 6 +-- ingest/verification/checksum.go | 2 +- 10 files changed, 64 insertions(+), 49 deletions(-) diff --git a/ingest/common/metrics.go b/ingest/common/metrics.go index 6f37bec600ac8..dcf8f0232f5e4 100644 --- a/ingest/common/metrics.go +++ b/ingest/common/metrics.go @@ -38,7 +38,7 @@ func (m *Metrics) costTimeNS(name string, ns int64) { func (m *Metrics) DumpTiming() string { marks := make([]string, 0, len(m.Timing)) - for mark, _ := range m.Timing { + for mark := range m.Timing { marks = append(marks, mark) } sort.Strings(marks) diff --git a/ingest/kv/kv-deliver.go b/ingest/kv/kv-deliver.go index 699362b230524..d27141e8bc82f 100644 --- a/ingest/kv/kv-deliver.go +++ b/ingest/kv/kv-deliver.go @@ -285,7 +285,7 @@ type KVDeliverKeeper struct { pdAddr string clientsPool []*KVDeliverClient // aka. connection pool - txnIdCounter int // TODO : need to update to another algorithm + txnIDCounter int // TODO : need to update to another algorithm txnBoard map[uuid.UUID]*txnInfo txns map[string][]*deliverTxn // map[tag]{*txn, *txn, *txn ...} @@ -311,7 +311,7 @@ func NewKVDeliverKeeper(importServerAddr, pdAddr string) *KVDeliverKeeper { pdAddr: pdAddr, clientsPool: make([]*KVDeliverClient, 0, 32), - txnIdCounter: 0, // TODO : need to update to another algorithm + txnIDCounter: 0, // TODO : need to update to another algorithm txns: make(map[string][]*deliverTxn), txnBoard: make(map[uuid.UUID]*txnInfo), txnFlushQueue: make(chan *deliverTxn, 64), @@ -349,7 +349,7 @@ func (k *KVDeliverKeeper) validate(txn *deliverTxn) bool { } func (k *KVDeliverKeeper) newTxn(db string, table string) *deliverTxn { - k.txnIdCounter++ + k.txnIDCounter++ uuid := uuid.Must(uuid.NewV4()) tag := buildTag(db, table) diff --git a/ingest/kv/sql2kv.go b/ingest/kv/sql2kv.go index 8ab7d27ca93b0..ddb3e29e64df6 100644 --- a/ingest/kv/sql2kv.go +++ b/ingest/kv/sql2kv.go @@ -24,8 +24,8 @@ func setGlobalVars() { plan.PreparedPlanCacheCapacity = 10 } -func InitMembufCap(batchSqlLength int64) { - kv.ImportingTxnMembufCap = int(batchSqlLength) * 4 +func InitMembufCap(batchSQLLength int64) { + kv.ImportingTxnMembufCap = int(batchSQLLength) * 4 // TODO : calculate predicted ratio, bwtween sql and kvs' size, base on specified DDL } diff --git a/ingest/log/log.go b/ingest/log/log.go index 39d2c5486168a..d3d40caddb1d2 100644 --- a/ingest/log/log.go +++ b/ingest/log/log.go @@ -12,6 +12,7 @@ import ( "gopkg.in/natefinch/lumberjack.v2" "github.com/pingcap/tidb-lightning/ingest/common" + "github.com/pingcap/tidb/util/logutil" ) const ( @@ -130,6 +131,9 @@ func InitLogger(cfg *LogConfig) error { log.AddHook(&contextHook{}) log.SetFormatter(&SimpleTextFormater{}) + // increase tidb log level to hide the annoying log. + logutil.InitLogger(&logutil.LogConfig{Level: "warn"}) + if len(cfg.File) > 0 { if common.IsDirExists(cfg.File) { return errors.Errorf("can't use directory as log file name : %s", cfg.File) diff --git a/ingest/mydump/loader.go b/ingest/mydump/loader.go index fe3caf6ee5206..fd65f1a1e3084 100644 --- a/ingest/mydump/loader.go +++ b/ingest/mydump/loader.go @@ -245,15 +245,15 @@ func countValues(sqlText []byte) int { ps : Count num of tuples (/values) appears within sql statement like : "INSERT INTO `table` VALUES (..), (..), (..);" */ - var textLen int = len(sqlText) + var textLen = len(sqlText) var slice []byte - var tuplesNum int = 0 + var tuplesNum int for i, chr := range sqlText { if chr == ')' && i < textLen-1 { slice = bytes.TrimSpace(sqlText[i+1:]) if len(slice) > 0 && (slice[0] == ',' || slice[0] == ';') { - tuplesNum += 1 + tuplesNum++ } } } diff --git a/ingest/mydump/reader.go b/ingest/mydump/reader.go index 2df74daa17aa9..8a889fc8577b7 100644 --- a/ingest/mydump/reader.go +++ b/ingest/mydump/reader.go @@ -95,7 +95,7 @@ func NewMDDataReader(file string, offset int64) (*MDDataReader, error) { } if len(mdr.stmtHeader) == 0 { - return nil, errors.New("can not find any insert statment !") + return nil, errors.New("can not find any insert statment") } mdr.skipAnnotation(offset) @@ -144,12 +144,12 @@ func (r *MDDataReader) Tell() int64 { } func (r *MDDataReader) currOffset() int64 { - if off, err := r.fd.Seek(0, io.SeekCurrent); err != nil { + off, err := r.fd.Seek(0, io.SeekCurrent) + if err != nil { log.Errorf("get file offset failed (%s) : %v", r.file, err) return -1 - } else { - return off } + return off } func getInsertStatmentHeader(file string) []byte { @@ -196,7 +196,7 @@ func (r *MDDataReader) Read(minSize int64) ([][]byte, error) { defer reader.Reset(fd) // split file's content into multi sql statement - var stmts [][]byte = make([][]byte, 0, 8) + var stmts = make([][]byte, 0, 8) appendSQL := func(sql []byte) { sql = bytes.TrimSpace(sql) sqlLen := len(sql) @@ -235,7 +235,7 @@ func (r *MDDataReader) Read(minSize int64) ([][]byte, error) { (...); ''' */ - var statment []byte = make([]byte, 0, minSize+4096) + var statment = make([]byte, 0, minSize+4096) var readSize, lineSize int64 var line []byte var err error diff --git a/ingest/mydump/region.go b/ingest/mydump/region.go index e8596d7af6393..7f5f8e11b4d90 100644 --- a/ingest/mydump/region.go +++ b/ingest/mydump/region.go @@ -39,9 +39,8 @@ func (rs regionSlice) Swap(i, j int) { func (rs regionSlice) Less(i, j int) bool { if rs[i].File == rs[j].File { return rs[i].Offset < rs[j].Offset - } else { - return rs[i].File < rs[j].File } + return rs[i].File < rs[j].File } //////////////////////////////////////////////////////////////// @@ -108,7 +107,7 @@ func (f *RegionFounder) MakeTableRegions(meta *MDTableMeta, allocateRowID bool) region.BeginRowID = -1 } - var tableRows int64 = 0 + var tableRows int64 for _, region := range filesRegions { if allocateRowID { region.BeginRowID = tableRows + 1 diff --git a/ingest/restore/restore.go b/ingest/restore/restore.go index 9685a0cfd055c..3c9ce57475a50 100644 --- a/ingest/restore/restore.go +++ b/ingest/restore/restore.go @@ -173,6 +173,8 @@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error { } }() + skipTables := make(map[string]struct{}) + var wg sync.WaitGroup for _, task := range tasks { select { @@ -183,11 +185,21 @@ func (rc *RestoreControlloer) restoreTables(ctx context.Context) error { worker := workers.Apply() wg.Add(1) - log.Warnf("region allowed to run >>>>>> [%s]", task.region.Name()) + log.Infof("restoring region %s", task.region.Name()) go func(w *RestoreWorker, t *regionRestoreTask) { - defer wg.Done() defer workers.Recycle(w) - t.Run(ctx) + defer wg.Done() + table := fmt.Sprintf("%s.%s", t.region.DB, t.region.Table) + if _, ok := skipTables[table]; ok { + log.Infof("something wrong with table %s before, so skip region %s", table, t.region.Name()) + return + } + err := t.Run(ctx) + if err != nil { + log.Errorf("table %s region %s run task error %s", table, t.region.Name(), errors.ErrorStack(err)) + skipTables[table] = struct{}{} + } + }(worker, task) } wg.Wait() // TODO ... ctx abroted @@ -354,7 +366,7 @@ const ( statFailed string = "failed" ) -type restoreCallback func(regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum, err error) +type restoreCallback func(regionID int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error type regionRestoreTask struct { status string @@ -383,21 +395,24 @@ func newRegionRestoreTask( } } -func (t *regionRestoreTask) Run(ctx context.Context) { +func (t *regionRestoreTask) Run(ctx context.Context) error { region := t.region log.Infof("Start restore region : [%s] ...", t.region.Name()) t.status = statRunning maxRowID, rows, checksum, err := t.run(ctx) if err != nil { - log.Errorf("Table region (%s) restore failed : %s", region.Name(), err.Error()) + return errors.Trace(err) } log.Infof("Finished restore region : [%s]", region.Name()) - t.callback(region.ID, maxRowID, rows, checksum, err) + err = t.callback(region.ID, maxRowID, rows, checksum) + if err != nil { + return errors.Trace(err) + } t.status = statFinished - return + return nil } func (t *regionRestoreTask) run(ctx context.Context) (int64, uint64, *verify.KVChecksum, error) { @@ -571,12 +586,9 @@ func (tr *TableRestore) loadRegions() { regions := founder.MakeTableRegions(tr.tableMeta, preAllocateRowsID) table := tr.tableMeta.Name - for _, region := range regions { - log.Warnf("[%s] region - %s", table, region.Name()) - } - id2regions := make(map[int]*mydump.TableRegion) for _, region := range regions { + log.Infof("[%s] region - %s", table, region.Name()) id2regions[region.ID] = region } @@ -593,18 +605,11 @@ func (tr *TableRestore) loadRegions() { return } -func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum, err error) { +func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, checksum *verify.KVChecksum) error { table := tr.tableInfo.Name tr.mux.Lock() defer tr.mux.Unlock() - region := tr.id2regions[id] - if err != nil { - log.Errorf("[%s] region (%s) restore failed : %s", - table, region.Name(), err.Error()) - return - } - tr.handledRegions[id] = ®ionStat{ maxRowID: maxRowID, rows: rows, @@ -613,19 +618,22 @@ func (tr *TableRestore) onRegionFinished(id int, maxRowID int64, rows uint64, ch total := len(tr.regions) handled := len(tr.handledRegions) - log.Infof("[%s] handled region count = %d (%s)", table, handled, common.Percent(handled, total)) + log.Infof("table %s handled region count = %d (%s)", table, handled, common.Percent(handled, total)) if handled == len(tr.tasks) { - tr.onFinished() + err := tr.onFinished() + if err != nil { + return errors.Trace(err) + } } - return + return nil } func (tr *TableRestore) makeKVDeliver() (kv.KVDeliver, error) { return makeKVDeliver(tr.ctx, tr.cfg, tr.dbInfo, tr.tableInfo) } -func (tr *TableRestore) onFinished() { +func (tr *TableRestore) onFinished() error { // generate meta kv var ( tableMaxRowID int64 @@ -643,13 +651,17 @@ func (tr *TableRestore) onFinished() { log.Infof("table %s self-calculated checksum %s", table, checksum) tr.localChecksums[table] = checksum - tr.restoreTableMeta(tableMaxRowID) + if err := tr.restoreTableMeta(tableMaxRowID); err != nil { + return errors.Trace(err) + } // flush all kvs into TiKV ~ - tr.ingestKV() + if err := tr.ingestKV(); err != nil { + return errors.Trace(err) + } log.Infof("table %s has imported %d rows", table, tableRows) - return + return nil } func (tr *TableRestore) restoreTableMeta(rowID int64) error { diff --git a/ingest/sql/parser.go b/ingest/sql/parser.go index 56691aaf22a63..8e50188224e0e 100644 --- a/ingest/sql/parser.go +++ b/ingest/sql/parser.go @@ -42,9 +42,9 @@ func ParseInsertStmt(sql []byte, values *[]interface{}) error { stack := 0 for e = s; e < size; e++ { if sql[e] == '(' { - stack += 1 + stack++ } else if sql[e] == ')' { - stack -= 1 + stack-- if stack == 0 { break } @@ -57,7 +57,7 @@ func ParseInsertStmt(sql []byte, values *[]interface{}) error { // extract columns' values _ = parseRowValues(sql[s+1:e], values) - e += 1 // skip ')' + e++ // skip ')' // check ending ")," or ");" for ; e < size; e++ { diff --git a/ingest/verification/checksum.go b/ingest/verification/checksum.go index 1a2ea54f88d3b..784d3d1ee73dc 100644 --- a/ingest/verification/checksum.go +++ b/ingest/verification/checksum.go @@ -33,7 +33,7 @@ func (c *KVChecksum) Update(kvs []kvec.KvPair) { sum = crc64.Update(0, c.ecmaTable, pair.Key) sum = crc64.Update(sum, c.ecmaTable, pair.Val) checksum ^= sum - kvNum += 1 + kvNum++ bytes += (len(pair.Key) + len(pair.Val)) }