Skip to content

Commit

Permalink
a new table migration mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed May 18, 2020
1 parent 64df3f2 commit bf26ac6
Show file tree
Hide file tree
Showing 16 changed files with 458 additions and 677 deletions.
331 changes: 148 additions & 183 deletions cdc/changefeed.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,11 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
}

// CloneTables returns a new map from table ID to TableName for the existing tables.
func (s *schemaSnapshot) CloneTables() map[uint64]TableName {
mp := make(map[uint64]TableName, len(s.tables))
func (s *schemaSnapshot) CloneTables() map[model.TableID]TableName {
mp := make(map[model.TableID]TableName, len(s.tables))

for id, table := range s.tables {
mp[uint64(id)] = table.TableName
mp[id] = table.TableName
}

return mp
Expand Down
55 changes: 55 additions & 0 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package kv
import (
"context"
"fmt"
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/retry"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"

Expand Down Expand Up @@ -332,6 +335,58 @@ func (c CDCEtcdClient) PutTaskStatus(
return nil
}

// AtomicPutTaskStatus applies update to the task status of the given
// changefeed/capture pair and writes the result back to etcd with a
// compare-and-swap on the key's mod revision.
//
// Each attempt reads the current status with GetTaskStatus, passes it to
// update (an empty TaskStatus is passed when the key does not exist yet),
// marshals the result and commits it in a transaction that only succeeds if
// the key's mod revision is still the one that was read.
//
// The attempt runs inside retry.Run(100*time.Millisecond, 3, ...); presumably
// that means up to 3 attempts spaced 100ms apart — confirm against pkg/retry.
// Because of the retry, update may be invoked more than once.
//
// On success it returns the status as modified by update. On failure it
// returns nil and the traced error; a failed comparison surfaces as
// model.ErrWriteTsConflict annotated with the key.
func (c CDCEtcdClient) AtomicPutTaskStatus(
ctx context.Context,
changefeedID string,
captureID string,
update func(*model.TaskStatus) error,
) (*model.TaskStatus, error) {
// status is declared outside the closure so the value produced by the
// last attempt can be returned to the caller.
var status *model.TaskStatus
err := retry.Run(100*time.Millisecond, 3, func() error {
var modRevision int64
var err error
modRevision, status, err = c.GetTaskStatus(ctx, changefeedID, captureID)
key := GetEtcdKeyTaskStatus(changefeedID, captureID)
// writeCmp is the guard of the transaction below; which revision it
// expects depends on whether the key was found.
var writeCmp clientv3.Cmp
switch errors.Cause(err) {
case model.ErrTaskStatusNotExists:
// No status stored yet: start from an empty one and require that the
// key still has mod revision 0 (in etcd, a key that does not exist).
status = new(model.TaskStatus)
writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", 0)
case nil:
// Status found: require that nobody has modified the key since the read.
writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)
default:
return errors.Trace(err)
}
err = update(status)
if err != nil {
return errors.Trace(err)
}
value, err := status.Marshal()
if err != nil {
return errors.Trace(err)
}

// Put the new value only if the guard still holds.
resp, err := c.Client.KV.Txn(ctx).If(writeCmp).Then(
clientv3.OpPut(key, value),
).Commit()

if err != nil {
return errors.Trace(err)
}

if !resp.Succeeded {
// The guard failed, so the key changed between the read and the write.
// NOTE(review): the message says the update is ignored, but the error
// returned here goes back to retry.Run — confirm whether a retry is
// what actually happens and adjust the message accordingly.
log.Info("outdated table infos, ignore update taskStatus")
return errors.Annotatef(model.ErrWriteTsConflict, "key: %s", key)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return status, nil
}

// GetTaskPosition queries task process from etcd, returns
// - ModRevision of the given key
// - *model.TaskPosition unmarshaled from the value
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (s *etcdSuite) TestGetChangeFeeds(c *check.C) {
func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) {
ctx := context.Background()
info := &model.TaskStatus{
TableInfos: []*model.ProcessTableInfo{
{ID: 1, StartTs: 100},
Tables: map[model.TableID]model.Ts{
1: 100,
},
}

Expand All @@ -126,8 +126,8 @@ func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) {
func (s *etcdSuite) TestDeleteTaskStatus(c *check.C) {
ctx := context.Background()
info := &model.TaskStatus{
TableInfos: []*model.ProcessTableInfo{
{ID: 1, StartTs: 100},
Tables: map[model.TableID]model.Ts{
1: 100,
},
}
feedID := "feedid"
Expand Down
4 changes: 2 additions & 2 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

// CaptureInfo store in etcd.
type CaptureInfo struct {
ID string `json:"id"`
AdvertiseAddr string `json:"address"`
ID CaptureID `json:"id"`
AdvertiseAddr string `json:"address"`
}

// Marshal using json.Marshal.
Expand Down
152 changes: 77 additions & 75 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,11 @@ package model
import (
"encoding/json"
"fmt"
"math"

"github.com/pingcap/errors"
)

// ProcessTableInfo contains the info about tables that processor need to process.
type ProcessTableInfo struct {
// The ID is physical table ID. In detail:
// 1. ID is table ID when the table is not partition table.
// 2. ID is partition ID when the table is partition table.
ID uint64 `json:"id"`
StartTs uint64 `json:"start-ts"`
}

// TableLock is used when applying table re-assignment to a processor.
// There are two kinds of locks, P-lock and C-lock. P-lock is set by owner when
// owner removes one or more tables from one processor. C-lock is a pair to
// P-lock and set by processor to indicate that the processor has synchronized
// the checkpoint and won't synchronize the removed table any more.
type TableLock struct {
// Ts is the create timestamp of lock, it is used to pair P-lock and C-lock
Ts uint64 `json:"ts"`
// CreatorID is the lock creator ID
CreatorID string `json:"creator-id"`
// CheckpointTs is used in C-lock only, it records the table synchronization checkpoint
CheckpointTs uint64 `json:"checkpoint-ts"`
}

// TableLockStatus for the table lock in TaskStatus
type TableLockStatus int

// Table lock status
const (
TableNoLock TableLockStatus = iota + 1
TablePLock
TablePLockCommited
)

// AdminJobType represents for admin job type, both used in owner and processor
type AdminJobType int

Expand Down Expand Up @@ -110,59 +78,88 @@ func (tp *TaskPosition) Unmarshal(data []byte) error {
// String implements fmt.Stringer interface.
func (tp *TaskPosition) String() string {
data, _ := tp.Marshal()
return string(data)
return data
}

// TableOperation records the current information of a table migration,
// i.e. a single pending or finished add/delete of one table.
type TableOperation struct {
// TableID is the ID of the table this operation applies to.
TableID TableID `json:"table_id"`
// Delete is true for a delete operation; otherwise the operation is an add.
Delete bool `json:"delete"`
// If the operation is a delete operation, BoundaryTs is the checkpoint ts.
// If the operation is an add operation, BoundaryTs is the start ts.
BoundaryTs uint64 `json:"boundary_ts"`
// Done reports whether the operation has been applied.
Done bool `json:"done"`
}

// Clone returns an independent copy of the operation.
// A nil receiver yields nil.
func (o *TableOperation) Clone() *TableOperation {
	if o != nil {
		dup := *o
		return &dup
	}
	return nil
}

// TaskStatus records the task information of a capture
type TaskStatus struct {
// Table information list, containing tables that processor should process, updated by ownrer, processor is read only.
// TODO change to be a map for easy update.
TableInfos []*ProcessTableInfo `json:"table-infos"`
TablePLock *TableLock `json:"table-p-lock"`
TableCLock *TableLock `json:"table-c-lock"`
AdminJobType AdminJobType `json:"admin-job-type"`
ModRevision int64 `json:"-"`
Tables map[TableID]Ts `json:"tables"`
Operation []*TableOperation `json:"operation"`
AdminJobType AdminJobType `json:"admin-job-type"`
ModRevision int64 `json:"-"`
}

// String implements fmt.Stringer interface.
func (ts *TaskStatus) String() string {
data, _ := ts.Marshal()
return string(data)
return data
}

// RemoveTable removes the table with the given ID from Tables and returns its start ts; it returns (0, false) if the table is not present.
func (ts *TaskStatus) RemoveTable(id uint64) (*ProcessTableInfo, bool) {
for idx, table := range ts.TableInfos {
if table.ID == id {
last := ts.TableInfos[len(ts.TableInfos)-1]
removedTable := ts.TableInfos[idx]

ts.TableInfos[idx] = last
ts.TableInfos = ts.TableInfos[:len(ts.TableInfos)-1]
func (ts *TaskStatus) RemoveTable(id TableID) (Ts, bool) {
if startTs, exist := ts.Tables[id]; exist {
delete(ts.Tables, id)
return startTs, true
}
return 0, false
}

return removedTable, true
// SomeOperationsUnapplied reports whether at least one entry in Operation
// has not been marked as done.
func (ts *TaskStatus) SomeOperationsUnapplied() bool {
	for _, op := range ts.Operation {
		if op.Done {
			continue
		}
		// Found an operation that is still pending.
		return true
	}
	return false
}

return nil, false
// AppliedTs returns the smallest BoundaryTs among the operations that are
// not done yet, so the result is less than or equal to the boundary of every
// unapplied operation. It returns math.MaxUint64 when nothing is pending.
func (ts *TaskStatus) AppliedTs() Ts {
	var lowest uint64 = math.MaxUint64
	for _, op := range ts.Operation {
		// Operations that are already done do not constrain the result.
		if op.Done {
			continue
		}
		if op.BoundaryTs < lowest {
			lowest = op.BoundaryTs
		}
	}
	return lowest
}

// Snapshot takes a snapshot of `*TaskStatus` and returns a new `*ProcInfoSnap`
func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs uint64) *ProcInfoSnap {
func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs Ts) *ProcInfoSnap {
snap := &ProcInfoSnap{
CfID: cfID,
CaptureID: captureID,
Tables: make([]ProcessTableInfo, 0, len(ts.TableInfos)),
Tables: make(map[TableID]Ts, len(ts.Tables)),
}
for _, tbl := range ts.TableInfos {
for tableID, startTs := range ts.Tables {
ts := checkpointTs
if ts < tbl.StartTs {
ts = tbl.StartTs
if ts < startTs {
ts = startTs
}
snap.Tables = append(snap.Tables, ProcessTableInfo{
ID: tbl.ID,
StartTs: ts,
})
snap.Tables[tableID] = ts
}
return snap
}
Expand All @@ -182,20 +179,16 @@ func (ts *TaskStatus) Unmarshal(data []byte) error {
// Clone returns a deep-clone of the struct
func (ts *TaskStatus) Clone() *TaskStatus {
clone := *ts
infos := make([]*ProcessTableInfo, 0, len(ts.TableInfos))
for _, ti := range ts.TableInfos {
c := *ti
infos = append(infos, &c)
}
clone.TableInfos = infos
if ts.TablePLock != nil {
pLock := *ts.TablePLock
clone.TablePLock = &pLock
tables := make(map[TableID]Ts, len(ts.Tables))
for tableID, startTs := range ts.Tables {
tables[tableID] = startTs
}
if ts.TableCLock != nil {
cLock := *ts.TableCLock
clone.TableCLock = &cLock
clone.Tables = tables
operation := make([]*TableOperation, 0, len(ts.Operation))
for _, opt := range ts.Operation {
operation = append(operation, opt.Clone())
}
clone.Operation = operation
return &clone
}

Expand All @@ -205,6 +198,15 @@ type CaptureID = string
// ChangeFeedID is the type for change feed ID
type ChangeFeedID = string

// TableID is the ID of the table.
// It is an alias of int64, so values are interchangeable with plain int64.
type TableID = int64

// SchemaID is the ID of the schema.
// It is an alias of int64, so values are interchangeable with plain int64.
type SchemaID = int64

// Ts is the timestamp with a logical count.
// It is an alias of uint64, so values are interchangeable with plain uint64.
type Ts = uint64

// ProcessorsInfos maps from capture IDs to TaskStatus
type ProcessorsInfos map[CaptureID]*TaskStatus

Expand Down Expand Up @@ -272,7 +274,7 @@ func (status *ChangeFeedStatus) Unmarshal(data []byte) error {

// ProcInfoSnap holds most important replication information of a processor
type ProcInfoSnap struct {
CfID string `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
Tables []ProcessTableInfo `json:"-"`
CfID string `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
Tables map[TableID]Ts `json:"-"`
}
Loading

0 comments on commit bf26ac6

Please sign in to comment.