diff --git a/cdc/changefeed.go b/cdc/changefeed.go index ccc31576f7e..0d063e6de1b 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -22,8 +22,8 @@ import ( "github.com/pingcap/log" timodel "github.com/pingcap/parser/model" "github.com/pingcap/ticdc/cdc/entry" + "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/roles/storage" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/pkg/cyclic" "github.com/pingcap/ticdc/pkg/filter" @@ -31,7 +31,7 @@ import ( "go.uber.org/zap" ) -type tableIDMap = map[uint64]struct{} +type tableIDMap = map[model.TableID]struct{} // OwnerDDLHandler defines the ddl handler for Owner // which can pull ddl jobs and execute ddl jobs @@ -72,7 +72,7 @@ type changeFeed struct { ddlState model.ChangeFeedDDLState targetTs uint64 taskStatus model.ProcessorsInfos - taskPositions map[string]*model.TaskPosition + taskPositions map[model.CaptureID]*model.TaskPosition filter *filter.Filter sink sink.Sink @@ -83,16 +83,13 @@ type changeFeed struct { ddlJobHistory []*timodel.Job ddlExecutedTs uint64 - schemas map[uint64]tableIDMap - tables map[uint64]entry.TableName - partitions map[uint64][]int64 // key is table ID, value is the slice of partitions ID. - // The key is table ID or the partition ID. - orphanTables map[uint64]model.ProcessTableInfo - // The key is table ID or the partition ID. - waitingConfirmTables map[uint64]string - // The key is table ID or the partition ID. - toCleanTables map[uint64]struct{} - infoWriter *storage.OwnerTaskStatusEtcdWriter + schemas map[model.SchemaID]tableIDMap + tables map[model.TableID]entry.TableName + // value of partitions is the slice of partitions ID. + partitions map[model.TableID][]int64 + orphanTables map[model.TableID]model.Ts + toCleanTables map[model.TableID]model.Ts + etcdCli kv.CDCEtcdClient } // String implements fmt.Stringer interface. 
@@ -115,30 +112,30 @@ func (c *changeFeed) updateProcessorInfos(processInfos model.ProcessorsInfos, po c.taskPositions = positions } -func (c *changeFeed) addSchema(schemaID uint64) { +func (c *changeFeed) addSchema(schemaID model.SchemaID) { if _, ok := c.schemas[schemaID]; ok { - log.Warn("add schema already exists", zap.Uint64("schemaID", schemaID)) + log.Warn("add schema already exists", zap.Int64("schemaID", schemaID)) return } - c.schemas[schemaID] = make(map[uint64]struct{}) + c.schemas[schemaID] = make(map[model.TableID]struct{}) } -func (c *changeFeed) dropSchema(schemaID uint64) { +func (c *changeFeed) dropSchema(schemaID model.SchemaID, targetTs model.Ts) { if schema, ok := c.schemas[schemaID]; ok { for tid := range schema { - c.removeTable(schemaID, tid) + c.removeTable(schemaID, tid, targetTs) } } delete(c.schemas, schemaID) } -func (c *changeFeed) addTable(sid, tid, startTs uint64, table entry.TableName, tblInfo *timodel.TableInfo) { +func (c *changeFeed) addTable(sid model.SchemaID, tid model.TableID, startTs model.Ts, table entry.TableName, tblInfo *timodel.TableInfo) { if c.filter.ShouldIgnoreTable(table.Schema, table.Table) { return } if _, ok := c.tables[tid]; ok { - log.Warn("add table already exists", zap.Uint64("tableID", tid), zap.Stringer("table", table)) + log.Warn("add table already exists", zap.Int64("tableID", tid), zap.Stringer("table", table)) return } @@ -151,37 +148,30 @@ func (c *changeFeed) addTable(sid, tid, startTs uint64, table entry.TableName, t delete(c.partitions, tid) for _, partition := range pi.Definitions { c.partitions[tid] = append(c.partitions[tid], partition.ID) - id := uint64(partition.ID) - c.orphanTables[id] = model.ProcessTableInfo{ - ID: id, - StartTs: startTs, - } + c.orphanTables[partition.ID] = startTs } } else { - c.orphanTables[tid] = model.ProcessTableInfo{ - ID: tid, - StartTs: startTs, - } + c.orphanTables[tid] = startTs } } -func (c *changeFeed) removeTable(sid, tid uint64) { +func (c *changeFeed) removeTable(sid model.SchemaID, tid model.TableID, targetTs model.Ts) { if _, ok := c.schemas[sid]; ok { delete(c.schemas[sid], tid) } delete(c.tables, tid) - removeFunc := func(id uint64) { + removeFunc := func(id int64) { if _, ok := c.orphanTables[id]; ok { delete(c.orphanTables, id) } else { - c.toCleanTables[id] = struct{}{} + c.toCleanTables[id] = targetTs } } if pids, ok := c.partitions[tid]; ok { for _, id := range pids { - removeFunc(uint64(id)) + removeFunc(id) } delete(c.partitions, tid) } else { @@ -189,11 +179,11 @@ func (c *changeFeed) removeTable(sid, tid uint64) { } } -func (c *changeFeed) selectCapture(captures map[string]*model.CaptureInfo, toAppend map[string][]*model.ProcessTableInfo) string { - return c.minimumTablesCapture(captures, toAppend) +func (c *changeFeed) selectCapture(captures map[string]*model.CaptureInfo, newTaskStatus map[model.CaptureID]*model.TaskStatus) string { + return c.minimumTablesCapture(captures, newTaskStatus) } -func (c *changeFeed) minimumTablesCapture(captures map[string]*model.CaptureInfo, toAppend map[string][]*model.ProcessTableInfo) string { +func (c *changeFeed) minimumTablesCapture(captures map[string]*model.CaptureInfo, newTaskStatus map[model.CaptureID]*model.TaskStatus) string { if len(captures) == 0 { return "" } @@ -202,11 +192,11 @@ func (c *changeFeed) minimumTablesCapture(captures map[string]*model.CaptureInfo minCount := math.MaxInt64 for id := range captures { var tableCount int - if pinfo, ok := c.taskStatus[id]; ok { - tableCount += len(pinfo.TableInfos) + if status, ok 
:= c.taskStatus[id]; ok { + tableCount = len(status.Tables) } - if append, ok := toAppend[id]; ok { - tableCount += len(append) + if newStatus, ok := newTaskStatus[id]; ok { + tableCount = len(newStatus.Tables) } if tableCount < minCount { minID = id @@ -218,66 +208,14 @@ func (c *changeFeed) minimumTablesCapture(captures map[string]*model.CaptureInfo } func (c *changeFeed) tryBalance(ctx context.Context, captures map[string]*model.CaptureInfo) error { - c.cleanTables(ctx) return c.balanceOrphanTables(ctx, captures) } -func (c *changeFeed) restoreTableInfos(infoSnapshot *model.TaskStatus, captureID string) { - // the capture information maybe deleted during table cleaning - if _, ok := c.taskStatus[captureID]; !ok { - log.Warn("ignore restore table info, task status for capture not found", zap.String("captureID", captureID)) - } - c.taskStatus[captureID].TableInfos = infoSnapshot.TableInfos -} - -func (c *changeFeed) cleanTables(ctx context.Context) { - var cleanIDs []uint64 - -cleanLoop: - for id := range c.toCleanTables { - captureID, taskStatus, ok := findTaskStatusWithTable(c.taskStatus, id) - if !ok { - log.Warn("ignore clean table id", zap.Uint64("id", id)) - cleanIDs = append(cleanIDs, id) - continue - } - - infoClone := taskStatus.Clone() - taskStatus.RemoveTable(id) - - newInfo, err := c.infoWriter.Write(ctx, c.id, captureID, taskStatus, true) - if err == nil { - c.taskStatus[captureID] = newInfo - } - switch errors.Cause(err) { - case model.ErrFindPLockNotCommit: - c.restoreTableInfos(infoClone, captureID) - log.Info("write table info delay, wait plock resolve", - zap.String("changefeed", c.id), - zap.String("capture", captureID)) - case nil: - log.Info("cleanup table success", - zap.Uint64("table id", id), - zap.String("capture id", captureID)) - log.Debug("after remove", zap.Stringer("task status", taskStatus)) - cleanIDs = append(cleanIDs, id) - default: - c.restoreTableInfos(infoClone, captureID) - log.Error("fail to put sub changefeed info", zap.Error(err)) - break cleanLoop - } - } - - for _, id := range cleanIDs { - delete(c.toCleanTables, id) - } -} - -func findTaskStatusWithTable(infos model.ProcessorsInfos, tableID uint64) (captureID string, info *model.TaskStatus, ok bool) { - for id, info := range infos { - for _, table := range info.TableInfos { - if table.ID == tableID { - return id, info, true +func findTaskStatusWithTable(infos model.ProcessorsInfos, tableID model.TableID) (captureID model.CaptureID, info *model.TaskStatus, ok bool) { + for cid, info := range infos { + for tid := range info.Tables { + if tid == tableID { + return cid, info, true } } } @@ -285,34 +223,45 @@ func findTaskStatusWithTable(infos model.ProcessorsInfos, tableID uint64) (captu return "", nil, false } -func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[string]*model.CaptureInfo) error { +func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) error { if len(captures) == 0 { return nil } + for _, status := range c.taskStatus { + if status.SomeOperationsUnapplied() { + return nil + } + } - for tableID, captureID := range c.waitingConfirmTables { - lockStatus, err := c.infoWriter.CheckLock(ctx, c.id, captureID) - if err != nil { - return errors.Trace(err) + newTaskStatus := make(map[model.CaptureID]*model.TaskStatus, len(captures)) + cleanedTables := make(map[model.TableID]struct{}) + addedTables := make(map[model.TableID]struct{}) + + for id, targetTs := range c.toCleanTables { + captureID, taskStatus, ok := 
findTaskStatusWithTable(c.taskStatus, id) + if !ok { + log.Warn("ignore clean table id", zap.Int64("id", id)) + delete(c.toCleanTables, id) + continue } - switch lockStatus { - case model.TableNoLock: - log.Debug("no c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID)) - delete(c.waitingConfirmTables, tableID) - case model.TablePLock: - log.Debug("waiting the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID)) - case model.TablePLockCommited: - log.Debug("delete the c-lock", zap.Uint64("tableID", tableID), zap.String("captureID", captureID)) - delete(c.waitingConfirmTables, tableID) + status, exist := newTaskStatus[captureID] + if !exist { + status = taskStatus.Clone() } + status.RemoveTable(id) + status.Operation = append(status.Operation, &model.TableOperation{ + TableID: id, + Delete: true, + BoundaryTs: targetTs, + }) + newTaskStatus[captureID] = status + cleanedTables[id] = struct{}{} } - - appendTableInfos := make(map[string][]*model.ProcessTableInfo, len(captures)) schemaSnapshot := c.schema - for tableID, orphan := range c.orphanTables { - var orphanMarkTable *model.ProcessTableInfo + for tableID, startTs := range c.orphanTables { + var orphanMarkTableID int64 if c.cyclicEnabled { - tableName, found := schemaSnapshot.GetTableNameByID(int64(tableID)) + tableName, found := schemaSnapshot.GetTableNameByID(tableID) if !found || cyclic.IsMarkTable(tableName.Schema, tableName.Table) { // Skip, mark tables should not be balanced alone. continue @@ -324,73 +273,73 @@ func (c *changeFeed) balanceOrphanTables(ctx context.Context, captures map[strin // Mark table is not created yet, skip and wait. log.Info("balance table info delay, wait mark table", zap.String("changefeed", c.id), - zap.Uint64("tableID", tableID), + zap.Int64("tableID", tableID), zap.String("markTableName", markTableTableName)) continue } - orphanMarkTable = &model.ProcessTableInfo{ - ID: uint64(id), - StartTs: orphan.StartTs, - } + orphanMarkTableID = id } - - captureID := c.selectCapture(captures, appendTableInfos) + captureID := c.selectCapture(captures, newTaskStatus) if len(captureID) == 0 { return nil } - info := appendTableInfos[captureID] - info = append(info, &model.ProcessTableInfo{ - ID: tableID, - StartTs: orphan.StartTs, + taskStatus := c.taskStatus[captureID] + if taskStatus == nil { + taskStatus = new(model.TaskStatus) + } + status, exist := newTaskStatus[captureID] + if !exist { + status = taskStatus.Clone() + } + status.Tables[tableID] = startTs + status.Operation = append(status.Operation, &model.TableOperation{ + TableID: tableID, + BoundaryTs: startTs, }) - // Table and mark table must be balanced to the same capture. - if orphanMarkTable != nil { - info = append(info, orphanMarkTable) + if orphanMarkTableID != 0 { + status.Tables[orphanMarkTableID] = startTs + addedTables[orphanMarkTableID] = struct{}{} + status.Operation = append(status.Operation, &model.TableOperation{ + TableID: orphanMarkTableID, + BoundaryTs: startTs, + }) } - appendTableInfos[captureID] = info + newTaskStatus[captureID] = status + addedTables[tableID] = struct{}{} } - for captureID, tableInfos := range appendTableInfos { - status := c.taskStatus[captureID] - if status == nil { - status = new(model.TaskStatus) - } - statusClone := status.Clone() - status.TableInfos = append(status.TableInfos, tableInfos...) 
-
-		newInfo, err := c.infoWriter.Write(ctx, c.id, captureID, status, true)
-		if err == nil {
-			c.taskStatus[captureID] = newInfo
-		}
-		switch errors.Cause(err) {
-		case model.ErrFindPLockNotCommit:
-			c.restoreTableInfos(statusClone, captureID)
-			log.Info("write table info delay, wait plock resolve",
-				zap.String("changefeed", c.id),
-				zap.String("capture", captureID))
-		case nil:
-			log.Info("dispatch table success",
-				zap.Reflect("tableInfos", tableInfos),
-				zap.String("capture", captureID))
-			for _, tableInfo := range tableInfos {
-				delete(c.orphanTables, tableInfo.ID)
-				c.waitingConfirmTables[tableInfo.ID] = captureID
+	for captureID, status := range newTaskStatus {
+		newStatus, err := c.etcdCli.AtomicPutTaskStatus(ctx, c.id, captureID, func(taskStatus *model.TaskStatus) error {
+			if taskStatus.SomeOperationsUnapplied() {
+				return errors.Errorf("timed out waiting for processor to handle pending table operations")
 			}
-		default:
-			c.restoreTableInfos(statusClone, captureID)
-			log.Error("fail to put sub changefeed info", zap.Error(err))
+			taskStatus.Tables = status.Tables
+			taskStatus.Operation = status.Operation
+			return nil
+		})
+		if err != nil {
 			return errors.Trace(err)
 		}
+		c.taskStatus[captureID] = newStatus.Clone()
+		log.Info("dispatch table success", zap.String("captureID", captureID), zap.Stringer("status", status))
+	}
+
+	for tableID := range cleanedTables {
+		delete(c.toCleanTables, tableID)
 	}
+	for tableID := range addedTables {
+		delete(c.orphanTables, tableID)
+	}
+
 	return nil
 }
 
 func (c *changeFeed) applyJob(ctx context.Context, job *timodel.Job) (skip bool, err error) {
-	schemaID := uint64(job.SchemaID)
+	schemaID := job.SchemaID
 	if job.BinlogInfo != nil && job.BinlogInfo.TableInfo != nil && c.schema.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID) {
-		tableID := uint64(job.BinlogInfo.TableInfo.ID)
+		tableID := job.BinlogInfo.TableInfo.ID
 		if _, exist := c.tables[tableID]; exist {
-			c.removeTable(schemaID, tableID)
+			c.removeTable(schemaID, tableID, job.BinlogInfo.FinishedTS)
 		}
 		return true, nil
 	}
@@ -401,33 +350,33 @@ func (c *changeFeed) applyJob(ctx context.Context, job *timodel.Job) (skip bool,
 	case timodel.ActionCreateSchema:
 		c.addSchema(schemaID)
 	case timodel.ActionDropSchema:
-		c.dropSchema(schemaID)
+		c.dropSchema(schemaID, job.BinlogInfo.FinishedTS)
 	case timodel.ActionCreateTable, timodel.ActionRecoverTable:
-		addID := uint64(job.BinlogInfo.TableInfo.ID)
+		addID := job.BinlogInfo.TableInfo.ID
 		tableName, exist := c.schema.GetTableNameByID(job.BinlogInfo.TableInfo.ID)
 		if !exist {
 			return errors.NotFoundf("table(%d)", addID)
 		}
 		c.addTable(schemaID, addID, job.BinlogInfo.FinishedTS, tableName, job.BinlogInfo.TableInfo)
 	case timodel.ActionDropTable:
-		dropID := uint64(job.TableID)
-		c.removeTable(schemaID, dropID)
+		dropID := job.TableID
+		c.removeTable(schemaID, dropID, job.BinlogInfo.FinishedTS)
 	case timodel.ActionRenameTable:
 		tableName, exist := c.schema.GetTableNameByID(job.TableID)
 		if !exist {
 			return errors.NotFoundf("table(%d)", job.TableID)
 		}
 		// no id change just update name
-		c.tables[uint64(job.TableID)] = tableName
+		c.tables[job.TableID] = tableName
 	case timodel.ActionTruncateTable:
-		dropID := uint64(job.TableID)
-		c.removeTable(schemaID, dropID)
+		dropID := job.TableID
+		c.removeTable(schemaID, dropID, job.BinlogInfo.FinishedTS)
 		tableName, exist := c.schema.GetTableNameByID(job.BinlogInfo.TableInfo.ID)
 		if !exist {
 			return errors.NotFoundf("table(%d)", job.BinlogInfo.TableInfo.ID)
 		}
-		addID := uint64(job.BinlogInfo.TableInfo.ID)
+		addID := job.BinlogInfo.TableInfo.ID
c.addTable(schemaID, addID, job.BinlogInfo.FinishedTS, tableName, job.BinlogInfo.TableInfo) } return nil @@ -533,13 +482,6 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { return nil } - // ProcessorInfos don't contains the whole set table id now. - if len(c.waitingConfirmTables) > 0 { - log.Debug("skip calcResolvedTs", - zap.Reflect("waitingConfirmTables", c.orphanTables)) - return nil - } - minResolvedTs := c.targetTs minCheckpointTs := c.targetTs @@ -561,12 +503,34 @@ func (c *changeFeed) calcResolvedTs(ctx context.Context) error { } } - for _, orphanTable := range c.orphanTables { - if minCheckpointTs > orphanTable.StartTs { - minCheckpointTs = orphanTable.StartTs + for captureID, status := range c.taskStatus { + appliedTs := status.AppliedTs() + if minCheckpointTs > appliedTs { + minCheckpointTs = appliedTs + } + if minResolvedTs > appliedTs { + minResolvedTs = appliedTs + } + if appliedTs != math.MaxUint64 { + log.Info("some operation is still unapplied", zap.String("captureID", captureID), zap.Uint64("appliedTs", appliedTs), zap.Stringer("status", status)) + } + } + + for _, startTs := range c.orphanTables { + if minCheckpointTs > startTs { + minCheckpointTs = startTs + } + if minResolvedTs > startTs { + minResolvedTs = startTs + } + } + + for _, targetTs := range c.toCleanTables { + if minCheckpointTs > targetTs { + minCheckpointTs = targetTs } - if minResolvedTs > orphanTable.StartTs { - minResolvedTs = orphanTable.StartTs + if minResolvedTs > targetTs { + minResolvedTs = targetTs } } diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index ec47f70ec6e..15d99c43f30 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -664,11 +664,11 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error { } // CloneTables return a clone of the existing tables. -func (s *schemaSnapshot) CloneTables() map[uint64]TableName { - mp := make(map[uint64]TableName, len(s.tables)) +func (s *schemaSnapshot) CloneTables() map[model.TableID]TableName { + mp := make(map[model.TableID]TableName, len(s.tables)) for id, table := range s.tables { - mp[uint64(id)] = table.TableName + mp[id] = table.TableName } return mp diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go index a1d5bb5688e..29b90774af0 100644 --- a/cdc/kv/etcd.go +++ b/cdc/kv/etcd.go @@ -16,13 +16,16 @@ package kv import ( "context" "fmt" + "time" - "go.etcd.io/etcd/clientv3/concurrency" - "go.etcd.io/etcd/embed" - + "github.com/cenkalti/backoff" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/retry" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/embed" "go.etcd.io/etcd/mvcc/mvccpb" ) @@ -332,6 +335,63 @@ func (c CDCEtcdClient) PutTaskStatus( return nil } +// AtomicPutTaskStatus puts task status into etcd atomically. 
+func (c CDCEtcdClient) AtomicPutTaskStatus( + ctx context.Context, + changefeedID string, + captureID string, + update func(*model.TaskStatus) error, +) (*model.TaskStatus, error) { + var status *model.TaskStatus + err := retry.Run(100*time.Millisecond, 3, func() error { + select { + case <-ctx.Done(): + return backoff.Permanent(ctx.Err()) + default: + } + var modRevision int64 + var err error + modRevision, status, err = c.GetTaskStatus(ctx, changefeedID, captureID) + key := GetEtcdKeyTaskStatus(changefeedID, captureID) + var writeCmp clientv3.Cmp + switch errors.Cause(err) { + case model.ErrTaskStatusNotExists: + status = new(model.TaskStatus) + writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", 0) + case nil: + writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", modRevision) + default: + return errors.Trace(err) + } + err = update(status) + if err != nil { + return errors.Trace(err) + } + value, err := status.Marshal() + if err != nil { + return errors.Trace(err) + } + + resp, err := c.Client.KV.Txn(ctx).If(writeCmp).Then( + clientv3.OpPut(key, value), + ).Commit() + + if err != nil { + return errors.Trace(err) + } + + if !resp.Succeeded { + log.Info("outdated table infos, ignore update taskStatus") + return errors.Annotatef(model.ErrWriteTsConflict, "key: %s", key) + } + return nil + }) + if err != nil { + return nil, errors.Trace(err) + } + return status, nil +} + // GetTaskPosition queries task process from etcd, returns // - ModRevision of the given key // - *model.TaskPosition unmarshaled from the value diff --git a/cdc/kv/etcd_test.go b/cdc/kv/etcd_test.go index 14eb1f04223..acd79f6d8a9 100644 --- a/cdc/kv/etcd_test.go +++ b/cdc/kv/etcd_test.go @@ -102,8 +102,8 @@ func (s *etcdSuite) TestGetChangeFeeds(c *check.C) { func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) { ctx := context.Background() info := &model.TaskStatus{ - TableInfos: []*model.ProcessTableInfo{ - {ID: 1, StartTs: 100}, + Tables: map[model.TableID]model.Ts{ + 1: 100, }, } @@ -126,8 +126,8 @@ func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) { func (s *etcdSuite) TestDeleteTaskStatus(c *check.C) { ctx := context.Background() info := &model.TaskStatus{ - TableInfos: []*model.ProcessTableInfo{ - {ID: 1, StartTs: 100}, + Tables: map[model.TableID]model.Ts{ + 1: 100, }, } feedID := "feedid" diff --git a/cdc/model/capture.go b/cdc/model/capture.go index f09898bd9af..40f21edef45 100644 --- a/cdc/model/capture.go +++ b/cdc/model/capture.go @@ -21,8 +21,8 @@ import ( // CaptureInfo store in etcd. type CaptureInfo struct { - ID string `json:"id"` - AdvertiseAddr string `json:"address"` + ID CaptureID `json:"id"` + AdvertiseAddr string `json:"address"` } // Marshal using json.Marshal. diff --git a/cdc/model/owner.go b/cdc/model/owner.go index e3440dbbe44..abff9425bff 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -16,43 +16,11 @@ package model import ( "encoding/json" "fmt" + "math" "github.com/pingcap/errors" ) -// ProcessTableInfo contains the info about tables that processor need to process. -type ProcessTableInfo struct { - // The ID is physical table ID. In detail: - // 1. ID is table ID when the table is not partition table. - // 2. ID is partition ID when the table is partition table. - ID uint64 `json:"id"` - StartTs uint64 `json:"start-ts"` -} - -// TableLock is used when applying table re-assignment to a processor. -// There are two kinds of locks, P-lock and C-lock. P-lock is set by owner when -// owner removes one or more tables from one processor. 
C-lock is a pair to
-// P-lock and set by processor to indicate that the processor has synchronized
-// the checkpoint and won't synchronize the removed table any more.
-type TableLock struct {
-	// Ts is the create timestamp of lock, it is used to pair P-lock and C-lock
-	Ts uint64 `json:"ts"`
-	// CreatorID is the lock creator ID
-	CreatorID string `json:"creator-id"`
-	// CheckpointTs is used in C-lock only, it records the table synchronization checkpoint
-	CheckpointTs uint64 `json:"checkpoint-ts"`
-}
-
-// TableLockStatus for the table lock in TaskStatus
-type TableLockStatus int
-
-// Table lock status
-const (
-	TableNoLock TableLockStatus = iota + 1
-	TablePLock
-	TablePLockCommited
-)
-
 // AdminJobType represents for admin job type, both used in owner and processor
 type AdminJobType int
 
@@ -110,59 +78,88 @@ func (tp *TaskPosition) Unmarshal(data []byte) error {
 // String implements fmt.Stringer interface.
 func (tp *TaskPosition) String() string {
 	data, _ := tp.Marshal()
-	return string(data)
+	return data
+}
+
+// TableOperation records the current information of a table migration
+type TableOperation struct {
+	TableID TableID `json:"table_id"`
+	Delete  bool    `json:"delete"`
+	// if the operation is a delete operation, BoundaryTs is checkpoint ts
+	// if the operation is an add operation, BoundaryTs is start ts
+	BoundaryTs uint64 `json:"boundary_ts"`
+	Done       bool   `json:"done"`
+}
+
+// Clone returns a deep-clone of the struct
+func (o *TableOperation) Clone() *TableOperation {
+	if o == nil {
+		return nil
+	}
+	clone := *o
+	return &clone
 }
 
 // TaskStatus records the task information of a capture
 type TaskStatus struct {
 	// Table information list, containing tables that processor should process, updated by ownrer, processor is read only.
-	// TODO change to be a map for easy update.
-	TableInfos   []*ProcessTableInfo `json:"table-infos"`
-	TablePLock   *TableLock          `json:"table-p-lock"`
-	TableCLock   *TableLock          `json:"table-c-lock"`
-	AdminJobType AdminJobType        `json:"admin-job-type"`
-	ModRevision  int64               `json:"-"`
+	Tables       map[TableID]Ts    `json:"tables"`
+	Operation    []*TableOperation `json:"operation"`
+	AdminJobType AdminJobType      `json:"admin-job-type"`
+	ModRevision  int64             `json:"-"`
 }
 
 // String implements fmt.Stringer interface.
 func (ts *TaskStatus) String() string {
 	data, _ := ts.Marshal()
-	return string(data)
+	return data
 }
 
 // RemoveTable remove the table in TableInfos.
-func (ts *TaskStatus) RemoveTable(id uint64) (*ProcessTableInfo, bool) {
-	for idx, table := range ts.TableInfos {
-		if table.ID == id {
-			last := ts.TableInfos[len(ts.TableInfos)-1]
-			removedTable := ts.TableInfos[idx]
-
-			ts.TableInfos[idx] = last
-			ts.TableInfos = ts.TableInfos[:len(ts.TableInfos)-1]
+func (ts *TaskStatus) RemoveTable(id TableID) (Ts, bool) {
+	if startTs, exist := ts.Tables[id]; exist {
+		delete(ts.Tables, id)
+		return startTs, true
+	}
+	return 0, false
+}
 
-			return removedTable, true
+// SomeOperationsUnapplied returns true if there are some operations not applied
+func (ts *TaskStatus) SomeOperationsUnapplied() bool {
+	for _, o := range ts.Operation {
+		if !o.Done {
+			return true
 		}
 	}
+	return false
+}
 
-	return nil, false
+// AppliedTs returns a Ts that is less than or equal to the BoundaryTs of every unapplied operation
+func (ts *TaskStatus) AppliedTs() Ts {
+	appliedTs := uint64(math.MaxUint64)
+	for _, o := range ts.Operation {
+		if !o.Done {
+			if appliedTs > o.BoundaryTs {
+				appliedTs = o.BoundaryTs
+			}
+		}
+	}
+	return appliedTs
 }
 
 // Snapshot takes a snapshot of `*TaskStatus` and returns a new `*ProcInfoSnap`
-func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs uint64) *ProcInfoSnap {
+func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs Ts) *ProcInfoSnap {
 	snap := &ProcInfoSnap{
 		CfID:      cfID,
 		CaptureID: captureID,
-		Tables:    make([]ProcessTableInfo, 0, len(ts.TableInfos)),
+		Tables:    make(map[TableID]Ts, len(ts.Tables)),
 	}
-	for _, tbl := range ts.TableInfos {
+	for tableID, startTs := range ts.Tables {
 		ts := checkpointTs
-		if ts < tbl.StartTs {
-			ts = tbl.StartTs
+		if ts < startTs {
+			ts = startTs
 		}
-		snap.Tables = append(snap.Tables, ProcessTableInfo{
-			ID:      tbl.ID,
-			StartTs: ts,
-		})
+		snap.Tables[tableID] = ts
 	}
 	return snap
 }
@@ -182,20 +179,16 @@ func (ts *TaskStatus) Unmarshal(data []byte) error {
 // Clone returns a deep-clone of the struct
 func (ts *TaskStatus) Clone() *TaskStatus {
 	clone := *ts
-	infos := make([]*ProcessTableInfo, 0, len(ts.TableInfos))
-	for _, ti := range ts.TableInfos {
-		c := *ti
-		infos = append(infos, &c)
-	}
-	clone.TableInfos = infos
-	if ts.TablePLock != nil {
-		pLock := *ts.TablePLock
-		clone.TablePLock = &pLock
+	tables := make(map[TableID]Ts, len(ts.Tables))
+	for tableID, startTs := range ts.Tables {
+		tables[tableID] = startTs
 	}
-	if ts.TableCLock != nil {
-		cLock := *ts.TableCLock
-		clone.TableCLock = &cLock
+	clone.Tables = tables
+	operation := make([]*TableOperation, 0, len(ts.Operation))
+	for _, opt := range ts.Operation {
+		operation = append(operation, opt.Clone())
 	}
+	clone.Operation = operation
 	return &clone
 }
 
@@ -205,6 +198,15 @@ type CaptureID = string
 // ChangeFeedID is the type for change feed ID
 type ChangeFeedID = string
 
+// TableID is the ID of the table
+type TableID = int64
+
+// SchemaID is the ID of the schema
+type SchemaID = int64
+
+// Ts is the timestamp with a logical count
+type Ts = uint64
+
 // ProcessorsInfos maps from capture IDs to TaskStatus
 type ProcessorsInfos map[CaptureID]*TaskStatus
 
@@ -272,7 +274,7 @@ func (status *ChangeFeedStatus) Unmarshal(data []byte) error {
 // ProcInfoSnap holds most important replication information of a processor
 type ProcInfoSnap struct {
-	CfID      string             `json:"changefeed-id"`
-	CaptureID string             `json:"capture-id"`
-	Tables    []ProcessTableInfo `json:"-"`
+	CfID      string         `json:"changefeed-id"`
+	CaptureID string         `json:"capture-id"`
+	Tables    map[TableID]Ts `json:"-"`
 }
diff --git a/cdc/model/owner_test.go
b/cdc/model/owner_test.go index 03c6d16c674..7270a4c915d 100644 --- a/cdc/model/owner_test.go +++ b/cdc/model/owner_test.go @@ -27,37 +27,51 @@ var _ = check.Suite(&taskStatusSuite{}) func (s *taskStatusSuite) TestShouldBeDeepCopy(c *check.C) { info := TaskStatus{ - TableInfos: []*ProcessTableInfo{ - {ID: 1}, - {ID: 2}, - {ID: 3}, + + Tables: map[TableID]Ts{ + 1: 100, + 2: 100, + 3: 100, + 4: 100, + }, + Operation: []*TableOperation{ + { + TableID: 5, Delete: true, BoundaryTs: 6, Done: true, + }, }, - TablePLock: &TableLock{Ts: 11}, + AdminJobType: AdminStop, } clone := info.Clone() assertIsSnapshot := func() { - c.Assert(clone.TableInfos, check.HasLen, 3) - for i, info := range clone.TableInfos { - c.Assert(info.ID, check.Equals, uint64(i+1)) - } - c.Assert(clone.TablePLock.Ts, check.Equals, uint64(11)) - c.Assert(clone.TableCLock, check.IsNil) + c.Assert(clone.Tables, check.DeepEquals, map[TableID]Ts{ + 1: 100, + 2: 100, + 3: 100, + 4: 100, + }) + c.Assert(clone.Operation, check.DeepEquals, []*TableOperation{ + { + TableID: 5, Delete: true, BoundaryTs: 6, Done: true, + }, + }) + c.Assert(clone.AdminJobType, check.Equals, AdminStop) } assertIsSnapshot() - info.TableInfos[2] = &ProcessTableInfo{ID: 1212} - info.TablePLock.Ts = 100 - info.TableCLock = &TableLock{Ts: 100} + info.Tables[6] = 2 + info.Operation = append(info.Operation, &TableOperation{ + TableID: 6, Delete: true, BoundaryTs: 6, Done: true, + }) assertIsSnapshot() } func (s *taskStatusSuite) TestProcSnapshot(c *check.C) { info := TaskStatus{ - TableInfos: []*ProcessTableInfo{ - {ID: 10, StartTs: 100}, + Tables: map[TableID]Ts{ + 10: 100, }, } cfID := "changefeed-1" @@ -66,7 +80,7 @@ func (s *taskStatusSuite) TestProcSnapshot(c *check.C) { c.Assert(snap.CfID, check.Equals, cfID) c.Assert(snap.CaptureID, check.Equals, captureID) c.Assert(snap.Tables, check.HasLen, 1) - c.Assert(snap.Tables[0].StartTs, check.Equals, uint64(200)) + c.Assert(snap.Tables[10], check.Equals, Ts(200)) } type removeTableSuite struct{} @@ -75,21 +89,21 @@ var _ = check.Suite(&removeTableSuite{}) func (s *removeTableSuite) TestShouldReturnRemovedTable(c *check.C) { info := TaskStatus{ - TableInfos: []*ProcessTableInfo{ - {ID: 1}, - {ID: 2}, - {ID: 3}, + Tables: map[TableID]Ts{ + 1: 100, + 2: 200, + 3: 300, + 4: 400, }, } - t, found := info.RemoveTable(2) + startTs, found := info.RemoveTable(2) c.Assert(found, check.IsTrue) - c.Assert(t.ID, check.Equals, uint64(2)) + c.Assert(startTs, check.Equals, Ts(200)) } func (s *removeTableSuite) TestShouldHandleTableNotFoundCorrectly(c *check.C) { info := TaskStatus{} - t, found := info.RemoveTable(404) + _, found := info.RemoveTable(404) c.Assert(found, check.IsFalse) - c.Assert(t, check.IsNil) } diff --git a/cdc/owner.go b/cdc/owner.go index 300859d9a67..d5049f0fcd5 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/ticdc/cdc/entry" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/cdc/roles/storage" "github.com/pingcap/ticdc/cdc/sink" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/util" @@ -116,11 +115,8 @@ func (o *Owner) removeCapture(info *model.CaptureInfo) { startTs = feed.status.CheckpointTs } - for _, table := range task.TableInfos { - feed.orphanTables[table.ID] = model.ProcessTableInfo{ - ID: table.ID, - StartTs: startTs, - } + for tableID := range task.Tables { + feed.orphanTables[tableID] = startTs } ctx := context.TODO() @@ -135,11 +131,11 @@ func (o *Owner) removeCapture(info 
*model.CaptureInfo) { } } -func (o *Owner) addOrphanTable(cid string, table model.ProcessTableInfo) { +func (o *Owner) addOrphanTable(cid model.CaptureID, tableID model.TableID, startTs model.Ts) { o.l.Lock() defer o.l.Unlock() if cf, ok := o.changeFeeds[cid]; ok { - cf.orphanTables[table.ID] = table + cf.orphanTables[tableID] = startTs } else { log.Warn("changefeed not found", zap.String("changefeed", cid)) } @@ -176,25 +172,25 @@ func (o *Owner) newChangeFeed( ddlHandler := newDDLHandler(o.pdClient, kvStore, checkpointTs) - existingTables := make(map[uint64]uint64) + existingTables := make(map[model.TableID]model.Ts) for captureID, taskStatus := range processorsInfos { var checkpointTs uint64 if pos, exist := taskPositions[captureID]; exist { checkpointTs = pos.CheckPointTs } - for _, tbl := range taskStatus.TableInfos { - if tbl.StartTs > checkpointTs { - checkpointTs = tbl.StartTs + for tableID, startTs := range taskStatus.Tables { + if startTs > checkpointTs { + checkpointTs = startTs } - existingTables[tbl.ID] = checkpointTs + existingTables[tableID] = checkpointTs } } ctx, cancel := context.WithCancel(ctx) - schemas := make(map[uint64]tableIDMap) - tables := make(map[uint64]entry.TableName) - partitions := make(map[uint64][]int64) - orphanTables := make(map[uint64]model.ProcessTableInfo) + schemas := make(map[model.SchemaID]tableIDMap) + tables := make(map[model.TableID]entry.TableName) + partitions := make(map[model.TableID][]int64) + orphanTables := make(map[model.TableID]model.Ts) for tid, table := range schemaSnap.CloneTables() { if filter.ShouldIgnoreTable(table.Schema, table.Table) { continue @@ -202,43 +198,37 @@ func (o *Owner) newChangeFeed( tables[tid] = table if ts, ok := existingTables[tid]; ok { - log.Debug("ignore known table", zap.Uint64("tid", tid), zap.Stringer("table", table), zap.Uint64("ts", ts)) + log.Debug("ignore known table", zap.Int64("tid", tid), zap.Stringer("table", table), zap.Uint64("ts", ts)) continue } - schema, ok := schemaSnap.SchemaByTableID(int64(tid)) + schema, ok := schemaSnap.SchemaByTableID(tid) if !ok { - log.Warn("schema not found for table", zap.Uint64("tid", tid)) + log.Warn("schema not found for table", zap.Int64("tid", tid)) } else { - sid := uint64(schema.ID) + sid := schema.ID if _, ok := schemas[sid]; !ok { schemas[sid] = make(tableIDMap) } schemas[sid][tid] = struct{}{} } - tblInfo, ok := schemaSnap.TableByID(int64(tid)) + tblInfo, ok := schemaSnap.TableByID(tid) if !ok { - log.Warn("table not found for table ID", zap.Uint64("tid", tid)) + log.Warn("table not found for table ID", zap.Int64("tid", tid)) continue } if pi := tblInfo.GetPartitionInfo(); pi != nil { delete(partitions, tid) for _, partition := range pi.Definitions { - id := uint64(partition.ID) + id := partition.ID if ts, ok := existingTables[id]; ok { - log.Debug("ignore known table partition", zap.Uint64("tid", tid), zap.Uint64("partitionID", id), zap.Stringer("table", table), zap.Uint64("ts", ts)) + log.Debug("ignore known table partition", zap.Int64("tid", tid), zap.Int64("partitionID", id), zap.Stringer("table", table), zap.Uint64("ts", ts)) continue } partitions[tid] = append(partitions[tid], partition.ID) - orphanTables[id] = model.ProcessTableInfo{ - ID: id, - StartTs: checkpointTs, - } + orphanTables[id] = checkpointTs } } else { - orphanTables[tid] = model.ProcessTableInfo{ - ID: tid, - StartTs: checkpointTs, - } + orphanTables[tid] = checkpointTs } } @@ -259,16 +249,15 @@ func (o *Owner) newChangeFeed( cancel() }() cf := &changeFeed{ - info: info, - id: id, - 
ddlHandler: ddlHandler, - schema: schemaSnap, - schemas: schemas, - tables: tables, - partitions: partitions, - orphanTables: orphanTables, - waitingConfirmTables: make(map[uint64]string), - toCleanTables: make(map[uint64]struct{}), + info: info, + id: id, + ddlHandler: ddlHandler, + schema: schemaSnap, + schemas: schemas, + tables: tables, + partitions: partitions, + orphanTables: orphanTables, + toCleanTables: make(map[model.TableID]model.Ts), status: &model.ChangeFeedStatus{ ResolvedTs: 0, CheckpointTs: checkpointTs, @@ -278,7 +267,7 @@ func (o *Owner) newChangeFeed( targetTs: info.GetTargetTs(), taskStatus: processorsInfos, taskPositions: taskPositions, - infoWriter: storage.NewOwnerTaskStatusEtcdWriter(o.etcdClient), + etcdCli: o.etcdClient, filter: filter, sink: sink, cyclicEnabled: info.Config.Cyclic.IsEnabled(), @@ -402,14 +391,15 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error { if !ok { return errors.Errorf("changefeed %s not found in owner cache", job.CfID) } - for captureID, pinfo := range cf.taskStatus { - pinfo.TablePLock = nil - pinfo.TableCLock = nil - pinfo.AdminJobType = job.Type - _, err := cf.infoWriter.Write(ctx, cf.id, captureID, pinfo, false) + for captureID := range cf.taskStatus { + newStatus, err := cf.etcdCli.AtomicPutTaskStatus(ctx, cf.id, captureID, func(taskStatus *model.TaskStatus) error { + taskStatus.AdminJobType = job.Type + return nil + }) if err != nil { return errors.Trace(err) } + cf.taskStatus[captureID] = newStatus.Clone() } // record admin job in changefeed status cf.status.AdminJobType = job.Type @@ -705,17 +695,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture zap.Reflect("task status", status), ) } - for _, table := range status.TableInfos { - var startTs uint64 + for tableID, startTs := range status.Tables { if taskPosFound { startTs = pos.CheckPointTs - } else { - startTs = table.StartTs } - o.addOrphanTable(changeFeedID, model.ProcessTableInfo{ - ID: table.ID, - StartTs: startTs, - }) + o.addOrphanTable(changeFeedID, tableID, startTs) } } diff --git a/cdc/processor.go b/cdc/processor.go index 31d2a4ecdfd..43f6e53945e 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "io" - "sort" "strconv" "strings" "sync" @@ -190,8 +189,8 @@ func newProcessor( tables: make(map[int64]*tableInfo), } - for _, table := range p.status.TableInfos { - p.addTable(ctx, int64(table.ID), table.StartTs) + for tableID, startTs := range p.status.Tables { + p.addTable(ctx, tableID, startTs) } return p, nil } @@ -384,17 +383,16 @@ func (p *processor) updateInfo(ctx context.Context) error { if err != nil { return errors.Trace(err) } - log.Debug("update task position", zap.Stringer("status", p.position)) + log.Debug("update task position", zap.Stringer("position", p.position)) return nil } - statusChanged, locked, err := p.tsRWriter.UpdateInfo(ctx) + statusChanged, err := p.tsRWriter.UpdateInfo(ctx) if err != nil { return errors.Trace(err) } - if !statusChanged && !locked { + if !statusChanged && !p.tsRWriter.GetTaskStatus().SomeOperationsUnapplied() { return updatePosition() } - oldStatus := p.status p.status = p.tsRWriter.GetTaskStatus() if p.status.AdminJobType == model.AdminStop || p.status.AdminJobType == model.AdminRemove { err = p.stop(ctx) @@ -403,11 +401,11 @@ func (p *processor) updateInfo(ctx context.Context) error { } return errors.Trace(model.ErrAdminStopProcessor) } - err = p.handleTables(ctx, oldStatus, p.status, p.position.CheckPointTs) + err = 
p.handleTables(ctx) if err != nil { return errors.Trace(err) } - syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.TableInfos))) + syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureID).Set(float64(len(p.status.Tables))) err = updatePosition() if err != nil { return errors.Trace(err) @@ -430,46 +428,6 @@ func (p *processor) updateInfo(ctx context.Context) error { return nil } -func diffProcessTableInfos(oldInfo, newInfo []*model.ProcessTableInfo) (removed, added []*model.ProcessTableInfo) { - sort.Slice(oldInfo, func(i, j int) bool { - return oldInfo[i].ID < oldInfo[j].ID - }) - - sort.Slice(newInfo, func(i, j int) bool { - return newInfo[i].ID < newInfo[j].ID - }) - - i, j := 0, 0 - for i < len(oldInfo) && j < len(newInfo) { - if oldInfo[i].ID == newInfo[j].ID { - i++ - j++ - } else if oldInfo[i].ID < newInfo[j].ID { - removed = append(removed, oldInfo[i]) - i++ - } else { - added = append(added, newInfo[j]) - j++ - } - } - for ; i < len(oldInfo); i++ { - removed = append(removed, oldInfo[i]) - } - for ; j < len(newInfo); j++ { - added = append(added, newInfo[j]) - } - - if len(removed) > 0 || len(added) > 0 { - log.Debug("table diff", zap.Reflect("old", oldInfo), - zap.Reflect("new", newInfo), - zap.Reflect("add", added), - zap.Reflect("remove", removed), - ) - } - - return -} - func (p *processor) removeTable(tableID int64) { p.stateMu.Lock() defer p.stateMu.Unlock() @@ -491,18 +449,16 @@ func (p *processor) removeTable(tableID int64) { } // handleTables handles table scheduler on this processor, add or remove table puller -func (p *processor) handleTables( - ctx context.Context, oldInfo, newInfo *model.TaskStatus, checkpointTs uint64, -) error { - checkPairedMarkTable := func(tables []*model.ProcessTableInfo, hint string) error { +func (p *processor) handleTables(ctx context.Context) error { + checkPairedMarkTable := func(tables map[model.TableID]model.Ts) error { if p.changefeed.Config != nil && p.changefeed.Config.Cyclic.IsEnabled() { // Make sure all normal tables and mark tables are paired. 
schemaSnapshot := p.schemaStorage.GetLastSnapshot() tableNames := make([]model.TableName, 0, len(tables)) - for _, table := range tables { - name, ok := schemaSnapshot.GetTableNameByID(int64(table.ID)) + for tableID := range tables { + name, ok := schemaSnapshot.GetTableNameByID(tableID) if !ok { - return errors.NotFoundf("table(%d)", table.ID) + return errors.NotFoundf("table(%d)", tableID) } tableNames = append(tableNames, model.TableName{ Schema: name.Schema, @@ -510,37 +466,30 @@ func (p *processor) handleTables( }) } if !cyclic.IsTablesPaired(tableNames) { - return errors.NotValidf( - "%s normal table and mark table not match %v", hint, tableNames) + return errors.NotValidf("normal table and mark table not match %v", tableNames) } } return nil } - removedTables, addedTables := diffProcessTableInfos(oldInfo.TableInfos, newInfo.TableInfos) - // remove tables - if err := checkPairedMarkTable(removedTables, "remove table"); err != nil { + if err := checkPairedMarkTable(p.status.Tables); err != nil { return err } - for _, pinfo := range removedTables { - p.removeTable(int64(pinfo.ID)) - } - // add tables - if err := checkPairedMarkTable(removedTables, "add table"); err != nil { - return err - } - for _, pinfo := range addedTables { - p.addTable(ctx, int64(pinfo.ID), pinfo.StartTs) - } - - // write clock if need - if newInfo.TablePLock != nil && newInfo.TableCLock == nil { - newInfo.TableCLock = &model.TableLock{ - Ts: newInfo.TablePLock.Ts, - CheckpointTs: checkpointTs, + for _, opt := range p.status.Operation { + if opt.Delete { + if opt.BoundaryTs <= p.position.CheckPointTs { + p.removeTable(opt.TableID) + opt.Done = true + } + } else { + p.addTable(ctx, opt.TableID, opt.BoundaryTs) + opt.Done = true } } + if !p.status.SomeOperationsUnapplied() { + p.status.Operation = nil + } return nil } diff --git a/cdc/roles/storage/etcd.go b/cdc/roles/storage/etcd.go index 818c9972c25..0b2b87d85f0 100644 --- a/cdc/roles/storage/etcd.go +++ b/cdc/roles/storage/etcd.go @@ -15,16 +15,11 @@ package storage import ( "context" - "time" - "github.com/cenkalti/backoff" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" - "github.com/pingcap/ticdc/pkg/retry" - "github.com/pingcap/ticdc/pkg/util" - "github.com/pingcap/tidb/store/tikv/oracle" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -42,7 +37,7 @@ type ProcessorTsRWriter interface { GetTaskStatus() *model.TaskStatus // UpdateInfo update the in memory cache as taskStatus in storage. // oldInfo and newInfo is the old and new in memory cache taskStatus. - UpdateInfo(ctx context.Context) (bool, bool, error) + UpdateInfo(ctx context.Context) (bool, error) // WriteInfoIntoStorage update taskStatus into storage, return model.ErrWriteTsConflict if in last learn taskStatus is out dated and must call UpdateInfo. WriteInfoIntoStorage(ctx context.Context) error } @@ -89,17 +84,16 @@ func (rw *ProcessorTsEtcdRWriter) WritePosition(ctx context.Context, taskPositio // UpdateInfo implements ProcessorTsRWriter interface. 
func (rw *ProcessorTsEtcdRWriter) UpdateInfo( ctx context.Context, -) (changed bool, locked bool, err error) { +) (changed bool, err error) { modRevision, info, err := rw.etcdClient.GetTaskStatus(ctx, rw.changefeedID, rw.captureID) if err != nil { - return false, false, errors.Trace(err) + return false, errors.Trace(err) } changed = rw.modRevision != modRevision if changed { rw.taskStatus = info rw.modRevision = modRevision } - locked = rw.taskStatus.TablePLock != nil && rw.taskStatus.TableCLock == nil return } @@ -113,6 +107,7 @@ func (rw *ProcessorTsEtcdRWriter) WriteInfoIntoStorage( if err != nil { return errors.Trace(err) } + log.Info("update taskStatus", zap.Stringer("status", rw.taskStatus)) resp, err := rw.etcdClient.Client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", rw.modRevision), @@ -146,141 +141,3 @@ func (rw *ProcessorTsEtcdRWriter) GetChangeFeedStatus(ctx context.Context) (*mod func (rw *ProcessorTsEtcdRWriter) GetTaskStatus() *model.TaskStatus { return rw.taskStatus } - -// OwnerTaskStatusEtcdWriter encapsulates TaskStatus write operation -type OwnerTaskStatusEtcdWriter struct { - etcdClient kv.CDCEtcdClient -} - -// NewOwnerTaskStatusEtcdWriter returns a new `*OwnerTaskStatusEtcdWriter` instance -func NewOwnerTaskStatusEtcdWriter(cli kv.CDCEtcdClient) *OwnerTaskStatusEtcdWriter { - return &OwnerTaskStatusEtcdWriter{ - etcdClient: cli, - } -} - -// updateInfo updates the local TaskStatus with etcd value, except for TableInfos, Admin and TablePLock -func (ow *OwnerTaskStatusEtcdWriter) updateInfo( - ctx context.Context, changefeedID, captureID string, oldInfo *model.TaskStatus, -) (newInfo *model.TaskStatus, err error) { - modRevision, info, err := ow.etcdClient.GetTaskStatus(ctx, changefeedID, captureID) - if err != nil { - return - } - - // TableInfos and TablePLock is updated by owner only - newInfo = info - newInfo.TableInfos = oldInfo.TableInfos - newInfo.AdminJobType = oldInfo.AdminJobType - newInfo.TablePLock = oldInfo.TablePLock - newInfo.ModRevision = modRevision - - if newInfo.TablePLock != nil { - if newInfo.TableCLock == nil { - err = errors.Trace(model.ErrFindPLockNotCommit) - } else { - // clean lock - newInfo.TablePLock = nil - newInfo.TableCLock = nil - } - } - return -} - -// CheckLock checks whether there exists p-lock or whether p-lock is committed if it exists -func (ow *OwnerTaskStatusEtcdWriter) CheckLock( - ctx context.Context, changefeedID, captureID string, -) (status model.TableLockStatus, err error) { - _, info, err := ow.etcdClient.GetTaskStatus(ctx, changefeedID, captureID) - if err != nil { - if errors.Cause(err) == model.ErrTaskStatusNotExists { - return model.TableNoLock, nil - } - return - } - log.Info("show check lock", zap.Reflect("status", info)) - - // in most cases there is no p-lock - if info.TablePLock == nil { - status = model.TableNoLock - return - } - - if info.TableCLock != nil { - status = model.TablePLockCommited - } else { - status = model.TablePLock - } - - return -} - -// Write persists given `TaskStatus` into etcd. -// If returned err is not nil, don't use the returned newInfo as it may be not a reasonable value. 
-func (ow *OwnerTaskStatusEtcdWriter) Write( - ctx context.Context, - changefeedID, captureID string, - info *model.TaskStatus, - writePLock bool, -) (newInfo *model.TaskStatus, err error) { - - // check p-lock not exists or is already resolved - lockStatus, err := ow.CheckLock(ctx, changefeedID, captureID) - if err != nil { - return - } - newInfo = info - switch lockStatus { - case model.TableNoLock: - case model.TablePLockCommited: - newInfo.TablePLock = nil - newInfo.TableCLock = nil - case model.TablePLock: - err = errors.Trace(model.ErrFindPLockNotCommit) - return - } - - if writePLock { - newInfo.TablePLock = &model.TableLock{ - Ts: oracle.EncodeTSO(time.Now().UnixNano() / int64(time.Millisecond)), - CreatorID: util.CaptureIDFromCtx(ctx), - } - } - - key := kv.GetEtcdKeyTaskStatus(changefeedID, captureID) - err = retry.Run(500*time.Millisecond, 5, func() error { - value, err := newInfo.Marshal() - if err != nil { - return errors.Trace(err) - } - - resp, err := ow.etcdClient.Client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(key), "=", newInfo.ModRevision), - ).Then( - clientv3.OpPut(key, value), - ).Commit() - - if err != nil { - return errors.Trace(err) - } - - if !resp.Succeeded { - log.Info("outdated table infos, update table and retry") - newInfo, err = ow.updateInfo(ctx, changefeedID, captureID, info) - switch errors.Cause(err) { - case model.ErrFindPLockNotCommit, model.ErrTaskStatusNotExists: - return backoff.Permanent(err) - case nil: - return errors.Trace(model.ErrWriteTaskStatusConlict) - default: - return errors.Trace(err) - } - } - - newInfo.ModRevision = resp.Header.Revision - - return nil - }) - - return -} diff --git a/cdc/roles/storage/etcd_test.go b/cdc/roles/storage/etcd_test.go index 3d816135639..7cd6b762fdd 100644 --- a/cdc/roles/storage/etcd_test.go +++ b/cdc/roles/storage/etcd_test.go @@ -20,7 +20,6 @@ import ( "time" "github.com/pingcap/check" - "github.com/pingcap/errors" "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/etcd" @@ -91,8 +90,11 @@ func (s *etcdSuite) TestProcessorTsWriter(c *check.C) { err error revision int64 info = &model.TaskStatus{ - TableInfos: []*model.ProcessTableInfo{ - {ID: 11}, {ID: 12}, + Tables: map[model.TableID]model.Ts{ + 1: 100, + 2: 200, + 3: 300, + 4: 400, }, } getInfo *model.TaskStatus @@ -131,31 +133,16 @@ func (s *etcdSuite) TestProcessorTsWriter(c *check.C) { // test table taskStatus changed, should return ErrWriteTsConflict. getInfo = info.Clone() - getInfo.TableInfos = []*model.ProcessTableInfo{{ID: 11}, {ID: 12}, {ID: 13}} + getInfo.Tables = map[model.TableID]model.Ts{ + 4: 100, + 5: 200, + 6: 300, + 7: 400, + } sinfo, err = getInfo.Marshal() c.Assert(err, check.IsNil) _, err = s.client.Client.Put(context.Background(), kv.GetEtcdKeyTaskStatus(changefeedID, captureID), sinfo) c.Assert(err, check.IsNil) - - info.TableCLock = &model.TableLock{Ts: 6} - err = rw.WriteInfoIntoStorage(context.Background()) - c.Assert(errors.Cause(err), check.Equals, model.ErrWriteTsConflict) - - changed, locked, err := rw.UpdateInfo(context.Background()) - c.Assert(err, check.IsNil) - c.Assert(changed, check.IsTrue) - c.Assert(locked, check.IsFalse) - c.Assert(rw.GetTaskStatus(), check.DeepEquals, getInfo) - info = rw.GetTaskStatus() - - // update success again. 
- info.TableCLock = &model.TableLock{Ts: 6} - err = rw.WriteInfoIntoStorage(context.Background()) - c.Assert(err, check.IsNil) - revision, getInfo, err = s.client.GetTaskStatus(context.Background(), changefeedID, captureID) - c.Assert(err, check.IsNil) - c.Assert(revision, check.Equals, rw.modRevision) - c.Assert(getInfo.TableCLock.Ts, check.Equals, uint64(6)) } func (s *etcdSuite) TestProcessorTsWritePos(c *check.C) { @@ -192,61 +179,3 @@ func (s *etcdSuite) TestProcessorTsReader(c *check.C) { c.Assert(err, check.IsNil) c.Assert(changedFeed, check.DeepEquals, info) } - -func (s *etcdSuite) TestOwnerTableInfoWriter(c *check.C) { - var ( - changefeedID = "test-owner-table-writer-changefeed" - captureID = "test-owner-table-writer-capture" - info = &model.TaskStatus{} - err error - ) - - ow := NewOwnerTaskStatusEtcdWriter(s.client) - - // owner adds table to processor - info.TableInfos = append(info.TableInfos, &model.ProcessTableInfo{ID: 50, StartTs: 100}) - info, err = ow.Write(context.Background(), changefeedID, captureID, info, false) - c.Assert(err, check.IsNil) - c.Assert(info.TableInfos, check.HasLen, 1) - - // owner adds table to processor when remote data is updated - info.TableInfos = append(info.TableInfos, &model.ProcessTableInfo{ID: 52, StartTs: 100}) - info, err = ow.Write(context.Background(), changefeedID, captureID, info, false) - c.Assert(err, check.IsNil) - c.Assert(info.TableInfos, check.HasLen, 2) - // check ModRevision after write - revision, _, err := s.client.GetTaskStatus(context.Background(), changefeedID, captureID) - c.Assert(err, check.IsNil) - c.Assert(info.ModRevision, check.Equals, revision) - - // owner removes table from processor - info.TableInfos = info.TableInfos[:len(info.TableInfos)-1] - info, err = ow.Write(context.Background(), changefeedID, captureID, info, true) - c.Assert(err, check.IsNil) - c.Assert(info.TableInfos, check.HasLen, 1) - c.Assert(info.TablePLock, check.NotNil) - - // owner can't add table when plock is not resolved - info.TableInfos = append(info.TableInfos, &model.ProcessTableInfo{ID: 52, StartTs: 100}) - info, err = ow.Write(context.Background(), changefeedID, captureID, info, false) - c.Assert(errors.Cause(err), check.Equals, model.ErrFindPLockNotCommit) - c.Assert(info.TableInfos, check.HasLen, 2) - - // owner can't remove table when plock is not resolved - info.TableInfos = info.TableInfos[:0] - info, err = ow.Write(context.Background(), changefeedID, captureID, info, true) - c.Assert(errors.Cause(err), check.Equals, model.ErrFindPLockNotCommit) - c.Assert(info.TableInfos, check.HasLen, 0) - - // simulate processor removes table and commit table p-lock - info.TableCLock = &model.TableLock{Ts: info.TablePLock.Ts, CheckpointTs: 200} - err = s.client.PutTaskStatus(context.Background(), changefeedID, captureID, info) - c.Assert(err, check.IsNil) - info.TableCLock = nil - - // owner adds table to processor again - info.TableInfos = append(info.TableInfos, &model.ProcessTableInfo{ID: 54, StartTs: 300}) - info, err = ow.Write(context.Background(), changefeedID, captureID, info, false) - c.Assert(err, check.IsNil) - c.Assert(info.TableInfos, check.HasLen, 1) -} diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index 269f7b69fb8..edf8e1ae459 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -1,13 +1,16 @@ package notify import ( + "context" "testing" "time" "github.com/pingcap/check" ) -func Test(t *testing.T) { check.TestingT(t) } +func Test(t *testing.T) { + check.TestingT(t) +} type notifySuite 
struct{} @@ -44,16 +47,38 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { } func (s *notifySuite) TestContinusStop(c *check.C) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() notifier := new(Notifier) - n := 5000 + go func() { + for { + select { + case <-ctx.Done(): + return + default: + } + notifier.Notify() + } + }() + n := 50 receivers := make([]*Receiver, n) for i := 0; i < n; i++ { receivers[i] = notifier.NewReceiver(10 * time.Millisecond) } for i := 0; i < n; i++ { - <-receivers[i].C + i := i + go func() { + for { + select { + case <-ctx.Done(): + return + case <-receivers[i].C: + } + } + }() } for i := 0; i < n; i++ { receivers[i].Stop() } + <-ctx.Done() } diff --git a/tests/_utils/run_cdc_server b/tests/_utils/run_cdc_server index 96c48c62790..28fdf73b9d9 100755 --- a/tests/_utils/run_cdc_server +++ b/tests/_utils/run_cdc_server @@ -51,5 +51,5 @@ done echo "[$(date)] <<<<<< START cdc server in $TEST_NAME case >>>>>>" cd $workdir pid=$(ps -C run_cdc_server -o pid=|tr -d '[:space:]') -$binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server --log-file $workdir/cdc$logsuffix.log --log-level debug $addr $pd_addr >> $workdir/stdout$log_suffix.log 2>&1 & +$binary -test.coverprofile="$OUT_DIR/cov.$TEST_NAME.$pid.out" server --log-file $workdir/cdc$logsuffix.log --log-level info $addr $pd_addr >> $workdir/stdout$log_suffix.log 2>&1 & cd $pwd diff --git a/tests/_utils/start_tidb_cluster b/tests/_utils/start_tidb_cluster index 99781c7ec00..8dbeae33aa2 100755 --- a/tests/_utils/start_tidb_cluster +++ b/tests/_utils/start_tidb_cluster @@ -131,28 +131,28 @@ echo "Verifying Upstream TiDB is started..." i=0 while ! mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_PORT} --default-character-set utf8mb4 -e 'select * from mysql.tidb;'; do i=$((i + 1)) - if [ "$i" -gt 40 ]; then + if [ "$i" -gt 60 ]; then echo 'Failed to start upstream TiDB' exit 2 fi - sleep 1 + sleep 2 done i=0 while ! mysql -uroot -h${UP_TIDB_HOST} -P${UP_TIDB_OTHER_PORT} --default-character-set utf8mb4 -e 'select * from mysql.tidb;'; do i=$((i + 1)) - if [ "$i" -gt 40 ]; then + if [ "$i" -gt 60 ]; then echo 'Failed to start upstream TiDB' exit 2 fi - sleep 1 + sleep 2 done echo "Verifying Downstream TiDB is started..." i=0 while ! 
mysql -uroot -h${DOWN_TIDB_HOST} -P${DOWN_TIDB_PORT} --default-character-set utf8mb4 -e 'select * from mysql.tidb;'; do i=$((i + 1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 60 ]; then echo 'Failed to start downstream TiDB' exit 1 fi diff --git a/tests/availability/owner.sh b/tests/availability/owner.sh index e8982c6d26a..158a42c0e57 100755 --- a/tests/availability/owner.sh +++ b/tests/availability/owner.sh @@ -25,7 +25,7 @@ function test_owner_ha() { function test_kill_owner() { echo "run test case test_kill_owner" # start a capture server - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_kill_owner.server1 # ensure the server become the owner ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') @@ -34,7 +34,7 @@ function test_kill_owner() { echo "owner id" $owner_id # run another server - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_kill_owner.server2 ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id" capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id") echo "capture_id:" $capture_id @@ -55,7 +55,7 @@ function test_kill_owner() { function test_hang_up_owner() { echo "run test case test_hang_up_owner" - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_hang_up_owner.server1 # ensure the server become the owner ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" @@ -65,7 +65,7 @@ function test_hang_up_owner() { echo "owner id" $owner_id # run another server - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_hang_up_owner.server2 ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id" capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id") echo "capture_id:" $capture_id @@ -91,7 +91,7 @@ function test_hang_up_owner() { function test_expire_owner() { echo "run test case test_expire_owner" - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_expire_owner.server1 # ensure the server become the owner ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" @@ -121,7 +121,7 @@ function test_owner_cleanup_stale_tasks() { echo "run test case test_owner_cleanup_stale_tasks" # start a capture server - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_cleanup_stale_tasks.server1 # ensure the server become the owner ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'" owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}') @@ -130,7 +130,7 @@ function test_owner_cleanup_stale_tasks() { echo "owner id" $owner_id # run another server - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_owner_cleanup_stale_tasks.server2 ensure $MAX_RETRIES "$CDC_BINARY cli capture 
diff --git a/tests/availability/owner.sh b/tests/availability/owner.sh
index e8982c6d26a..158a42c0e57 100755
--- a/tests/availability/owner.sh
+++ b/tests/availability/owner.sh
@@ -25,7 +25,7 @@ function test_owner_ha() {
 function test_kill_owner() {
     echo "run test case test_kill_owner"
     # start a capture server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_kill_owner.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
     owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
@@ -34,7 +34,7 @@ function test_kill_owner() {
     echo "owner id" $owner_id
 
     # run another server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_kill_owner.server2
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
     capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
     echo "capture_id:" $capture_id
@@ -55,7 +55,7 @@ function test_kill_owner() {
 
 function test_hang_up_owner() {
     echo "run test case test_hang_up_owner"
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_hang_up_owner.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
 
@@ -65,7 +65,7 @@ function test_hang_up_owner() {
     echo "owner id" $owner_id
 
     # run another server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_hang_up_owner.server2
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
     capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
     echo "capture_id:" $capture_id
@@ -91,7 +91,7 @@ function test_hang_up_owner() {
 
 function test_expire_owner() {
     echo "run test case test_expire_owner"
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_expire_owner.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
 
@@ -121,7 +121,7 @@ function test_owner_cleanup_stale_tasks() {
 
     echo "run test case test_owner_cleanup_stale_tasks"
     # start a capture server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_cleanup_stale_tasks.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
     owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
@@ -130,7 +130,7 @@ function test_owner_cleanup_stale_tasks() {
     echo "owner id" $owner_id
 
     # run another server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_owner_cleanup_stale_tasks.server2
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
     capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}' | grep -v "$owner_pid")
     capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
@@ -143,7 +143,7 @@ function test_owner_cleanup_stale_tasks() {
 
     # simulate task status is deleted but task position stales
     etcdctl del /tidb/cdc/task/status --prefix
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8302"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8302" --logsuffix test_owner_cleanup_stale_tasks.server3
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
 
     run_sql "INSERT INTO test.availability1(id, val) VALUES (1, 1);"
@@ -164,7 +164,7 @@ function test_owner_retryable_error() {
     export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)'
 
     # start a capture server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix server1
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
 
@@ -176,7 +176,7 @@ function test_owner_retryable_error() {
     export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)'
 
     # run another server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix server2 --addr "0.0.0.0:8301"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server2 --addr "0.0.0.0:8301"
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
     capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}' | grep -v "$owner_pid")
     capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
@@ -201,7 +201,7 @@ function test_gap_between_watch_capture() {
     export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sleep-before-watch-capture=1*sleep(6000)'
 
     # start a capture server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_gap_between_watch_capture.server1
     # ensure the server become the owner
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
     owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
@@ -210,7 +210,7 @@ function test_gap_between_watch_capture() {
     echo "owner id" $owner_id
 
     # run another server
-    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301"
+    run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "0.0.0.0:8301" --logsuffix test_gap_between_watch_capture.server2
     ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep -v \"$owner_id\" | grep id"
     capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}' | grep -v "$owner_pid")
     capture_id=$($CDC_BINARY cli capture list 2>&1 | awk -F '"' '/id/{print $4}' | grep -v "$owner_id")
diff --git a/tests/cyclic_ab/conf/diff_config.toml b/tests/cyclic_ab/conf/diff_config.toml
index c85677b22a5..2f08bcc7d2c 100644
--- a/tests/cyclic_ab/conf/diff_config.toml
+++ b/tests/cyclic_ab/conf/diff_config.toml
@@ -5,7 +5,7 @@ chunk-size = 10
 check-thread-count = 4
 sample-percent = 100
 use-rowid = false
-use-checksum = true
+use-checksum = false
 fix-sql-file = "fix.sql"
 
 # tables need to check.
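In test_owner_cleanup_stale_tasks above, stale state is simulated by deleting every key under /tidb/cdc/task/status with `etcdctl del ... --prefix`, after which the new owner must garbage-collect the orphaned task positions. For reference, a hedged Go sketch of the same operation through the etcd clientv3 API; the key prefix comes from the test, while the endpoint and everything else are illustrative assumptions rather than ticdc code.

```go
package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // assumed local PD/etcd endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Equivalent of `etcdctl del /tidb/cdc/task/status --prefix`:
	// remove every task status key so the owner is forced to clean up
	// the corresponding stale task positions.
	resp, err := cli.Delete(ctx, "/tidb/cdc/task/status", clientv3.WithPrefix())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("deleted %d keys", resp.Deleted)
}
```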
diff --git a/tests/resolve_lock/main.go b/tests/resolve_lock/main.go
index 0e8c258ff6a..405d470d05e 100644
--- a/tests/resolve_lock/main.go
+++ b/tests/resolve_lock/main.go
@@ -51,7 +51,7 @@ func main() {
 	if err := prepare(sourceDB); err != nil {
 		log.S().Fatal(err)
 	}
-	if err := addLock(cfg); err != nil {
+	if err := addLock(cfg); err != nil && errors.Cause(err) != context.Canceled && errors.Cause(err) != context.DeadlineExceeded {
 		log.S().Fatal(err)
 	}
 	time.Sleep(5 * time.Second)
@@ -98,10 +98,11 @@ func addLock(cfg *util.Config) error {
 	}
 	pdcli, err := pd.NewClientWithContext(
-		ctx, strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
+		context.Background(), strings.Split(cfg.PDAddr, ","), pd.SecurityOption{})
 	if err != nil {
 		return errors.Trace(err)
 	}
+	defer pdcli.Close()
 	driver := tikv.Driver{}
 	store, err := driver.Open(fmt.Sprintf("tikv://%s?disableGC=true", cfg.PDAddr))
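The main() change stops treating context cancellation as fatal: since addLock runs under a deadline, context.Canceled and context.DeadlineExceeded become expected ways for it to return, and only other errors should abort the test. A small hedged sketch of that error-filtering pattern with pingcap/errors follows; runWithDeadline is an illustrative stand-in for addLock, not the function in the patch.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/pingcap/errors"
)

// runWithDeadline stands in for addLock: it works until the context
// expires and returns the (wrapped) context error.
func runWithDeadline(ctx context.Context) error {
	<-ctx.Done()
	return errors.Trace(ctx.Err())
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Unwrap with errors.Cause so a Trace()-wrapped cancellation is still
	// recognized and not reported as a fatal failure.
	if err := runWithDeadline(ctx); err != nil &&
		errors.Cause(err) != context.Canceled &&
		errors.Cause(err) != context.DeadlineExceeded {
		log.Fatal(err)
	}
	log.Println("finished; cancellation treated as a clean shutdown")
}
```

Giving the PD client context.Background() plus an explicit defer pdcli.Close() in the same hunk keeps the client usable for the whole addLock call and releases its connections deterministically, instead of tying its lifetime to the per-call context.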