Skip to content

Commit

Permalink
ddl: args v2 for create/alter/drop placement policy (pingcap#56186)
Browse files Browse the repository at this point in the history
  • Loading branch information
joechenrh authored Sep 25, 2024
1 parent 75d9830 commit bf455f5
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 17 deletions.
26 changes: 20 additions & 6 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,17 +1355,22 @@ func (e *executor) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy
policy.ID = policyID

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaName: policy.Name.L,
Type: model.ActionCreatePlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []any{policy, onExist == OnExistReplace},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Policy: policy.Name.L,
}},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)

args := &model.PlacementPolicyArgs{
Policy: policy,
ReplaceOnExist: onExist == OnExistReplace,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -5996,18 +6001,23 @@ func (e *executor) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPla
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: policy.ID,
SchemaName: policy.Name.L,
Type: model.ActionDropPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{policyName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Policy: policyName.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)

args := &model.PlacementPolicyArgs{
PolicyName: policyName,
PolicyID: policy.ID,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand All @@ -6034,18 +6044,22 @@ func (e *executor) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterP
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: policy.ID,
SchemaName: policy.Name.L,
Type: model.ActionAlterPlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{newPolicyInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Policy: newPolicyInfo.Name.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.PlacementPolicyArgs{
Policy: newPolicyInfo,
PolicyID: policy.ID,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
21 changes: 13 additions & 8 deletions pkg/ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ import (
)

func onCreatePlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
policyInfo := &model.PolicyInfo{}
var orReplace bool
if err := job.DecodeArgs(policyInfo, &orReplace); err != nil {
args, err := model.GetPlacementPolicyArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
policyInfo, orReplace := args.Policy, args.ReplaceOnExist

policyInfo.State = model.StateNone

if err := checkPolicyValidation(policyInfo.PlacementSettings); err != nil {
Expand Down Expand Up @@ -183,7 +184,11 @@ func checkAllTablePlacementPoliciesExistAndCancelNonExistJob(t *meta.Meta, job *
}

func onDropPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
policyInfo, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, job.SchemaID)
args, err := model.GetPlacementPolicyArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
policyInfo, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, args.PolicyID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -241,19 +246,19 @@ func onDropPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ve
}

func onAlterPlacementPolicy(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
alterPolicy := &model.PolicyInfo{}
if err := job.DecodeArgs(alterPolicy); err != nil {
args, err := model.GetPlacementPolicyArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

oldPolicy, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, job.SchemaID)
oldPolicy, err := checkPlacementPolicyExistAndCancelNonExistJob(t, job, args.PolicyID)
if err != nil {
return ver, errors.Trace(err)
}

newPolicyInfo := *oldPolicy
newPolicyInfo.PlacementSettings = alterPolicy.PlacementSettings
newPolicyInfo.PlacementSettings = args.Policy.PlacementSettings

err = checkPolicyValidation(newPolicyInfo.PlacementSettings)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/ddl/placement_policy_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ func testPlacementPolicyInfo(t *testing.T, store kv.Storage, name string, settin

func testCreatePlacementPolicy(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, policyInfo *model.PolicyInfo) *model.Job {
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaName: policyInfo.Name.L,
Type: model.ActionCreatePlacementPolicy,
BinlogInfo: &model.HistoryInfo{},
Args: []any{policyInfo},
}
args := &model.PlacementPolicyArgs{
Policy: policyInfo,
}

ctx.SetValue(sessionctx.QueryString, "skip")
err := d.DoDDLJob(ctx, job)
err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, false))
require.NoError(t, err)

v := getSchemaVer(t, ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 46,
shard_count = 47,
deps = [
"//pkg/parser/ast",
"//pkg/parser/charset",
Expand Down
50 changes: 50 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,3 +1044,53 @@ func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) {
}
return getOrDecodeArgsV2[*AddCheckConstraintArgs](job)
}

// PlacementPolicyArgs is the argument for create/alter/drop placement policy
type PlacementPolicyArgs struct {
Policy *PolicyInfo `json:"policy,omitempty"`
ReplaceOnExist bool `json:"replace_on_exist,omitempty"`
PolicyName pmodel.CIStr `json:"policy_name,omitempty"`

// it's set for alter/drop policy in v2
PolicyID int64 `json:"policy_id"`
}

func (a *PlacementPolicyArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
if job.Type == ActionCreatePlacementPolicy {
job.Args = []any{a.Policy, a.ReplaceOnExist}
} else if job.Type == ActionAlterPlacementPolicy {
job.Args = []any{a.Policy}
} else {
intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs")
job.Args = []any{a.PolicyName}
}
return
}
job.Args = []any{a}
}

// GetPlacementPolicyArgs gets the placement policy args.
func GetPlacementPolicyArgs(job *Job) (*PlacementPolicyArgs, error) {
if job.Version == JobVersion1 {
args := &PlacementPolicyArgs{PolicyID: job.SchemaID}
var err error

if job.Type == ActionCreatePlacementPolicy {
err = job.DecodeArgs(&args.Policy, &args.ReplaceOnExist)
} else if job.Type == ActionAlterPlacementPolicy {
err = job.DecodeArgs(&args.Policy)
} else {
intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs")
err = job.DecodeArgs(&args.PolicyName)
}

if err != nil {
return nil, errors.Trace(err)
}

return args, err
}

return getOrDecodeArgsV2[*PlacementPolicyArgs](job)
}
28 changes: 28 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,31 @@ func TestCheckConstraintArgs(t *testing.T) {
require.True(t, args.Enforced)
}
}

func TestPlacementPolicyArgs(t *testing.T) {
inArgs := &PlacementPolicyArgs{
Policy: &PolicyInfo{ID: 1, Name: model.NewCIStr("policy"), State: StateDeleteOnly},
PolicyName: model.NewCIStr("policy_name"),
PolicyID: 123,
ReplaceOnExist: false,
}
for _, tp := range []ActionType{ActionCreatePlacementPolicy, ActionAlterPlacementPolicy, ActionDropPlacementPolicy} {
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp)))
j2.SchemaID = inArgs.PolicyID
args, err := GetPlacementPolicyArgs(j2)
require.NoError(t, err)
if tp == ActionCreatePlacementPolicy {
require.EqualValues(t, inArgs.Policy, args.Policy)
require.EqualValues(t, inArgs.ReplaceOnExist, args.ReplaceOnExist)
} else if tp == ActionAlterPlacementPolicy {
require.EqualValues(t, inArgs.Policy, args.Policy)
require.EqualValues(t, inArgs.PolicyID, args.PolicyID)
} else {
require.EqualValues(t, inArgs.PolicyName, args.PolicyName)
require.EqualValues(t, inArgs.PolicyID, args.PolicyID)
}
}
}
}

0 comments on commit bf455f5

Please sign in to comment.