From 4cfe424151eb407de908bb18ad2acb2e4370a4cf Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Mon, 12 Aug 2024 10:41:09 +0800 Subject: [PATCH] This is an automated cherry-pick of #11476 Signed-off-by: ti-chi-bot --- cdc/owner/ddl_manager.go | 15 +- cdc/sink/ddlsink/mysql/async_ddl.go | 186 ++++++++++++++++++ cdc/sink/ddlsink/mysql/async_ddl_test.go | 178 +++++++++++++++++ cdc/sink/ddlsink/mysql/mysql_ddl_sink.go | 97 ++++----- cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go | 5 +- pkg/errors/reexport.go | 2 + 6 files changed, 416 insertions(+), 67 deletions(-) create mode 100644 cdc/sink/ddlsink/mysql/async_ddl.go create mode 100644 cdc/sink/ddlsink/mysql/async_ddl_test.go diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index a99fec7eead..051729fd473 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -302,6 +302,13 @@ func (m *ddlManager) tick( zap.Uint64("commitTs", nextDDL.CommitTs), zap.Uint64("checkpointTs", m.checkpointTs)) m.executingDDL = nextDDL + skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL) + if err != nil { + return nil, nil, errors.Trace(err) + } + if skip { + m.cleanCache(cleanMsg) + } } err := m.executeDDL(ctx) if err != nil { @@ -367,14 +374,6 @@ func (m *ddlManager) executeDDL(ctx context.Context) error { if m.executingDDL == nil { return nil } - skip, cleanMsg, err := m.shouldSkipDDL(m.executingDDL) - if err != nil { - return errors.Trace(err) - } - if skip { - m.cleanCache(cleanMsg) - return nil - } failpoint.Inject("ExecuteNotDone", func() { // This ddl will never finish executing. diff --git a/cdc/sink/ddlsink/mysql/async_ddl.go b/cdc/sink/ddlsink/mysql/async_ddl.go new file mode 100644 index 00000000000..1692a394f37 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/async_ddl.go @@ -0,0 +1,186 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/pingcap/log" + timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +var checkRunningAddIndexSQL = ` +SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, STATE, QUERY +FROM information_schema.ddl_jobs +WHERE DB_NAME = "%s" + AND TABLE_NAME = "%s" + AND JOB_TYPE LIKE "add index%%" + AND (STATE = "running" OR STATE = "queueing") +LIMIT 1; +` + +func (m *DDLSink) shouldAsyncExecDDL(ddl *model.DDLEvent) bool { + return m.cfg.IsTiDB && ddl.Type == timodel.ActionAddIndex +} + +// asyncExecDDL executes ddl in async mode. +// this function only works in TiDB, because TiDB will save ddl jobs +// and execute them asynchronously even if ticdc crashed. +func (m *DDLSink) asyncExecDDL(ctx context.Context, ddl *model.DDLEvent) error { + done := make(chan error, 1) + // Use a longer timeout to ensure the add index ddl is sent to tidb before executing the next ddl. 
+	tick := time.NewTimer(10 * time.Second)
+	defer tick.Stop()
+	log.Info("async exec add index ddl start",
+		zap.String("changefeedID", m.id.String()),
+		zap.Uint64("commitTs", ddl.CommitTs),
+		zap.String("ddl", ddl.Query))
+	go func() {
+		if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
+			log.Error("async exec add index ddl failed",
+				zap.String("changefeedID", m.id.String()),
+				zap.Uint64("commitTs", ddl.CommitTs),
+				zap.String("ddl", ddl.Query))
+			done <- err
+			return
+		}
+		log.Info("async exec add index ddl done",
+			zap.String("changefeedID", m.id.String()),
+			zap.Uint64("commitTs", ddl.CommitTs),
+			zap.String("ddl", ddl.Query))
+		done <- nil
+	}()
+
+	select {
+	case <-ctx.Done():
+		// if the ddl is canceled, we just return nil. If the ddl has not been received by tidb,
+		// the downstream ddl is lost, because the checkpoint ts is forwarded.
+		log.Info("async add index ddl exits as canceled",
+			zap.String("changefeedID", m.id.String()),
+			zap.Uint64("commitTs", ddl.CommitTs),
+			zap.String("ddl", ddl.Query))
+		return nil
+	case err := <-done:
+		// if the ddl finishes before the timer fires, we just return the result to the caller.
+		return err
+	case <-tick.C:
+		// if the ddl is still running, we just return nil;
+		// if the ddl later fails, the downstream ddl is lost,
+		// because the checkpoint ts is forwarded.
+		log.Info("async add index ddl is still running",
+			zap.String("changefeedID", m.id.String()),
+			zap.Uint64("commitTs", ddl.CommitTs),
+			zap.String("ddl", ddl.Query))
+		return nil
+	}
+}
+
+// waitAsynExecDone waits for any pending async ddl to be done before executing the next ddl.
+func (m *DDLSink) waitAsynExecDone(ctx context.Context, ddl *model.DDLEvent) {
+	if !m.cfg.IsTiDB {
+		return
+	}
+
+	tables := make(map[model.TableName]struct{})
+	if ddl.TableInfo != nil {
+		tables[ddl.TableInfo.TableName] = struct{}{}
+	}
+	if ddl.PreTableInfo != nil {
+		tables[ddl.PreTableInfo.TableName] = struct{}{}
+	}
+	if len(tables) == 0 || m.checkAsyncExecDDLDone(ctx, tables) {
+		return
+	}
+
+	log.Debug("wait async exec ddl done",
+		zap.String("namespace", m.id.Namespace),
+		zap.String("changefeed", m.id.ID),
+		zap.Any("tables", tables),
+		zap.Uint64("commitTs", ddl.CommitTs),
+		zap.String("ddl", ddl.Query))
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			done := m.checkAsyncExecDDLDone(ctx, tables)
+			if done {
+				return
+			}
+		}
+	}
+}
+
+func (m *DDLSink) checkAsyncExecDDLDone(ctx context.Context, tables map[model.TableName]struct{}) bool {
+	for table := range tables {
+		done := m.doCheck(ctx, table)
+		if !done {
+			return false
+		}
+	}
+	return true
+}
+
+func (m *DDLSink) doCheck(ctx context.Context, table model.TableName) (done bool) {
+	if v, ok := m.lastExecutedNormalDDLCache.Get(table); ok {
+		ddlType := v.(timodel.ActionType)
+		if ddlType == timodel.ActionAddIndex {
+			log.Panic("invalid ddl type in lastExecutedNormalDDLCache",
+				zap.String("namespace", m.id.Namespace),
+				zap.String("changefeed", m.id.ID),
+				zap.String("ddlType", ddlType.String()))
+		}
+		return true
+	}
+
+	ret := m.db.QueryRowContext(ctx, fmt.Sprintf(checkRunningAddIndexSQL, table.Schema, table.Table))
+	if ret.Err() != nil {
+		log.Error("check async exec ddl failed",
+			zap.String("namespace", m.id.Namespace),
+			zap.String("changefeed", m.id.ID),
+			zap.Error(ret.Err()))
+		return true
+	}
+	var jobID, jobType, schemaState, schemaID, tableID, state, query string
+	if err := ret.Scan(&jobID, &jobType, &schemaState, &schemaID, &tableID, &state,
&query); err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.Error("check async exec ddl failed", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.Error(err)) + } + return true + } + + log.Info("async ddl is still running", + zap.String("namespace", m.id.Namespace), + zap.String("changefeed", m.id.ID), + zap.String("table", table.String()), + zap.String("jobID", jobID), + zap.String("jobType", jobType), + zap.String("schemaState", schemaState), + zap.String("schemaID", schemaID), + zap.String("tableID", tableID), + zap.String("state", state), + zap.String("query", query)) + return false +} diff --git a/cdc/sink/ddlsink/mysql/async_ddl_test.go b/cdc/sink/ddlsink/mysql/async_ddl_test.go new file mode 100644 index 00000000000..91a6b21a3e3 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/async_ddl_test.go @@ -0,0 +1,178 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + "sync/atomic" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + dmysql "github.com/go-sql-driver/mysql" + timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/config" + pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" + "github.com/stretchr/testify/require" +) + +func TestWaitAsynExecDone(t *testing.T) { + var dbIndex int32 = 0 + GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) { + defer func() { + atomic.AddInt32(&dbIndex, 1) + }() + if atomic.LoadInt32(&dbIndex) == 0 { + // test db + db, err := pmysql.MockTestDB() + require.Nil(t, err) + return db, nil + } + // normal db + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.Nil(t, err) + mock.ExpectQuery("select tidb_version()"). + WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b")) + mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{ + Number: 1305, + Message: "FUNCTION test.tidb_version does not exist", + }) + + // Case 1: there is a running add index job + mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows( + sqlmock.NewRows([]string{"JOB_ID", "JOB_TYPE", "SCHEMA_STATE", "SCHEMA_ID", "TABLE_ID", "STATE", "QUERY"}). 
+				AddRow("1", "add index", "running", "1", "1", "running", "Create index idx1 on test.sbtest0(a)"),
+		)
+		// Case 2: there is no running add index job
+		// Case 3: no permission to query ddl_jobs, TiDB will return empty result
+		mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
+			sqlmock.NewRows(nil),
+		)
+		// Case 4: query ddl_jobs failed
+		mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnError(
+			errors.New("mock error"),
+		)
+
+		mock.ExpectClose()
+		return db, nil
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	sinkURI, err := url.Parse("mysql://root:@127.0.0.1:4000")
+	require.NoError(t, err)
+	replicateCfg := config.GetDefaultReplicaConfig()
+	ddlSink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateCfg)
+	require.NoError(t, err)
+
+	table := model.TableName{Schema: "test", Table: "sbtest0"}
+	tables := make(map[model.TableName]struct{})
+	tables[table] = struct{}{}
+
+	// Test the fast path: an add index entry in ddlSink.lastExecutedNormalDDLCache triggers a panic
+	ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionAddIndex)
+	require.Panics(t, func() {
+		ddlSink.checkAsyncExecDDLDone(ctx, tables)
+	})
+
+	// Test the fast path: ddlSink.lastExecutedNormalDDLCache is hit
+	ddlSink.lastExecutedNormalDDLCache.Add(table, timodel.ActionCreateTable)
+	done := ddlSink.checkAsyncExecDDLDone(ctx, tables)
+	require.True(t, done)
+
+	// Clean up the cache, so the async running state is always checked
+	ddlSink.lastExecutedNormalDDLCache.Remove(table)
+
+	// Test: there is a running async ddl job
+	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
+	require.False(t, done)
+
+	// Test: there is no running async ddl job
+	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
+	require.True(t, done)
+
+	// Test: query errors are ignored
+	done = ddlSink.checkAsyncExecDDLDone(ctx, tables)
+	require.True(t, done)
+
+	ddlSink.Close()
+}
+
+func TestAsyncExecAddIndex(t *testing.T) {
+	ddlExecutionTime := time.Second * 15
+	var dbIndex int32 = 0
+	GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
+		defer func() {
+			atomic.AddInt32(&dbIndex, 1)
+		}()
+		if atomic.LoadInt32(&dbIndex) == 0 {
+			// test db
+			db, err := pmysql.MockTestDB()
+			require.Nil(t, err)
+			return db, nil
+		}
+		// normal db
+		db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+		require.Nil(t, err)
+		mock.ExpectQuery("select tidb_version()").
+			WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
+		mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
+			Number:  1305,
+			Message: "FUNCTION test.tidb_version does not exist",
+		})
+		mock.ExpectBegin()
+		mock.ExpectExec("USE `test`;").
+			WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectExec("Create index idx1 on test.t1(a)").
+			WillDelayFor(ddlExecutionTime).
+			WillReturnResult(sqlmock.NewResult(1, 1))
+		mock.ExpectCommit()
+		mock.ExpectClose()
+		return db, nil
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	changefeed := "test-changefeed"
+	sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
+	require.Nil(t, err)
+	rc := config.GetDefaultReplicaConfig()
+	sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID(changefeed), sinkURI, rc)
+
+	require.Nil(t, err)
+
+	ddl1 := &model.DDLEvent{
+		StartTs:  1000,
+		CommitTs: 1010,
+		TableInfo: &model.TableInfo{
+			TableName: model.TableName{
+				Schema: "test",
+				Table:  "t1",
+			},
+		},
+		Type:  timodel.ActionAddIndex,
+		Query: "Create index idx1 on test.t1(a)",
+	}
+	start := time.Now()
+	err = sink.WriteDDLEvent(ctx, ddl1)
+	require.Nil(t, err)
+	require.True(t, time.Since(start) < ddlExecutionTime)
+	require.True(t, time.Since(start) >= 10*time.Second)
+	sink.Close()
+}
diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
index f520ab979dd..2a127e4af49 100644
--- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
+++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
@@ -20,7 +20,7 @@ import (
 	"net/url"
 	"time"
 
-	cerrors "github.com/pingcap/errors"
+	lru "github.com/hashicorp/golang-lru"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	timodel "github.com/pingcap/tidb/parser/model"
@@ -29,7 +29,11 @@ import (
 	"github.com/pingcap/tiflow/cdc/sink/ddlsink"
 	"github.com/pingcap/tiflow/cdc/sink/metrics"
 	"github.com/pingcap/tiflow/pkg/config"
+<<<<<<< HEAD
 	cerror "github.com/pingcap/tiflow/pkg/errors"
+=======
+	"github.com/pingcap/tiflow/pkg/errors"
+>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476))
 	"github.com/pingcap/tiflow/pkg/errorutil"
 	"github.com/pingcap/tiflow/pkg/quotes"
 	"github.com/pingcap/tiflow/pkg/retry"
