Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: a new table migration mechanism #572

Merged
merged 4 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 146 additions & 182 deletions cdc/changefeed.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,11 @@ func (s *schemaSnapshot) handleDDL(job *timodel.Job) error {
}

// CloneTables returns a clone of the existing tables.
func (s *schemaSnapshot) CloneTables() map[uint64]TableName {
mp := make(map[uint64]TableName, len(s.tables))
func (s *schemaSnapshot) CloneTables() map[model.TableID]TableName {
mp := make(map[model.TableID]TableName, len(s.tables))

for id, table := range s.tables {
mp[uint64(id)] = table.TableName
mp[id] = table.TableName
}

return mp
Expand Down
66 changes: 63 additions & 3 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ package kv
import (
"context"
"fmt"
"time"

"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"

"github.com/cenkalti/backoff"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/retry"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc/mvccpb"
)

Expand Down Expand Up @@ -332,6 +335,63 @@ func (c CDCEtcdClient) PutTaskStatus(
return nil
}

// AtomicPutTaskStatus puts task status into etcd atomically.
func (c CDCEtcdClient) AtomicPutTaskStatus(
ctx context.Context,
changefeedID string,
captureID string,
update func(*model.TaskStatus) error,
) (*model.TaskStatus, error) {
var status *model.TaskStatus
err := retry.Run(100*time.Millisecond, 3, func() error {
select {
case <-ctx.Done():
return backoff.Permanent(ctx.Err())
default:
}
var modRevision int64
var err error
modRevision, status, err = c.GetTaskStatus(ctx, changefeedID, captureID)
key := GetEtcdKeyTaskStatus(changefeedID, captureID)
var writeCmp clientv3.Cmp
switch errors.Cause(err) {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
case model.ErrTaskStatusNotExists:
status = new(model.TaskStatus)
writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", 0)
case nil:
writeCmp = clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)
default:
return errors.Trace(err)
}
err = update(status)
if err != nil {
return errors.Trace(err)
}
value, err := status.Marshal()
if err != nil {
return errors.Trace(err)
}

resp, err := c.Client.KV.Txn(ctx).If(writeCmp).Then(
clientv3.OpPut(key, value),
).Commit()

if err != nil {
return errors.Trace(err)
}

if !resp.Succeeded {
log.Info("outdated table infos, ignore update taskStatus")
return errors.Annotatef(model.ErrWriteTsConflict, "key: %s", key)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return status, nil
}

// GetTaskPosition queries task process from etcd, returns
// - ModRevision of the given key
// - *model.TaskPosition unmarshaled from the value
Expand Down
8 changes: 4 additions & 4 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (s *etcdSuite) TestGetChangeFeeds(c *check.C) {
func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) {
ctx := context.Background()
info := &model.TaskStatus{
TableInfos: []*model.ProcessTableInfo{
{ID: 1, StartTs: 100},
Tables: map[model.TableID]model.Ts{
1: 100,
},
}

Expand All @@ -126,8 +126,8 @@ func (s *etcdSuite) TestGetPutTaskStatus(c *check.C) {
func (s *etcdSuite) TestDeleteTaskStatus(c *check.C) {
ctx := context.Background()
info := &model.TaskStatus{
TableInfos: []*model.ProcessTableInfo{
{ID: 1, StartTs: 100},
Tables: map[model.TableID]model.Ts{
1: 100,
},
}
feedID := "feedid"
Expand Down
4 changes: 2 additions & 2 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

// CaptureInfo store in etcd.
type CaptureInfo struct {
ID string `json:"id"`
AdvertiseAddr string `json:"address"`
ID CaptureID `json:"id"`
AdvertiseAddr string `json:"address"`
}

// Marshal using json.Marshal.
Expand Down
152 changes: 77 additions & 75 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,11 @@ package model
import (
"encoding/json"
"fmt"
"math"

"github.com/pingcap/errors"
)

// ProcessTableInfo contains the info about tables that processor need to process.
type ProcessTableInfo struct {
// The ID is physical table ID. In detail:
// 1. ID is table ID when the table is not partition table.
// 2. ID is partition ID when the table is partition table.
ID uint64 `json:"id"`
StartTs uint64 `json:"start-ts"`
}

// TableLock is used when applying table re-assignment to a processor.
// There are two kinds of locks, P-lock and C-lock. P-lock is set by owner when
// owner removes one or more tables from one processor. C-lock is a pair to
// P-lock and set by processor to indicate that the processor has synchronized
// the checkpoint and won't synchronize the removed table any more.
type TableLock struct {
// Ts is the create timestamp of lock, it is used to pair P-lock and C-lock
Ts uint64 `json:"ts"`
// CreatorID is the lock creator ID
CreatorID string `json:"creator-id"`
// CheckpointTs is used in C-lock only, it records the table synchronization checkpoint
CheckpointTs uint64 `json:"checkpoint-ts"`
}

// TableLockStatus for the table lock in TaskStatus
type TableLockStatus int

// Table lock status
const (
TableNoLock TableLockStatus = iota + 1
TablePLock
TablePLockCommited
)

// AdminJobType represents for admin job type, both used in owner and processor
type AdminJobType int

Expand Down Expand Up @@ -110,59 +78,88 @@ func (tp *TaskPosition) Unmarshal(data []byte) error {
// String implements fmt.Stringer interface.
func (tp *TaskPosition) String() string {
data, _ := tp.Marshal()
return string(data)
return data
}

// TableOperation records the current information of a table migration.
type TableOperation struct {
	// TableID identifies the table this operation applies to.
	TableID TableID `json:"table_id"`
	// Delete is true when the operation is a delete operation; otherwise the
	// operation is an add operation (see BoundaryTs).
	Delete bool `json:"delete"`
	// If the operation is a delete operation, BoundaryTs is the checkpoint ts.
	// If the operation is an add operation, BoundaryTs is the start ts.
	BoundaryTs uint64 `json:"boundary_ts"`
	// Done is true once the operation has been applied.
	Done bool `json:"done"`
}

// Clone returns an independent copy of the operation.
// A nil receiver yields nil, so callers may clone optional operations
// without checking first.
func (o *TableOperation) Clone() *TableOperation {
	if o != nil {
		// All fields are plain values, so a shallow struct copy is a deep copy.
		dup := new(TableOperation)
		*dup = *o
		return dup
	}
	return nil
}

// TaskStatus records the task information of a capture
type TaskStatus struct {
// Table information list, containing tables that processor should process, updated by owner, processor is read only.
// TODO change to be a map for easy update.
TableInfos []*ProcessTableInfo `json:"table-infos"`
TablePLock *TableLock `json:"table-p-lock"`
TableCLock *TableLock `json:"table-c-lock"`
AdminJobType AdminJobType `json:"admin-job-type"`
ModRevision int64 `json:"-"`
Tables map[TableID]Ts `json:"tables"`
Operation []*TableOperation `json:"operation"`
AdminJobType AdminJobType `json:"admin-job-type"`
ModRevision int64 `json:"-"`
}

// String implements fmt.Stringer interface.
func (ts *TaskStatus) String() string {
data, _ := ts.Marshal()
return string(data)
return data
}

// RemoveTable remove the table in TableInfos.
func (ts *TaskStatus) RemoveTable(id uint64) (*ProcessTableInfo, bool) {
for idx, table := range ts.TableInfos {
if table.ID == id {
last := ts.TableInfos[len(ts.TableInfos)-1]
removedTable := ts.TableInfos[idx]

ts.TableInfos[idx] = last
ts.TableInfos = ts.TableInfos[:len(ts.TableInfos)-1]
// RemoveTable deletes the table with the given id from ts.Tables.
// It returns the start ts that was stored for the table and true when the
// table was present, or 0 and false when it was not.
func (ts *TaskStatus) RemoveTable(id TableID) (Ts, bool) {
	startTs, found := ts.Tables[id]
	if !found {
		return 0, false
	}
	delete(ts.Tables, id)
	return startTs, true
}

return removedTable, true
// SomeOperationsUnapplied reports whether at least one entry of ts.Operation
// has not been marked as Done yet. It returns false when there are no
// operations at all.
func (ts *TaskStatus) SomeOperationsUnapplied() bool {
	for i := range ts.Operation {
		if op := ts.Operation[i]; !op.Done {
			return true
		}
	}
	return false
}

return nil, false
// AppliedTs returns a Ts which is less than or equal to the BoundaryTs of
// every operation that is not Done yet, i.e. the smallest such BoundaryTs.
// When every operation is Done (or there are none) it returns math.MaxUint64.
func (ts *TaskStatus) AppliedTs() Ts {
	var lowest uint64 = math.MaxUint64
	for _, op := range ts.Operation {
		// Operations that are already applied do not constrain the result.
		if op.Done {
			continue
		}
		if op.BoundaryTs < lowest {
			lowest = op.BoundaryTs
		}
	}
	return lowest
}

// Snapshot takes a snapshot of `*TaskStatus` and returns a new `*ProcInfoSnap`
func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs uint64) *ProcInfoSnap {
func (ts *TaskStatus) Snapshot(cfID ChangeFeedID, captureID CaptureID, checkpointTs Ts) *ProcInfoSnap {
snap := &ProcInfoSnap{
CfID: cfID,
CaptureID: captureID,
Tables: make([]ProcessTableInfo, 0, len(ts.TableInfos)),
Tables: make(map[TableID]Ts, len(ts.Tables)),
}
for _, tbl := range ts.TableInfos {
for tableID, startTs := range ts.Tables {
ts := checkpointTs
if ts < tbl.StartTs {
ts = tbl.StartTs
if ts < startTs {
ts = startTs
}
snap.Tables = append(snap.Tables, ProcessTableInfo{
ID: tbl.ID,
StartTs: ts,
})
snap.Tables[tableID] = ts
}
return snap
}
Expand All @@ -182,20 +179,16 @@ func (ts *TaskStatus) Unmarshal(data []byte) error {
// Clone returns a deep-clone of the struct
func (ts *TaskStatus) Clone() *TaskStatus {
clone := *ts
infos := make([]*ProcessTableInfo, 0, len(ts.TableInfos))
for _, ti := range ts.TableInfos {
c := *ti
infos = append(infos, &c)
}
clone.TableInfos = infos
if ts.TablePLock != nil {
pLock := *ts.TablePLock
clone.TablePLock = &pLock
tables := make(map[TableID]Ts, len(ts.Tables))
for tableID, startTs := range ts.Tables {
tables[tableID] = startTs
}
if ts.TableCLock != nil {
cLock := *ts.TableCLock
clone.TableCLock = &cLock
clone.Tables = tables
operation := make([]*TableOperation, 0, len(ts.Operation))
for _, opt := range ts.Operation {
operation = append(operation, opt.Clone())
}
clone.Operation = operation
return &clone
}

Expand All @@ -205,6 +198,15 @@ type CaptureID = string
// ChangeFeedID is the type for change feed ID
type ChangeFeedID = string

// TableID is the ID of the table. It is declared as an alias of int64, so
// plain int64 values can be used wherever a TableID is expected.
type TableID = int64

// SchemaID is the ID of the schema. It is declared as an alias of int64.
type SchemaID = int64

// Ts is the timestamp with a logical count. It is declared as an alias of
// uint64.
type Ts = uint64

// ProcessorsInfos maps from capture IDs to TaskStatus
type ProcessorsInfos map[CaptureID]*TaskStatus

Expand Down Expand Up @@ -272,7 +274,7 @@ func (status *ChangeFeedStatus) Unmarshal(data []byte) error {

// ProcInfoSnap holds most important replication information of a processor
type ProcInfoSnap struct {
CfID string `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
Tables []ProcessTableInfo `json:"-"`
CfID string `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
Tables map[TableID]Ts `json:"-"`
}
Loading