Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: fix new owner updating checkpoint too early with pending DDL (#2252) #2290

Merged
8 changes: 7 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs)
// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
// when there is a recovery, there is no guarantee that the DDL at the checkpoint
// has been executed. So we need to start the DDL puller from (checkpoint-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -317,6 +320,9 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
case ddlJobBarrier:
ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL()
if ddlJob == nil || ddlResolvedTs != barrierTs {
if ddlResolvedTs < barrierTs {
return barrierTs, nil
}
c.barriers.Update(ddlJobBarrier, ddlResolvedTs)
return barrierTs, nil
}
Expand Down
5 changes: 4 additions & 1 deletion cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
return nil, errors.Trace(err)
}
}
// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
// the DDL's result is already contained in the snapshot.
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -58,7 +61,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
schemaSnapshot: schemaSnap,
filter: f,
config: config,
ddlHandledTs: startTs - 1,
ddlHandledTs: startTs,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion tests/kill_owner_with_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ function run() {

for i in $(seq 1 3); do
kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY
sleep 2
sleep 8
done

export GO_FAILPOINTS=''
Expand Down