@@ -62,6 +66,11 @@ type DDLSink struct {
 	// statistics is the statistics of this sink.
 	// We use it to record the DDL count.
 	statistics *metrics.Statistics
+
+	// lastExecutedNormalDDLCache is a fast path to check whether async DDL of a table
+	// is running in the downstream.
+	// map: model.TableName -> timodel.ActionType
+	lastExecutedNormalDDLCache *lru.Cache
 }
 
 // NewDDLSink creates a new DDLSink.
@@ -92,11 +101,29 @@ func NewDDLSink(
 		return nil, err
 	}
 
+<<<<<<< HEAD
 	m := &DDLSink{
 		id:         changefeedID,
 		db:         db,
 		cfg:        cfg,
 		statistics: metrics.NewStatistics(ctx, sink.TxnSink),
+=======
+	cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
+	if err != nil {
+		return nil, err
+	}
+
+	lruCache, err := lru.New(1024)
+	if err != nil {
+		return nil, err
+	}
+	m := &DDLSink{
+		id:                         changefeedID,
+		db:                         db,
+		cfg:                        cfg,
+		statistics:                 metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
+		lastExecutedNormalDDLCache: lruCache,
+>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476))
 	}
 
 	log.Info("MySQL DDL sink is created",
@@ -107,10 +134,18 @@ func NewDDLSink(
 
 // WriteDDLEvent writes a DDL event to the mysql database.
func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { - if ddl.Type == timodel.ActionAddIndex && m.cfg.IsTiDB { - return m.asyncExecAddIndexDDLIfTimeout(ctx, ddl) + m.waitAsynExecDone(ctx, ddl) + + if m.shouldAsyncExecDDL(ddl) { + m.lastExecutedNormalDDLCache.Remove(ddl.TableInfo.TableName) + return m.asyncExecDDL(ctx, ddl) } - return m.execDDLWithMaxRetries(ctx, ddl) + + if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil { + return errors.Trace(err) + } + m.lastExecutedNormalDDLCache.Add(ddl.TableInfo.TableName, ddl.Type) + return nil } func (m *DDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { @@ -191,7 +226,7 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { zap.Duration("duration", time.Since(start)), zap.String("namespace", m.id.Namespace), zap.String("changefeed", m.id.ID), zap.Error(err)) - return cerror.WrapError(cerror.ErrMySQLTxnError, cerrors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) + return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", ddl.Query))) } log.Info("Exec DDL succeeded", zap.String("sql", ddl.Query), @@ -231,55 +266,3 @@ func (m *DDLSink) Close() { } } } - -// asyncExecAddIndexDDLIfTimeout executes ddl in async mode. -// this function only works in TiDB, because TiDB will save ddl jobs -// and execute them asynchronously even if ticdc crashed. -func (m *DDLSink) asyncExecAddIndexDDLIfTimeout(ctx context.Context, ddl *model.DDLEvent) error { - done := make(chan error, 1) - // wait for 2 seconds at most - tick := time.NewTimer(2 * time.Second) - defer tick.Stop() - log.Info("async exec add index ddl start", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - go func() { - if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil { - log.Error("async exec add index ddl failed", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - done <- err - return - } - log.Info("async exec add index ddl done", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - done <- nil - }() - - select { - case <-ctx.Done(): - // if the ddl is canceled, we just return nil, if the ddl is not received by tidb, - // the downstream ddl is lost, because the checkpoint ts is forwarded. - log.Info("async add index ddl exits as canceled", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - return nil - case err := <-done: - // if the ddl is executed within 2 seconds, we just return the result to the caller. - return err - case <-tick.C: - // if the ddl is still running, we just return nil, - // then if the ddl is failed, the downstream ddl is lost. - // because the checkpoint ts is forwarded. 
- log.Info("async add index ddl is still running", - zap.String("changefeedID", m.id.String()), - zap.Uint64("commitTs", ddl.CommitTs), - zap.String("ddl", ddl.Query)) - return nil - } -} diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go index 8c5575b025b..61cdf277b7e 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go @@ -17,9 +17,7 @@ import ( "context" "database/sql" "net/url" - "sync/atomic" "testing" - "time" "github.com/DATA-DOG/go-sqlmock" dmysql "github.com/go-sql-driver/mysql" @@ -146,6 +144,7 @@ func TestNeedSwitchDB(t *testing.T) { require.Equal(t, tc.needSwitch, needSwitchDB(tc.ddl)) } } +<<<<<<< HEAD func TestAsyncExecAddIndex(t *testing.T) { ddlExecutionTime := time.Millisecond * 3000 @@ -204,3 +203,5 @@ func TestAsyncExecAddIndex(t *testing.T) { require.True(t, time.Since(start) >= 2*time.Second) sink.Close() } +======= +>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)) diff --git a/pkg/errors/reexport.go b/pkg/errors/reexport.go index 5204382c134..b619fb3878a 100644 --- a/pkg/errors/reexport.go +++ b/pkg/errors/reexport.go @@ -36,4 +36,6 @@ var ( Annotate = perrors.Annotate // Annotatef is a shortcut for github.com/pingcap/errors.Annotatef. Annotatef = perrors.Annotatef + // WithMessage is a shortcut for github.com/pingcap/errors.WithMessage. + WithMessage = perrors.WithMessage )