diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 189b1875f3f43..82040c883bfed 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -1355,17 +1355,22 @@ func (e *executor) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy policy.ID = policyID job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaName: policy.Name.L, Type: model.ActionCreatePlacementPolicy, BinlogInfo: &model.HistoryInfo{}, - Args: []any{policy, onExist == OnExistReplace}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Policy: policy.Name.L, }}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + + args := &model.PlacementPolicyArgs{ + Policy: policy, + ReplaceOnExist: onExist == OnExistReplace, + } + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -5996,18 +6001,23 @@ func (e *executor) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPla } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: policy.ID, SchemaName: policy.Name.L, Type: model.ActionDropPlacementPolicy, BinlogInfo: &model.HistoryInfo{}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - Args: []any{policyName}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Policy: policyName.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + + args := &model.PlacementPolicyArgs{ + PolicyName: policyName, + PolicyID: policy.ID, + } + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -6034,18 +6044,22 @@ func (e *executor) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterP } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: policy.ID, SchemaName: policy.Name.L, Type: model.ActionAlterPlacementPolicy, BinlogInfo: &model.HistoryInfo{}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - Args: []any{newPolicyInfo}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{ Policy: newPolicyInfo.Name.L, }}, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + args := &model.PlacementPolicyArgs{ + Policy: newPolicyInfo, + PolicyID: policy.ID, + } + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } diff --git a/pkg/ddl/placement_policy.go b/pkg/ddl/placement_policy.go index 8b180e3ba8e7e..03207e0427b3b 100644 --- a/pkg/ddl/placement_policy.go +++ b/pkg/ddl/placement_policy.go @@ -34,12 +34,13 @@ import ( ) func onCreatePlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - policyInfo := &model.PolicyInfo{} - var orReplace bool - if err := job.DecodeArgs(policyInfo, &orReplace); err != nil { + args, err := model.GetPlacementPolicyArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } + policyInfo, orReplace := args.Policy, args.ReplaceOnExist + policyInfo.State = model.StateNone if err := checkPolicyValidation(policyInfo.PlacementSettings); err != nil { @@ -183,7 +184,11 @@ func checkAllTablePlacementPoliciesExistAndCancelNonExistJob(t *meta.Meta, job * } func onDropPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - policyInfo, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, job.SchemaID) + args, err := model.GetPlacementPolicyArgs(job) + if err != nil { + return ver, errors.Trace(err) + } + policyInfo, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, args.PolicyID) if err != nil { return ver, errors.Trace(err) } @@ -241,19 +246,19 @@ func onDropPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ve } func onAlterPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - alterPolicy := &model.PolicyInfo{} - if err := job.DecodeArgs(alterPolicy); err != nil { + args, err := model.GetPlacementPolicyArgs(job) + if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - oldPolicy, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, job.SchemaID) + oldPolicy, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, args.PolicyID) if err != nil { return ver, errors.Trace(err) } newPolicyInfo := *oldPolicy - newPolicyInfo.PlacementSettings = alterPolicy.PlacementSettings + newPolicyInfo.PlacementSettings = args.Policy.PlacementSettings err = checkPolicyValidation(newPolicyInfo.PlacementSettings) if err != nil { diff --git a/pkg/ddl/placement_policy_ddl_test.go b/pkg/ddl/placement_policy_ddl_test.go index f1968c5b2f8f5..73bb683db340b 100644 --- a/pkg/ddl/placement_policy_ddl_test.go +++ b/pkg/ddl/placement_policy_ddl_test.go @@ -44,13 +44,17 @@ func testPlacementPolicyInfo(t *testing.T, store kv.Storage, name string, settin func testCreatePlacementPolicy(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, policyInfo *model.PolicyInfo) *model.Job { job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaName: policyInfo.Name.L, Type: model.ActionCreatePlacementPolicy, BinlogInfo: &model.HistoryInfo{}, - Args: []any{policyInfo}, } + args := &model.PlacementPolicyArgs{ + Policy: policyInfo, + } + ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJob(ctx, job) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, false)) require.NoError(t, err) v := getSchemaVer(t, ctx) diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 08d9171be4f05..cf69f180a136b 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -45,7 +45,7 @@ go_test( ], embed = [":model"], flaky = True, - shard_count = 46, + shard_count = 47, deps = [ "//pkg/parser/ast", "//pkg/parser/charset", diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 91dee5bd15b73..f62b3fa6f0c7c 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -1044,3 +1044,53 @@ func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) { } return getOrDecodeArgsV2[*AddCheckConstraintArgs](job) } + +// PlacementPolicyArgs is the argument for create/alter/drop placement policy +type PlacementPolicyArgs struct { + Policy *PolicyInfo `json:"policy,omitempty"` + ReplaceOnExist bool `json:"replace_on_exist,omitempty"` + PolicyName pmodel.CIStr `json:"policy_name,omitempty"` + + // it's set for alter/drop policy in v2 + PolicyID int64 `json:"policy_id"` +} + +func (a *PlacementPolicyArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + if job.Type == ActionCreatePlacementPolicy { + job.Args = []any{a.Policy, a.ReplaceOnExist} + } else if job.Type == ActionAlterPlacementPolicy { + job.Args = []any{a.Policy} + } else { + intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs") + job.Args = []any{a.PolicyName} + } + return + } + job.Args = []any{a} +} + +// GetPlacementPolicyArgs gets the placement policy args. +func GetPlacementPolicyArgs(job *Job) (*PlacementPolicyArgs, error) { + if job.Version == JobVersion1 { + args := &PlacementPolicyArgs{PolicyID: job.SchemaID} + var err error + + if job.Type == ActionCreatePlacementPolicy { + err = job.DecodeArgs(&args.Policy, &args.ReplaceOnExist) + } else if job.Type == ActionAlterPlacementPolicy { + err = job.DecodeArgs(&args.Policy) + } else { + intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs") + err = job.DecodeArgs(&args.PolicyName) + } + + if err != nil { + return nil, errors.Trace(err) + } + + return args, err + } + + return getOrDecodeArgsV2[*PlacementPolicyArgs](job) +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index d17c0abca8c33..91dd775a6feb9 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -613,3 +613,31 @@ func TestCheckConstraintArgs(t *testing.T) { require.True(t, args.Enforced) } } + +func TestPlacementPolicyArgs(t *testing.T) { + inArgs := &PlacementPolicyArgs{ + Policy: &PolicyInfo{ID: 1, Name: model.NewCIStr("policy"), State: StateDeleteOnly}, + PolicyName: model.NewCIStr("policy_name"), + PolicyID: 123, + ReplaceOnExist: false, + } + for _, tp := range []ActionType{ActionCreatePlacementPolicy, ActionAlterPlacementPolicy, ActionDropPlacementPolicy} { + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp))) + j2.SchemaID = inArgs.PolicyID + args, err := GetPlacementPolicyArgs(j2) + require.NoError(t, err) + if tp == ActionCreatePlacementPolicy { + require.EqualValues(t, inArgs.Policy, args.Policy) + require.EqualValues(t, inArgs.ReplaceOnExist, args.ReplaceOnExist) + } else if tp == ActionAlterPlacementPolicy { + require.EqualValues(t, inArgs.Policy, args.Policy) + require.EqualValues(t, inArgs.PolicyID, args.PolicyID) + } else { + require.EqualValues(t, inArgs.PolicyName, args.PolicyName) + require.EqualValues(t, inArgs.PolicyID, args.PolicyID) + } + } + } +}