Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10965
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Apr 25, 2024
1 parent 986ebf7 commit 876755f
Show file tree
Hide file tree
Showing 3 changed files with 407 additions and 17 deletions.
27 changes: 27 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (m *ddlManager) tick(
break
}

<<<<<<< HEAD
if job != nil && job.BinlogInfo != nil {
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
Expand All @@ -223,6 +224,32 @@ func (m *ddlManager) tick(
return nil, nil, err
}

=======
if job.BinlogInfo == nil {
continue
}

log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("changefeed", m.changfeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("jobID", job.ID),
zap.String("query", job.Query),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
)
events, err := m.schema.BuildDDLEvents(ctx, job)
if err != nil {
return nil, nil, err
}

for _, event := range events {
tableName := event.TableInfo.TableName
m.pendingDDLs[tableName] = append(m.pendingDDLs[tableName], event)
}

// Send DDL events to redo log.
if m.redoDDLManager.Enabled() {
>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965))
for _, event := range events {
// TODO: find a better place to do this check
// check if the ddl event is belong to an ineligible table.
Expand Down
221 changes: 221 additions & 0 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package puller
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sorter/memory"
Expand Down Expand Up @@ -156,6 +158,85 @@ func (p *ddlJobPullerImpl) Output() <-chan *model.DDLJobEntry {
return p.outputCh
}

<<<<<<< HEAD
=======
// Input wraps the raw DDL kv entry in a polymorphic event and hands it to the
// puller's sorter. The span argument is ignored, and the method always
// returns nil.
func (p *ddlJobPullerImpl) Input(
	ctx context.Context,
	rawDDL *model.RawKVEntry,
	_ []tablepb.Span,
) error {
	event := model.NewPolymorphicEvent(rawDDL)
	p.sorter.AddEntry(ctx, event)
	return nil
}

// handleRawKVEntry turns a raw kv entry into a DDL job entry and publishes it
// on the output channel.
//
// A nil entry is a no-op. For a resolved entry, the schema storage (when set)
// and the puller's own resolved ts are advanced first. If the entry decodes to
// a job, the job goes through handleJob; a skipped job is dropped without
// being published. Every other entry, including ones that carry no job, is
// sent to outputCh unless ctx is cancelled first, in which case ctx.Err() is
// returned.
func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model.RawKVEntry) error {
	if ddlRawKV == nil {
		return nil
	}

	if ddlRawKV.OpType == model.OpTypeResolved {
		// schemaStorage is only nil in unit test cases.
		if p.schemaStorage != nil {
			p.schemaStorage.AdvanceResolvedTs(ddlRawKV.CRTs)
		}
		// Only ever move the resolved ts forward.
		if p.getResolvedTs() < ddlRawKV.CRTs {
			p.setResolvedTs(ddlRawKV.CRTs)
		}
	}

	job, err := p.unmarshalDDL(ddlRawKV)
	if err != nil {
		return errors.Trace(err)
	}

	if job != nil {
		skipped, handleErr := p.handleJob(job)
		switch {
		case handleErr != nil:
			return handleErr
		case skipped:
			return nil
		}
		log.Info("a new ddl job is received",
			zap.String("namespace", p.changefeedID.Namespace),
			zap.String("changefeed", p.changefeedID.ID),
			zap.String("schema", job.SchemaName),
			zap.String("table", job.TableName),
			zap.Uint64("startTs", job.StartTS),
			zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
			zap.String("query", job.Query),
			zap.Any("job", job))
	}

	// job may be nil here (e.g. for resolved entries); the entry is still
	// forwarded so the consumer sees its OpType and CRTs.
	out := &model.DDLJobEntry{
		Job:    job,
		OpType: ddlRawKV.OpType,
		CRTs:   ddlRawKV.CRTs,
	}
	select {
	case p.outputCh <- out:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// unmarshalDDL parses a DDL job out of a raw kv entry.
//
// Entries whose OpType is not OpTypePut carry no job and yield (nil, nil).
// When the job table metadata has not been loaded yet and the entry is not a
// legacy-format job, the metadata is initialized before parsing.
func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job, error) {
	if rawKV.OpType != model.OpTypePut {
		return nil, nil
	}

	needsTableMeta := p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV)
	if needsTableMeta {
		if err := p.initJobTableMeta(); err != nil {
			return nil, errors.Trace(err)
		}
	}

	return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID)
}

