Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into feat-branch-reorg-…
Browse files Browse the repository at this point in the history
…part-master-merge
  • Loading branch information
mjonss committed Feb 10, 2023
2 parents a608929 + 92a82ad commit 6148f3e
Show file tree
Hide file tree
Showing 34 changed files with 443 additions and 238 deletions.
8 changes: 4 additions & 4 deletions br/pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ func TestCreateStorage(t *testing.T) {
require.Equal(t, "TestKey", s3.SseKmsKeyId)

// special character in access keys
s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw&session-token=FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=`, nil)
s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=******&secret-access-key=******+&session-token=******`, nil)
require.NoError(t, err)
s3 = s.GetS3()
require.NotNil(t, s3)
require.Equal(t, "bucket4", s3.Bucket)
require.Equal(t, "prefix/path", s3.Prefix)
require.Equal(t, "NXN7IPIOSAAKDEEOLMAF", s3.AccessKey)
require.Equal(t, "nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw", s3.SecretAccessKey)
require.Equal(t, "FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=", s3.SessionToken)
require.Equal(t, "******", s3.AccessKey)
require.Equal(t, "******+", s3.SecretAccessKey)
require.Equal(t, "******", s3.SessionToken)
require.True(t, s3.ForcePathStyle)

// parse role ARN and external ID
Expand Down
24 changes: 9 additions & 15 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (bT backfillerType) String() string {
}
}

// BackfillJob is for a tidb_ddl_backfill table's record.
// BackfillJob is for a tidb_background_subtask table's record.
type BackfillJob struct {
ID int64
JobID int64
Expand All @@ -104,15 +104,9 @@ type BackfillJob struct {
State model.JobState
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
EndKey []byte

StartTS uint64
FinishTS uint64
RowCount int64
Meta *model.BackfillMeta
StartTS uint64
StateUpdateTS uint64
Meta *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Meta info.
Expand Down Expand Up @@ -325,7 +319,7 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey
if err != nil {
return err
}
bfJob.CurrKey = nextKey
bfJob.Meta.CurrKey = nextKey
bfJob.InstanceID = execID
bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease)
return w.backfiller.UpdateTask(bfJob)
Expand Down Expand Up @@ -479,7 +473,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul
// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
task.bfJob.RowCount = int64(result.addedCount)
task.bfJob.Meta.RowCount = int64(result.addedCount)
if result.err != nil {
logutil.BgLogger().Warn("[ddl] backfill worker runTask failed",
zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err))
Expand Down Expand Up @@ -1152,6 +1146,8 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob
Type: reorgInfo.Job.Type,
Query: reorgInfo.Job.Query,
},
StartKey: task.startKey,
EndKey: task.endKey,
}
bj := &BackfillJob{
ID: sJobCtx.currBackfillJobID.Add(1),
Expand All @@ -1162,11 +1158,9 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob
Tp: sJobCtx.bfWorkerType,
State: model.JobStateNone,
InstanceID: instanceID,
CurrKey: task.startKey,
StartKey: task.startKey,
EndKey: task.endKey,
Meta: bm,
}
bj.Meta.CurrKey = task.startKey
bJobs = append(bJobs, bj)
}
if err := AddBackfillJobs(sess, bJobs); err != nil {
Expand Down
82 changes: 37 additions & 45 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"
// BackgroundSubtaskTable stores the information of backfill jobs.
BackgroundSubtaskTable = "tidb_background_subtask"
// BackgroundSubtaskHistoryTable stores the information of history backfill jobs.
BackgroundSubtaskHistoryTable = "tidb_background_subtask_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand All @@ -38,53 +38,45 @@ const (
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6
// BackgroundSubtaskTableID is the table ID of `tidb_background_subtask`.
BackgroundSubtaskTableID = meta.MaxInt48 - 5
// BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
// BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`.
BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
ddl_physical_tid bigint,
type int,
exec_id blob default null,
exec_lease timestamp,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
// BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`.
BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `(
id bigint not null auto_increment primary key,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
type int,
exec_id varchar(256),
exec_expired timestamp,
state varchar(64) not null,
checkpoint longblob not null,
start_time bigint,
state_update_time bigint,
meta longblob,
unique key(namespace, task_key))`
)
15 changes: 12 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El
rc.setCurrentElement(currElement)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
rc.references.Add(1)
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
dc.reorgCtx.reorgCtxMap[jobID] = rc
Expand All @@ -544,14 +545,22 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) {
rc := dc.getReorgCtx(bfJob.JobID)
if rc == nil {
ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey}
dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount)
dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount)
} else {
rc.references.Add(1)
}
}

func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
func (dc *ddlCtx) removeReorgCtx(jobID int64) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
delete(dc.reorgCtx.reorgCtxMap, job.ID)
ctx, ok := dc.reorgCtx.reorgCtxMap[jobID]
if ok {
ctx.references.Sub(1)
if ctx.references.Load() == 0 {
delete(dc.reorgCtx.reorgCtxMap, jobID)
}
}
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
Expand Down
30 changes: 30 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ import (

const testLease = 5 * time.Millisecond

// DDLForTest exports for testing.
type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx
SetReorgCtxForBackfill(bfJob *BackfillJob)
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}

// SetInterceptor implements DDL.SetInterceptor interface.
Expand All @@ -52,6 +57,31 @@ func (d *ddl) SetInterceptor(i Interceptor) {
d.mu.interceptor = i
}

// IsReorgCanceled exports for testing.
func (rc *reorgCtx) IsReorgCanceled() bool {
return rc.isReorgCanceled()
}

// NewReorgCtx exports for testing.
func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx {
return d.newReorgCtx(jobID, startKey, currElement, rowCount)
}

// SetReorgCtxForBackfill exports for testing.
func (d *ddl) SetReorgCtxForBackfill(bfJob *BackfillJob) {
d.setReorgCtxForBackfill(bfJob)
}

// GetReorgCtx exports for testing.
func (d *ddl) GetReorgCtx(jobID int64) *reorgCtx {
return d.getReorgCtx(jobID)
}

// RemoveReorgCtx exports for testing.
func (d *ddl) RemoveReorgCtx(id int64) {
d.removeReorgCtx(id)
}

// JobNeedGCForTest is only used for test.
var JobNeedGCForTest = jobNeedGC

Expand Down
28 changes: 28 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,31 @@ func TestJobNeedGC(t *testing.T) {
}}}
require.True(t, ddl.JobNeedGCForTest(job))
}

func TestUsingReorgCtx(t *testing.T) {
_, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := domain.DDL()

wg := util.WaitGroupWrapper{}
wg.Run(func() {
jobID := int64(1)
m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1}
bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Run(func() {
jobID := int64(1)
startKey := []byte("skey")
ele := &meta.Element{ID: 1, TypeKey: nil}
for i := 0; i < 100; i++ {
d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0)
d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled()
d.(ddl.DDLForTest).RemoveReorgCtx(jobID)
}
})
wg.Wait()
}
4 changes: 2 additions & 2 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBac
physicalTable: pt,
// TODO: Remove these fields after remove the old logic.
sqlQuery: bfJob.Meta.Query,
startKey: bfJob.StartKey,
endKey: bfJob.EndKey,
startKey: bfJob.Meta.StartKey,
endKey: bfJob.Meta.EndKey,
endInclude: bfJob.Meta.EndInclude,
priority: bfJob.Meta.Priority}, nil
}
Expand Down
Loading

0 comments on commit 6148f3e

Please sign in to comment.