Skip to content

Commit

Permalink
ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965)
Browse files Browse the repository at this point in the history
close #10890
  • Loading branch information
asddongmen authored Apr 25, 2024
1 parent 4fd0842 commit d0329d7
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 60 deletions.
17 changes: 0 additions & 17 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,24 +223,7 @@ func (m *ddlManager) tick(
}

for _, event := range events {
// TODO: find a better place to do this check
// check if the ddl event is belong to an ineligible table.
// If so, we should ignore it.
if !filter.IsSchemaDDL(event.Type) {
ignore, err := m.schema.
IsIneligibleTable(ctx, event.TableInfo.TableName.TableID, event.CommitTs)
if err != nil {
return nil, nil, errors.Trace(err)
}
if ignore {
log.Warn("ignore the DDL event of ineligible table",
zap.String("changefeed", m.changfeedID.ID), zap.Any("ddl", event))
continue
}
}

tableName := event.TableInfo.TableName
// Add all valid DDL events to the pendingDDLs.
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}

Expand Down
65 changes: 54 additions & 11 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package puller
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,6 +27,7 @@ import (
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/kv/sharedconn"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -204,8 +206,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
if job != nil {
skip, err := p.handleJob(job)
if err != nil {
return cerror.WrapError(cerror.ErrHandleDDLFailed,
err, job.Query, job.StartTS, job.StartTS)
return err
}
if skip {
return nil
Expand Down Expand Up @@ -353,7 +354,8 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}

switch job.Type {
Expand All @@ -369,15 +371,16 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
return true, errors.Trace(err)
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
case timodel.ActionRenameTable:
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if !discard {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
log.Warn("skip rename table ddl since cannot found the old table info",
zap.String("namespace", p.changefeedID.Namespace),
Expand All @@ -394,16 +397,16 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if err != nil {
return true, errors.Trace(err)
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skipByOldTableName {
if !skipByNewTableName {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
return true, nil
}

log.Info("ddl puller receive rename table ddl job",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
Expand All @@ -426,12 +429,52 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {

err = p.schemaStorage.HandleDDLJob(job)
if err != nil {
return true, errors.Trace(err)
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}

p.setResolvedTs(job.BinlogInfo.FinishedTS)
p.schemaVersion = job.BinlogInfo.SchemaVersion
return false, nil

return p.checkIneligibleTableDDL(snap, job)
}

// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL.
// 1. If it is not a table DDL, we shouldn't check it.
// 2. If the table after the DDL is ineligible:
// a. If the table is not exist before the DDL, we should ignore the DDL.
// b. If the table is ineligible before the DDL, we should ignore the DDL.
// c. If the table is eligible before the DDL, we should return an error.
func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) {
if filter.IsSchemaDDL(job.Type) {
return false, nil
}

ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID)
if !ineligible {
return false, nil
}

// If the table is not in the snapshot before the DDL,
// we should ignore the DDL.
_, exist := snapBefore.PhysicalTableByID(job.TableID)
if !exist {
return true, nil
}

// If the table after the DDL is ineligible, we should check if it is not ineligible before the DDL.
// If so, we should return an error to inform the user that it is a
// dangerous operation and should be handled manually.
isBeforeineligible := snapBefore.IsIneligibleTableID(job.TableID)
if isBeforeineligible {
log.Warn("ignore the DDL event of ineligible table",
zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job))
return true, nil
}
return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+
"it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+
"pelase pause the changefeed and update the `force-replicate=true` "+
"in the changefeed configuration, "+
"then resume the changefeed.", job.Query))
}

// handleRenameTables gets all the tables that are renamed
Expand Down
Loading

0 comments on commit d0329d7

Please sign in to comment.