Skip to content

Commit

Permalink
ddl: roll back txn before handling ErrEntryTooLarge (#43867) (#43878)
Browse files Browse the repository at this point in the history
close #43725
  • Loading branch information
ti-chi-bot authored May 26, 2023
1 parent c8f7346 commit 66e6d63
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
1 change: 1 addition & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ func TestDDLJobErrorCount(t *testing.T) {
require.NotNil(t, historyJob)
require.Equal(t, int64(1), historyJob.ErrorCount)
require.True(t, kv.ErrEntryTooLarge.Equal(historyJob.Error))
tk.MustQuery("select * from ddl_error_table;").Check(testkit.Rows())
}

func TestCommitTxnWithIndexChange(t *testing.T) {
Expand Down
19 changes: 16 additions & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,16 @@ func (w *worker) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
}

// handleUpdateJobError handles the too large DDL job.
func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) error {
func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error, txn kv.Transaction) error {
if err == nil {
return nil
}
if kv.ErrEntryTooLarge.Equal(err) {
logutil.Logger(w.logCtx).Warn("[ddl] update DDL job failed", zap.String("job", job.String()), zap.Error(err))
err1 := w.rollbackOrReset(txn)
if err1 != nil {
return errors.Trace(err1)
}
// Reduce this txn entry size.
job.BinlogInfo.Clean()
job.Error = toTError(err)
Expand All @@ -468,6 +472,15 @@ func (w *worker) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) e
return errors.Trace(err)
}

func (w *worker) rollbackOrReset(txn kv.Transaction) error {
if w.concurrentDDL {
w.sess.rollback()
return w.sess.begin()
}
txn.Reset()
return nil
}

// updateDDLJob updates the DDL job information.
// Every time we enter another state except final state, we must call this function.
func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error {
Expand Down Expand Up @@ -865,7 +878,7 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
return 0, err
}
err = w.updateDDLJob(t, job, runJobErr != nil)
if err = w.handleUpdateJobError(t, job, err); err != nil {
if err = w.handleUpdateJobError(t, job, err, txn); err != nil {
w.sess.rollback()
d.unlockSchemaVersion(job.ID)
return 0, err
Expand Down Expand Up @@ -1004,7 +1017,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
schemaVer = 0
}
err = w.updateDDLJob(t, job, runJobErr != nil)
if err = w.handleUpdateJobError(t, job, err); err != nil {
if err = w.handleUpdateJobError(t, job, err, txn); err != nil {
return errors.Trace(err)
}
writeBinlog(d.binlogCli, txn, job)
Expand Down

0 comments on commit 66e6d63

Please sign in to comment.