From 876755f4d8e7e222ab8026b4af7cc8967c18529f Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 25 Apr 2024 21:43:41 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #10965 Signed-off-by: ti-chi-bot --- cdc/owner/ddl_manager.go | 27 +++++ cdc/puller/ddl_puller.go | 221 ++++++++++++++++++++++++++++++++++ cdc/puller/ddl_puller_test.go | 176 ++++++++++++++++++++++++--- 3 files changed, 407 insertions(+), 17 deletions(-) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 43c66686c38..d43e11f97f2 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -209,6 +209,7 @@ func (m *ddlManager) tick( break } +<<<<<<< HEAD if job != nil && job.BinlogInfo != nil { log.Info("handle a ddl job", zap.String("namespace", m.changfeedID.Namespace), @@ -223,6 +224,32 @@ func (m *ddlManager) tick( return nil, nil, err } +======= + if job.BinlogInfo == nil { + continue + } + + log.Info("handle a ddl job", + zap.String("namespace", m.changfeedID.Namespace), + zap.String("changefeed", m.changfeedID.ID), + zap.Int64("tableID", job.TableID), + zap.Int64("jobID", job.ID), + zap.String("query", job.Query), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + ) + events, err := m.schema.BuildDDLEvents(ctx, job) + if err != nil { + return nil, nil, err + } + + for _, event := range events { + tableName := event.TableInfo.TableName + m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) + } + + // Send DDL events to redo log. + if m.redoDDLManager.Enabled() { +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) for _, event := range events { // TODO: find a better place to do this check // check if the ddl event is belong to an ineligible table. diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index d29b131acc8..d02d0f88a31 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -16,6 +16,7 @@ package puller import ( "context" "encoding/json" + "fmt" "sync" "sync/atomic" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/contextutil" "github.com/pingcap/tiflow/cdc/entry" + "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sorter/memory" @@ -156,6 +158,85 @@ func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry { return p.outputCh } +<<<<<<< HEAD +======= +// Input receives the raw kv entry and put it into the input channel. +func (p *ddlJobPullerImpl) Input( + ctx context.Context, + rawDDL *model.RawKVEntry, + _ []tablepb.Span, +) error { + p.sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawDDL)) + return nil +} + +// handleRawKVEntry converts the raw kv entry to DDL job and sends it to the output channel. +func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model.RawKVEntry) error { + if ddlRawKV == nil { + return nil + } + + if ddlRawKV.OpType == model.OpTypeResolved { + // Only nil in unit test case. + if p.schemaStorage != nil { + p.schemaStorage.AdvanceResolvedTs(ddlRawKV.CRTs) + } + if ddlRawKV.CRTs > p.getResolvedTs() { + p.setResolvedTs(ddlRawKV.CRTs) + } + } + + job, err := p.unmarshalDDL(ddlRawKV) + if err != nil { + return errors.Trace(err) + } + + if job != nil { + skip, err := p.handleJob(job) + if err != nil { + return err + } + if skip { + return nil + } + log.Info("a new ddl job is received", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.String("query", job.Query), + zap.Any("job", job)) + } + + jobEntry := &model.DDLJobEntry{ + Job: job, + OpType: ddlRawKV.OpType, + CRTs: ddlRawKV.CRTs, + } + select { + case <-ctx.Done(): + return ctx.Err() + case p.outputCh <- jobEntry: + } + return nil +} + +func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, error) { + if rawKV.OpType != model.OpTypePut { + return nil, nil + } + if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) { + err := p.initJobTableMeta() + if err != nil { + return nil, errors.Trace(err) + } + } + return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID) +} + +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) func (p *ddlJobPullerImpl) getResolvedTs() uint64 { return atomic.LoadUint64(&p.resolvedTs) } @@ -212,8 +293,148 @@ func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, if err != nil { return nil, errors.Trace(err) } +<<<<<<< HEAD } return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID) +======= + }() + + snap := p.schemaStorage.GetLastSnapshot() + if err = snap.FillSchemaName(job); err != nil { + log.Info("failed to fill schema name for ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Error(err)) + if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) { + return true, nil + } + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) + } + + switch job.Type { + case timodel.ActionRenameTables: + skip, err = p.handleRenameTables(job) + if err != nil { + log.Warn("handle rename tables ddl job failed", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Error(err)) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) + } + case timodel.ActionRenameTable: + oldTable, ok := snap.PhysicalTableByID(job.TableID) + if !ok { + // 1. If we can not find the old table, and the new table name is in filter rule, return error. + discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) + if !discard { + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + } + log.Warn("skip rename table ddl since cannot found the old table info", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.Int64("tableID", job.TableID), + zap.Int64("newSchemaID", job.SchemaID), + zap.String("newSchemaName", job.SchemaName), + zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O), + zap.String("newTableName", job.TableName)) + return true, nil + } + // since we can find the old table, it must be able to find the old schema. + // 2. If we can find the preTableInfo, we filter it by the old table name. + skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table) + skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) + if err != nil { + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) + } + // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. + if skipByOldTableName { + if !skipByNewTableName { + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + } + return true, nil + } + log.Info("ddl puller receive rename table ddl job", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS)) + default: + // nil means it is a schema ddl job, it's no need to fill the table name. + if job.BinlogInfo.TableInfo != nil { + job.TableName = job.BinlogInfo.TableInfo.Name.O + } + skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) + } + + if skip { + return true, nil + } + + err = p.schemaStorage.HandleDDLJob(job) + if err != nil { + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) + } + p.setResolvedTs(job.BinlogInfo.FinishedTS) + p.schemaVersion = job.BinlogInfo.SchemaVersion + + return p.checkIneligibleTableDDL(snap, job) +} + +// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL. +// 1. If it is not a table DDL, we shouldn't check it. +// 2. If the table after the DDL is ineligible: +// a. If the table is not exist before the DDL, we should ignore the DDL. +// b. If the table is ineligible before the DDL, we should ignore the DDL. +// c. If the table is eligible before the DDL, we should return an error. +func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) { + if filter.IsSchemaDDL(job.Type) { + return false, nil + } + + ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID) + if !ineligible { + return false, nil + } + + // If the table is not in the snapshot before the DDL, + // we should ignore the DDL. + _, exist := snapBefore.PhysicalTableByID(job.TableID) + if !exist { + return true, nil + } + + // If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL. + // If so, we should return an error to inform the user that it is a + // dangerous operation and should be handled manually. + isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID) + if isBeforeineligible { + log.Warn("ignore the DDL event of ineligible table", + zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job)) + return true, nil + } + return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+ + "it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+ + "pelase pause the changefeed and update the `force-replicate=true` "+ + "in the changefeed configuration, "+ + "then resume the changefeed.", job.Query)) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) } // handleRenameTables gets all the tables that are renamed diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index aa51bf1ef6b..830d38195cf 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -199,12 +199,13 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t1(id int)") + job = helper.DDL2Job("create table test1.t1(id int primary key)") remainTables[0] = job.TableID mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) +<<<<<<< HEAD job = helper.DDL2Job("create table test1.t2(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) @@ -218,6 +219,21 @@ func TestHandleRenameTable(t *testing.T) { job = helper.DDL2Job("create table test1.t5(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) +======= + job = helper.DDL2Job("create table test1.t2(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test1.t3(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test1.t5(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("create database ignore1") @@ -225,9 +241,15 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) +<<<<<<< HEAD job = helper.DDL2Job("create table ignore1.a(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) +======= + job = helper.DDL2Job("create table ignore1.a(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test1.t1 to test1.t11, test1.t3 to test1.t33, test1.t5 to test1.t55, ignore1.a to ignore1.b") @@ -240,7 +262,7 @@ func TestHandleRenameTable(t *testing.T) { } { - _ = helper.DDL2Job("create table test1.t6(id int)") + _ = helper.DDL2Job("create table test1.t6(id int primary key)") job := helper.DDL2Job("rename table test1.t2 to test1.t22, test1.t6 to test1.t66") skip, err := ddlJobPullerImpl.handleRenameTables(job) require.Error(t, err) @@ -257,6 +279,7 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) +<<<<<<< HEAD job = helper.DDL2Job("create table test2.t1(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) @@ -270,6 +293,21 @@ func TestHandleRenameTable(t *testing.T) { job = helper.DDL2Job("create table test2.t3(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) +======= + job = helper.DDL2Job("create table test2.t1(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test2.t2(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test2.t3(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test2.t1 to test2.t11, test2.t2 to test2.t22, test2.t3 to test2.t33") @@ -285,6 +323,7 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) +<<<<<<< HEAD job = helper.DDL2Job("create table Test3.t1(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) @@ -294,6 +333,17 @@ func TestHandleRenameTable(t *testing.T) { job = helper.DDL2Job("create table Test3.t2(id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) +======= + job = helper.DDL2Job("create table Test3.t1(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + // skip this table + job = helper.DDL2Job("create table Test3.t2(id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table Test3.t1 to Test3.t11, Test3.t2 to Test3.t22") @@ -307,6 +357,7 @@ func TestHandleRenameTable(t *testing.T) { // test rename table { +<<<<<<< HEAD job := helper.DDL2Job("create table test1.t99 (id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) @@ -337,6 +388,38 @@ func TestHandleRenameTable(t *testing.T) { job = helper.DDL2Job("create table test1.t202308082 (id int)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) +======= + job := helper.DDL2Job("create table test1.t99 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + // this ddl should be skipped + job = helper.DDL2Job("create table test1.t1000 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + // this ddl should be skipped + job = helper.DDL2Job("create table test1.t888 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test1.t20230808 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test1.t202308081 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) + waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) + + job = helper.DDL2Job("create table test1.t202308082 (id int primary key)") + inputDDL(t, ddlJobPullerImpl, job) + inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) +>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // since test1.99 in filter rule, we replicate it job = helper.DDL2Job("rename table test1.t99 to test1.t999") @@ -346,9 +429,8 @@ func TestHandleRenameTable(t *testing.T) { // since test1.t100 is in filter rule, replicate it job = helper.DDL2Job("rename table test1.t1000 to test1.t100") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.Error(t, err) - require.True(t, skip) require.Contains(t, err.Error(), fmt.Sprintf("table's old name is not in filter rule, and its new name in filter rule "+ "table id '%d', ddl query: [%s], it's an unexpected behavior, "+ "if you want to replicate this table, please add its old name to filter rule.", job.TableID, job.Query)) @@ -370,9 +452,8 @@ func TestHandleRenameTable(t *testing.T) { // but now it will throw an error since schema ignore1 are not in schemaStorage // ref: https://github.com/pingcap/tiflow/issues/9488 job = helper.DDL2Job("rename table test1.t202308081 to ignore1.ignore1, test1.t202308082 to ignore1.dongmen") - skip, err = ddlJobPullerImpl.handleJob(job) + _, err = ddlJobPullerImpl.handleJob(job) require.NotNil(t, err) - require.True(t, skip) require.Contains(t, err.Error(), "ErrSnapshotSchemaNotFound") } } @@ -432,7 +513,7 @@ func TestHandleJob(t *testing.T) { // test create table { - job := helper.DDL2Job("create table test1.t1(id int) partition by range(id) (partition p0 values less than (10))") + job := helper.DDL2Job("create table test1.t1(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err := ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -442,7 +523,7 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.testStartTs(id int)") + job = helper.DDL2Job("create table test1.testStartTs(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) @@ -453,23 +534,23 @@ func TestHandleJob(t *testing.T) { require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.False(t, skip) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) - job = helper.DDL2Job("create table test1.t4(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test1.t4(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) // make sure no schema not found error - job = helper.DDL2Job("create table test3.t1(id int) partition by range(id) (partition p0 values less than (10))") + job = helper.DDL2Job("create table test3.t1(id int primary key) partition by range(id) (partition p0 values less than (10))") skip, err = ddlJobPullerImpl.handleJob(job) require.NoError(t, err) require.True(t, skip) @@ -645,7 +726,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 5, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, FinishedTS: 18}, - Query: "create table test.t1(id int)", + Query: "create table test.t1(id int primary key)", }) mockPuller.appendDDL(&timodel.Job{ ID: 1, @@ -653,7 +734,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 5, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, FinishedTS: 16}, - Query: "create table t2(id int)", + Query: "create table t2(id int primary key)", }) resolvedTs, ddl = p.PopFrontDDL() require.Equal(t, resolvedTs, uint64(15)) @@ -678,7 +759,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) mockPuller.appendDDL(&timodel.Job{ ID: 3, @@ -686,7 +767,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateDone, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 25}, - Query: "create table t3(id int)", + Query: "create table t3(id int primary key)", }) mockPuller.appendResolvedTs(30) waitResolvedTsGrowing(t, p, 25) @@ -708,7 +789,7 @@ func TestDDLPuller(t *testing.T) { StartTS: 20, State: timodel.JobStateCancelled, BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 6, FinishedTS: 36}, - Query: "create table t4(id int)", + Query: "create table t4(id int primary key)", }) mockPuller.appendResolvedTs(40) waitResolvedTsGrowing(t, p, 40) @@ -803,3 +884,64 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(200)) require.Nil(t, err) } + +func TestCcheckIneligibleTableDDL(t *testing.T) { + ddlJobPuller, helper := newMockDDLJobPuller(t, true) + defer helper.Close() + + startTs := uint64(10) + ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl) + ddlJobPullerImpl.setResolvedTs(startTs) + + cfg := config.GetDefaultReplicaConfig() + f, err := filter.NewFilter(cfg, "") + require.NoError(t, err) + ddlJobPullerImpl.filter = f + + ddl := helper.DDL2Job("CREATE DATABASE test1") + skip, err := ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + // case 1: create a table only has a primary key and drop it, expect an error. + // It is because the table is not eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t1 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t1 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") + + // case 2: create a table has a primary key and another not null unique key, + // and drop the primary key, expect no error. + // It is because the table is still eligible after the drop primary key DDL. + ddl = helper.DDL2Job(`CREATE TABLE test1.t2 ( + id INT PRIMARY KEY /*T![clustered_index] NONCLUSTERED */, + name VARCHAR(255), + email VARCHAR(255) NOT NULL UNIQUE + );`) + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP PRIMARY KEY;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.NoError(t, err) + require.False(t, skip) + + // case 3: continue to drop the unique key, expect an error. + // It is because the table is not eligible after the drop unique key DDL. + ddl = helper.DDL2Job("ALTER TABLE test1.t2 DROP INDEX email;") + skip, err = ddlJobPullerImpl.handleJob(ddl) + require.Error(t, err) + require.False(t, skip) + require.Contains(t, err.Error(), "An eligible table become ineligible after DDL") +} From eb2e269c711ebee9985a9c511c234bc927095fbe Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 10 Jun 2024 18:33:52 +0800 Subject: [PATCH 2/3] resolve conflict Signed-off-by: dongmen <414110582@qq.com> --- cdc/owner/ddl_manager.go | 44 ------ cdc/puller/ddl_puller.go | 285 +++++++--------------------------- cdc/puller/ddl_puller_test.go | 119 +++----------- 3 files changed, 76 insertions(+), 372 deletions(-) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index d43e11f97f2..13fd237b3c3 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -209,7 +209,6 @@ func (m *ddlManager) tick( break } -<<<<<<< HEAD if job != nil && job.BinlogInfo != nil { log.Info("handle a ddl job", zap.String("namespace", m.changfeedID.Namespace), @@ -224,51 +223,8 @@ func (m *ddlManager) tick( return nil, nil, err } -======= - if job.BinlogInfo == nil { - continue - } - - log.Info("handle a ddl job", - zap.String("namespace", m.changfeedID.Namespace), - zap.String("changefeed", m.changfeedID.ID), - zap.Int64("tableID", job.TableID), - zap.Int64("jobID", job.ID), - zap.String("query", job.Query), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - ) - events, err := m.schema.BuildDDLEvents(ctx, job) - if err != nil { - return nil, nil, err - } - - for _, event := range events { - tableName := event.TableInfo.TableName - m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) - } - - // Send DDL events to redo log. - if m.redoDDLManager.Enabled() { ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) for _, event := range events { - // TODO: find a better place to do this check - // check if the ddl event is belong to an ineligible table. - // If so, we should ignore it. - if !filter.IsSchemaDDL(event.Type) { - ignore, err := m.schema. - IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs) - if err != nil { - return nil, nil, errors.Trace(err) - } - if ignore { - log.Warn("ignore the DDL event of ineligible table", - zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event)) - continue - } - } - tableName := event.TableInfo.TableName - // Add all valid DDL events to the pendingDDLs. m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event) } diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go index d02d0f88a31..c79265a97e2 100644 --- a/cdc/puller/ddl_puller.go +++ b/cdc/puller/ddl_puller.go @@ -124,8 +124,7 @@ func (p *ddlJobPullerImpl) Run(ctx context.Context) error { if job != nil { skip, err := p.handleJob(job) if err != nil { - return cerror.WrapError(cerror.ErrHandleDDLFailed, - err, job.String(), job.Query, job.StartTS, job.StartTS) + return err } log.Info("handle ddl job", zap.String("namespace", p.changefeedID.Namespace), @@ -158,85 +157,6 @@ func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry { return p.outputCh } -<<<<<<< HEAD -======= -// Input receives the raw kv entry and put it into the input channel. -func (p *ddlJobPullerImpl) Input( - ctx context.Context, - rawDDL *model.RawKVEntry, - _ []tablepb.Span, -) error { - p.sorter.AddEntry(ctx, model.NewPolymorphicEvent(rawDDL)) - return nil -} - -// handleRawKVEntry converts the raw kv entry to DDL job and sends it to the output channel. -func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model.RawKVEntry) error { - if ddlRawKV == nil { - return nil - } - - if ddlRawKV.OpType == model.OpTypeResolved { - // Only nil in unit test case. - if p.schemaStorage != nil { - p.schemaStorage.AdvanceResolvedTs(ddlRawKV.CRTs) - } - if ddlRawKV.CRTs > p.getResolvedTs() { - p.setResolvedTs(ddlRawKV.CRTs) - } - } - - job, err := p.unmarshalDDL(ddlRawKV) - if err != nil { - return errors.Trace(err) - } - - if job != nil { - skip, err := p.handleJob(job) - if err != nil { - return err - } - if skip { - return nil - } - log.Info("a new ddl job is received", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), - zap.String("query", job.Query), - zap.Any("job", job)) - } - - jobEntry := &model.DDLJobEntry{ - Job: job, - OpType: ddlRawKV.OpType, - CRTs: ddlRawKV.CRTs, - } - select { - case <-ctx.Done(): - return ctx.Err() - case p.outputCh <- jobEntry: - } - return nil -} - -func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, error) { - if rawKV.OpType != model.OpTypePut { - return nil, nil - } - if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) { - err := p.initJobTableMeta() - if err != nil { - return nil, errors.Trace(err) - } - } - return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID) -} - ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) func (p *ddlJobPullerImpl) getResolvedTs() uint64 { return atomic.LoadUint64(&p.resolvedTs) } @@ -293,148 +213,8 @@ func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, if err != nil { return nil, errors.Trace(err) } -<<<<<<< HEAD } return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID) -======= - }() - - snap := p.schemaStorage.GetLastSnapshot() - if err = snap.FillSchemaName(job); err != nil { - log.Info("failed to fill schema name for ddl job", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.Error(err)) - if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) { - return true, nil - } - return false, cerror.WrapError(cerror.ErrHandleDDLFailed, - errors.Trace(err), job.Query, job.StartTS, job.StartTS) - } - - switch job.Type { - case timodel.ActionRenameTables: - skip, err = p.handleRenameTables(job) - if err != nil { - log.Warn("handle rename tables ddl job failed", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), - zap.Error(err)) - return false, cerror.WrapError(cerror.ErrHandleDDLFailed, - errors.Trace(err), job.Query, job.StartTS, job.StartTS) - } - case timodel.ActionRenameTable: - oldTable, ok := snap.PhysicalTableByID(job.TableID) - if !ok { - // 1. If we can not find the old table, and the new table name is in filter rule, return error. - discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) - if !discard { - return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) - } - log.Warn("skip rename table ddl since cannot found the old table info", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", job.TableID), - zap.Int64("newSchemaID", job.SchemaID), - zap.String("newSchemaName", job.SchemaName), - zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O), - zap.String("newTableName", job.TableName)) - return true, nil - } - // since we can find the old table, it must be able to find the old schema. - // 2. If we can find the preTableInfo, we filter it by the old table name. - skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table) - skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) - if err != nil { - return false, cerror.WrapError(cerror.ErrHandleDDLFailed, - errors.Trace(err), job.Query, job.StartTS, job.StartTS) - } - // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. - if skipByOldTableName { - if !skipByNewTableName { - return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) - } - return true, nil - } - log.Info("ddl puller receive rename table ddl job", - zap.String("namespace", p.changefeedID.Namespace), - zap.String("changefeed", p.changefeedID.ID), - zap.String("schema", job.SchemaName), - zap.String("table", job.TableName), - zap.String("query", job.Query), - zap.Uint64("startTs", job.StartTS), - zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS)) - default: - // nil means it is a schema ddl job, it's no need to fill the table name. - if job.BinlogInfo.TableInfo != nil { - job.TableName = job.BinlogInfo.TableInfo.Name.O - } - skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) - } - - if skip { - return true, nil - } - - err = p.schemaStorage.HandleDDLJob(job) - if err != nil { - return false, cerror.WrapError(cerror.ErrHandleDDLFailed, - errors.Trace(err), job.Query, job.StartTS, job.StartTS) - } - p.setResolvedTs(job.BinlogInfo.FinishedTS) - p.schemaVersion = job.BinlogInfo.SchemaVersion - - return p.checkIneligibleTableDDL(snap, job) -} - -// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL. -// 1. If it is not a table DDL, we shouldn't check it. -// 2. If the table after the DDL is ineligible: -// a. If the table is not exist before the DDL, we should ignore the DDL. -// b. If the table is ineligible before the DDL, we should ignore the DDL. -// c. If the table is eligible before the DDL, we should return an error. -func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) { - if filter.IsSchemaDDL(job.Type) { - return false, nil - } - - ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID) - if !ineligible { - return false, nil - } - - // If the table is not in the snapshot before the DDL, - // we should ignore the DDL. - _, exist := snapBefore.PhysicalTableByID(job.TableID) - if !exist { - return true, nil - } - - // If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL. - // If so, we should return an error to inform the user that it is a - // dangerous operation and should be handled manually. - isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID) - if isBeforeineligible { - log.Warn("ignore the DDL event of ineligible table", - zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job)) - return true, nil - } - return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+ - "it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+ - "pelase pause the changefeed and update the `force-replicate=true` "+ - "in the changefeed configuration, "+ - "then resume the changefeed.", job.Query)) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) } // handleRenameTables gets all the tables that are renamed @@ -579,7 +359,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) { return true, nil } - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } if job.BinlogInfo.FinishedTS <= p.getResolvedTs() || @@ -602,7 +383,17 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { case timodel.ActionRenameTables: skip, err = p.handleRenameTables(job) if err != nil { - return true, errors.Trace(err) + log.Warn("handle rename tables ddl job failed", + zap.String("namespace", p.changefeedID.Namespace), + zap.String("changefeed", p.changefeedID.ID), + zap.String("schema", job.SchemaName), + zap.String("table", job.TableName), + zap.String("query", job.Query), + zap.Uint64("startTs", job.StartTS), + zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Error(err)) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } case timodel.ActionRenameTable: log.Info("rename table ddl job", @@ -617,7 +408,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { // 1. If we can not find the old table, and the new table name is in filter rule, return error. discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) if !discard { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } skip = true } else { @@ -629,7 +420,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error. if skipByOldTableName && !skipByNewTableName { - return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) + return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query) } if skipByOldTableName && skipByNewTableName { skip = true @@ -658,13 +449,53 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) { zap.String("table", job.BinlogInfo.TableInfo.Name.O), zap.String("job", job.String()), zap.Error(err)) - return true, errors.Trace(err) + return false, cerror.WrapError(cerror.ErrHandleDDLFailed, + errors.Trace(err), job.Query, job.StartTS, job.StartTS) } p.setResolvedTs(job.BinlogInfo.FinishedTS) p.schemaVersion = job.BinlogInfo.SchemaVersion - return false, nil + return p.checkIneligibleTableDDL(snap, job) +} + +// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL. +// 1. If it is not a table DDL, we shouldn't check it. +// 2. If the table after the DDL is ineligible: +// a. If the table is not exist before the DDL, we should ignore the DDL. +// b. If the table is ineligible before the DDL, we should ignore the DDL. +// c. If the table is eligible before the DDL, we should return an error. +func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) { + if filter.IsSchemaDDL(job.Type) { + return false, nil + } + + ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID) + if !ineligible { + return false, nil + } + + // If the table is not in the snapshot before the DDL, + // we should ignore the DDL. + _, exist := snapBefore.PhysicalTableByID(job.TableID) + if !exist { + return true, nil + } + + // If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL. + // If so, we should return an error to inform the user that it is a + // dangerous operation and should be handled manually. + isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID) + if isBeforeineligible { + log.Warn("ignore the DDL event of ineligible table", + zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job)) + return true, nil + } + return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+ + "it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+ + "pelase pause the changefeed and update the `force-replicate=true` "+ + "in the changefeed configuration, "+ + "then resume the changefeed.", job.Query)) } func findDBByName(dbs []*timodel.DBInfo, name string) (*timodel.DBInfo, error) { diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index 830d38195cf..d5d6677dc16 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -205,35 +205,19 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) -<<<<<<< HEAD - job = helper.DDL2Job("create table test1.t2(id int)") + job = helper.DDL2Job("create table test1.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t3(id int)") + job = helper.DDL2Job("create table test1.t3(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t5(id int)") + job = helper.DDL2Job("create table test1.t5(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) -======= - job = helper.DDL2Job("create table test1.t2(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test1.t3(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test1.t5(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("create database ignore1") @@ -241,15 +225,9 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) -<<<<<<< HEAD - job = helper.DDL2Job("create table ignore1.a(id int)") + job = helper.DDL2Job("create table ignore1.a(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) -======= - job = helper.DDL2Job("create table ignore1.a(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test1.t1 to test1.t11, test1.t3 to test1.t33, test1.t5 to test1.t55, ignore1.a to ignore1.b") @@ -279,35 +257,19 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) -<<<<<<< HEAD - job = helper.DDL2Job("create table test2.t1(id int)") + job = helper.DDL2Job("create table test2.t1(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t2(id int)") + job = helper.DDL2Job("create table test2.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test2.t3(id int)") + job = helper.DDL2Job("create table test2.t3(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) -======= - job = helper.DDL2Job("create table test2.t1(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test2.t2(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test2.t3(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table test2.t1 to test2.t11, test2.t2 to test2.t22, test2.t3 to test2.t33") @@ -323,27 +285,15 @@ func TestHandleRenameTable(t *testing.T) { mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) -<<<<<<< HEAD - job = helper.DDL2Job("create table Test3.t1(id int)") + job = helper.DDL2Job("create table Test3.t1(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // skip this table - job = helper.DDL2Job("create table Test3.t2(id int)") + job = helper.DDL2Job("create table Test3.t2(id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) -======= - job = helper.DDL2Job("create table Test3.t1(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - // skip this table - job = helper.DDL2Job("create table Test3.t2(id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) job = helper.DDL2Job("rename table Test3.t1 to Test3.t11, Test3.t2 to Test3.t22") @@ -357,69 +307,36 @@ func TestHandleRenameTable(t *testing.T) { // test rename table { -<<<<<<< HEAD - job := helper.DDL2Job("create table test1.t99 (id int)") + job := helper.DDL2Job("create table test1.t99 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t1000 (id int)") + job = helper.DDL2Job("create table test1.t1000 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // this ddl should be skipped - job = helper.DDL2Job("create table test1.t888 (id int)") + job = helper.DDL2Job("create table test1.t888 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t20230808 (id int)") + job = helper.DDL2Job("create table test1.t20230808 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t202308081 (id int)") + job = helper.DDL2Job("create table test1.t202308081 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - job = helper.DDL2Job("create table test1.t202308082 (id int)") + job = helper.DDL2Job("create table test1.t202308082 (id int primary key)") mockPuller.appendDDL(job) mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1) -======= - job := helper.DDL2Job("create table test1.t99 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - // this ddl should be skipped - job = helper.DDL2Job("create table test1.t1000 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - // this ddl should be skipped - job = helper.DDL2Job("create table test1.t888 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test1.t20230808 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test1.t202308081 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) - waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) - - job = helper.DDL2Job("create table test1.t202308082 (id int primary key)") - inputDDL(t, ddlJobPullerImpl, job) - inputTs(t, ddlJobPullerImpl, job.BinlogInfo.FinishedTS+1) ->>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)) waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1) // since test1.99 in filter rule, we replicate it job = helper.DDL2Job("rename table test1.t99 to test1.t999") @@ -884,12 +801,12 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(200)) require.Nil(t, err) } - func TestCcheckIneligibleTableDDL(t *testing.T) { - ddlJobPuller, helper := newMockDDLJobPuller(t, true) + startTs := uint64(10) + mockPuller := newMockPuller(t, startTs) + ddlJobPuller, helper := newMockDDLJobPuller(t, mockPuller, true) defer helper.Close() - startTs := uint64(10) ddlJobPullerImpl := ddlJobPuller.(*ddlJobPullerImpl) ddlJobPullerImpl.setResolvedTs(startTs) From d8a9ec1914dda905121f3aa164e6bfce8862249e Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 10 Jun 2024 19:52:48 +0800 Subject: [PATCH 3/3] fix verify Signed-off-by: dongmen <414110582@qq.com> --- cdc/puller/ddl_puller_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index d5d6677dc16..b5a3ffa719c 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -801,6 +801,7 @@ func waitResolvedTsGrowing(t *testing.T, p DDLPuller, targetTs model.Ts) { }, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(200)) require.Nil(t, err) } + func TestCcheckIneligibleTableDDL(t *testing.T) { startTs := uint64(10) mockPuller := newMockPuller(t, startTs)