>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965))
// getResolvedTs atomically loads the puller's current resolved timestamp.
func (p *ddlJobPullerImpl) getResolvedTs() uint64 {
	resolvedTs := atomic.LoadUint64(&p.resolvedTs)
	return resolvedTs
}
Expand Down Expand Up @@ -212,8 +293,148 @@ func (p *ddlJobPullerImpl) unmarshalDDL(rawKV *model.RawKVEntry) (*timodel.Job,
if err != nil {
return nil, errors.Trace(err)
}
<<<<<<< HEAD
}
return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID)
=======
}()

snap := p.schemaStorage.GetLastSnapshot()
if err = snap.FillSchemaName(job); err != nil {
log.Info("failed to fill schema name for ddl job",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}

switch job.Type {
case timodel.ActionRenameTables:
skip, err = p.handleRenameTables(job)
if err != nil {
log.Warn("handle rename tables ddl job failed",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
zap.Error(err))
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
case timodel.ActionRenameTable:
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if !discard {
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
log.Warn("skip rename table ddl since cannot found the old table info",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", job.TableID),
zap.Int64("newSchemaID", job.SchemaID),
zap.String("newSchemaName", job.SchemaName),
zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O),
zap.String("newTableName", job.TableName))
return true, nil
}
// since we can find the old table, it must be able to find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
if err != nil {
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skipByOldTableName {
if !skipByNewTableName {
return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
return true, nil
}
log.Info("ddl puller receive rename table ddl job",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.Uint64("startTs", job.StartTS),
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS))
default:
// nil means it is a schema ddl job, it's no need to fill the table name.
if job.BinlogInfo.TableInfo != nil {
job.TableName = job.BinlogInfo.TableInfo.Name.O
}
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
}

if skip {
return true, nil
}

err = p.schemaStorage.HandleDDLJob(job)
if err != nil {
return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
errors.Trace(err), job.Query, job.StartTS, job.StartTS)
}
p.setResolvedTs(job.BinlogInfo.FinishedTS)
p.schemaVersion = job.BinlogInfo.SchemaVersion

return p.checkIneligibleTableDDL(snap, job)
}

// checkIneligibleTableDDL checks if the table is ineligible before and after the DDL.
//  1. If it is not a table DDL, we shouldn't check it.
//  2. If the table after the DDL is ineligible:
//     a. If the table does not exist before the DDL, we should ignore the DDL.
//     b. If the table is ineligible before the DDL, we should ignore the DDL.
//     c. If the table is eligible before the DDL, we should return an error.
//
// snapBefore is expected to be the schema snapshot captured before the job was
// applied to the schema storage; the "after" state is read from the schema
// storage's latest snapshot.
//
// It returns skip == true when the DDL should be ignored, and a non-nil error
// only for case 2c.
func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot, job *timodel.Job) (skip bool, err error) {
	if filter.IsSchemaDDL(job.Type) {
		return false, nil
	}

	ineligible := p.schemaStorage.GetLastSnapshot().IsIneligibleTableID(job.TableID)
	if !ineligible {
		return false, nil
	}

	// If the table is not in the snapshot before the DDL,
	// we should ignore the DDL.
	_, exist := snapBefore.PhysicalTableByID(job.TableID)
	if !exist {
		return true, nil
	}

	// The table is ineligible after the DDL. If it was already ineligible
	// before the DDL, the DDL is simply ignored. Otherwise an error is
	// returned to inform the user that it is a dangerous operation and
	// should be handled manually.
	isBeforeIneligible := snapBefore.IsIneligibleTableID(job.TableID)
	if isBeforeIneligible {
		log.Warn("ignore the DDL event of ineligible table",
			zap.String("changefeed", p.changefeedID.ID), zap.Any("ddl", job))
		return true, nil
	}
	return false, cerror.New(fmt.Sprintf("An eligible table become ineligible after DDL: [%s] "+
		"it is a dangerous operation and may cause data loss. If you want to replicate this ddl safely, "+
		"please pause the changefeed and update the `force-replicate=true` "+
		"in the changefeed configuration, "+
		"then resume the changefeed.", job.Query))
>>>>>>> d0329d7f1c (ddl_puller (ticdc): handle dorp pk/uk ddl correctly (#10965))
}

// handleRenameTables gets all the tables that are renamed
Expand Down
Loading

0 comments on commit 876755f

Please sign in to comment.