Skip to content

Commit

Permalink
sink, ddl(ticdc): support add index ddl in downstream (pingcap#11476)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Aug 13, 2024
1 parent 2a47e10 commit f26d336
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 170 deletions.
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New(
case sink.BlackHoleScheme:
return blackhole.New(), nil
case sink.MySQLSSLScheme, sink.MySQLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg)
return mysql.NewDDLSink(ctx, sinkURI, cfg)
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
return cloudstorage.NewDDLSink(ctx, sinkURI, cfg)
default:
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/mysql/async_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
Expand Down
22 changes: 6 additions & 16 deletions cdc/sinkv2/ddlsink/mysql/async_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
dmysql "github.com/go-sql-driver/mysql"
timodel "github.com/pingcap/tidb/pkg/parser/model"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
Expand All @@ -40,7 +39,7 @@ func TestWaitAsynExecDone(t *testing.T) {
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB()
db, err := pmysql.MockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand All @@ -49,10 +48,6 @@ func TestWaitAsynExecDone(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})

// Case 1: there is a running add index job
mock.ExpectQuery(fmt.Sprintf(checkRunningAddIndexSQL, "test", "sbtest0")).WillReturnRows(
Expand All @@ -78,7 +73,7 @@ func TestWaitAsynExecDone(t *testing.T) {
sinkURI, err := url.Parse("mysql://root:@127.0.0.1:4000")
require.NoError(t, err)
replicateCfg := config.GetDefaultReplicaConfig()
ddlSink, err := NewDDLSink(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateCfg)
ddlSink, err := NewDDLSink(ctx, sinkURI, replicateCfg)
require.NoError(t, err)

table := model.TableName{Schema: "test", Table: "sbtest0"}
Expand Down Expand Up @@ -123,7 +118,7 @@ func TestAsyncExecAddIndex(t *testing.T) {
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB()
db, err := pmysql.MockTestDB(true)
require.Nil(t, err)
return db, nil
}
Expand All @@ -132,10 +127,6 @@ func TestAsyncExecAddIndex(t *testing.T) {
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
Expand All @@ -149,11 +140,10 @@ func TestAsyncExecAddIndex(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
changefeed := "test-changefeed"
sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewDDLSink(ctx, model.DefaultChangeFeedID(changefeed), sinkURI, rc)
sink, err := NewDDLSink(ctx, sinkURI, rc)

require.Nil(t, err)

Expand All @@ -173,6 +163,6 @@ func TestAsyncExecAddIndex(t *testing.T) {
err = sink.WriteDDLEvent(ctx, ddl1)
require.Nil(t, err)
require.True(t, time.Since(start) < ddlExecutionTime)
require.True(t, time.Since(start) >= 10*time.Second)
require.True(t, time.Since(start) >= 2*time.Second)
sink.Close()
}
100 changes: 11 additions & 89 deletions cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/config"
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
cerror "github.com/pingcap/tiflow/pkg/errors"
=======
"github.com/pingcap/tiflow/pkg/errors"
>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)):cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/retry"
Expand All @@ -54,9 +50,9 @@ const (
var GetDBConnImpl pmysql.Factory = pmysql.CreateMySQLDBConn

// Assert Sink implementation
var _ ddlsink.DDLEventSink = (*mysqlDDLSink)(nil)
var _ ddlsink.DDLEventSink = (*DDLSink)(nil)

type mysqlDDLSink struct {
type DDLSink struct {
// id indicates which processor (changefeed) this sink belongs to.
id model.ChangeFeedID
// db is the database connection.
Expand All @@ -72,12 +68,12 @@ type mysqlDDLSink struct {
lastExecutedNormalDDLCache *lru.Cache
}

// NewMySQLDDLSink creates a new mysqlDDLSink.
func NewMySQLDDLSink(
// NewDDLSink creates a new DDLSink.
func NewDDLSink(
ctx context.Context,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
) (*mysqlDDLSink, error) {
) (*DDLSink, error) {
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
cfg := pmysql.NewConfig()
err := cfg.Apply(ctx, changefeedID, sinkURI, replicaConfig)
Expand All @@ -100,18 +96,6 @@ func NewMySQLDDLSink(
return nil, err
}

<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
m := &mysqlDDLSink{
id: changefeedID,
db: db,
cfg: cfg,
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
=======
cfg.IsWriteSourceExisted, err = pmysql.CheckIfBDRModeIsSupported(ctx, db)
if err != nil {
return nil, err
}

lruCache, err := lru.New(1024)
if err != nil {
return nil, err
Expand All @@ -120,9 +104,8 @@ func NewMySQLDDLSink(
id: changefeedID,
db: db,
cfg: cfg,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
lastExecutedNormalDDLCache: lruCache,
>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)):cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
}

log.Info("MySQL DDL sink is created",
Expand All @@ -132,18 +115,12 @@ func NewMySQLDDLSink(
}

// WriteDDLEvent writes a DDL event to the mysql database.
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go
func (m *mysqlDDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if ddl.Type == timodel.ActionAddIndex && m.cfg.IsTiDB {
return m.asyncExecAddIndexDDLIfTimeout(ctx, ddl)
=======
func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
m.waitAsynExecDone(ctx, ddl)

if m.shouldAsyncExecDDL(ddl) {
m.lastExecutedNormalDDLCache.Remove(ddl.TableInfo.TableName)
return m.asyncExecDDL(ctx, ddl)
>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)):cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
}

if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
Expand All @@ -153,7 +130,7 @@ func (m *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return nil
}

func (m *mysqlDDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
func (m *DDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := m.statistics.RecordDDLExecution(func() error { return m.execDDL(ctx, ddl) })
if err != nil {
Expand Down Expand Up @@ -181,7 +158,7 @@ func (m *mysqlDDLSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDL
retry.WithIsRetryableErr(errorutil.IsRetryableDDLError))
}

func (m *mysqlDDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error {
writeTimeout, _ := time.ParseDuration(m.cfg.WriteTimeout)
writeTimeout += networkDriftDuration
ctx, cancelFunc := context.WithTimeout(pctx, writeTimeout)
Expand Down Expand Up @@ -251,74 +228,19 @@ func needSwitchDB(ddl *model.DDLEvent) bool {
return true
}

func (m *mysqlDDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error {
func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error {
// Only for RowSink for now.
return nil
}

// Close closes the database connection.
func (m *mysqlDDLSink) Close() error {
func (m *DDLSink) Close() error {
if err := m.db.Close(); err != nil {
return cerrors.Trace(err)
return errors.Trace(err)
}
if m.statistics != nil {
m.statistics.Close()
}

return nil
}
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink.go

// asyncExecAddIndexDDLIfTimeout executes ddl in async mode.
// this function only works in TiDB, because TiDB will save ddl jobs
// and execute them asynchronously even if ticdc crashed.
func (m *mysqlDDLSink) asyncExecAddIndexDDLIfTimeout(ctx context.Context, ddl *model.DDLEvent) error {
done := make(chan error, 1)
// wait for 2 seconds at most
tick := time.NewTimer(2 * time.Second)
defer tick.Stop()
log.Info("async exec add index ddl start",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
go func() {
if err := m.execDDLWithMaxRetries(ctx, ddl); err != nil {
log.Error("async exec add index ddl failed",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- err
return
}
log.Info("async exec add index ddl done",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
done <- nil
}()

select {
case <-ctx.Done():
// if the ddl is canceled, we just return nil, if the ddl is not received by tidb,
// the downstream ddl is lost, because the checkpoint ts is forwarded.
log.Info("async add index ddl exits as canceled",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
case err := <-done:
// if the ddl is executed within 2 seconds, we just return the result to the caller.
return err
case <-tick.C:
// if the ddl is still running, we just return nil,
// then if the ddl is failed, the downstream ddl is lost.
// because the checkpoint ts is forwarded.
log.Info("async add index ddl is still running",
zap.String("changefeedID", m.id.String()),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("ddl", ddl.Query))
return nil
}
}
=======
>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)):cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
63 changes: 1 addition & 62 deletions cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestWriteDDLEvent(t *testing.T) {
sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewMySQLDDLSink(ctx, sinkURI, rc)
sink, err := NewDDLSink(ctx, sinkURI, rc)

require.Nil(t, err)

Expand Down Expand Up @@ -145,64 +145,3 @@ func TestNeedSwitchDB(t *testing.T) {
require.Equal(t, tc.needSwitch, needSwitchDB(tc.ddl))
}
}
<<<<<<< HEAD:cdc/sinkv2/ddlsink/mysql/mysql_ddl_sink_test.go

func TestAsyncExecAddIndex(t *testing.T) {
ddlExecutionTime := time.Millisecond * 3000
var dbIndex int32 = 0
GetDBConnImpl = func(ctx context.Context, dsnStr string) (*sql.DB, error) {
defer func() {
atomic.AddInt32(&dbIndex, 1)
}()
if atomic.LoadInt32(&dbIndex) == 0 {
// test db
db, err := pmysql.MockTestDB(true)
require.Nil(t, err)
return db, nil
}
// normal db
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectQuery("select tidb_version()").
WillReturnRows(sqlmock.NewRows([]string{"tidb_version()"}).AddRow("5.7.25-TiDB-v4.0.0-beta-191-ga1b3e3b"))
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("Create index idx1 on test.t1(a)").
WillDelayFor(ddlExecutionTime).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
mock.ExpectClose()
return db, nil
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sinkURI, err := url.Parse("mysql://127.0.0.1:4000")
require.Nil(t, err)
rc := config.GetDefaultReplicaConfig()
sink, err := NewMySQLDDLSink(ctx, sinkURI, rc)

require.Nil(t, err)

ddl1 := &model.DDLEvent{
StartTs: 1000,
CommitTs: 1010,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "test",
Table: "t1",
},
},
Type: timodel.ActionAddIndex,
Query: "Create index idx1 on test.t1(a)",
}
start := time.Now()
err = sink.WriteDDLEvent(ctx, ddl1)
require.Nil(t, err)
require.True(t, time.Since(start) < ddlExecutionTime)
require.True(t, time.Since(start) >= 2*time.Second)
sink.Close()
}
=======
>>>>>>> 1e3766ea7d (sink, ddl(ticdc): support add index ddl in downstream (#11476)):cdc/sink/ddlsink/mysql/mysql_ddl_sink_test.go
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0
github.com/hashicorp/golang-lru v0.5.1
github.com/integralist/go-findroot v0.0.0-20160518114804-ac90681525dc
github.com/jarcoal/httpmock v1.2.0
github.com/jmoiron/sqlx v1.3.3
Expand All @@ -66,6 +67,7 @@ require (
github.com/pingcap/tidb v1.1.0-beta.0.20240604073126-607c846f4ca9
github.com/pingcap/tidb-tools v7.0.1-0.20231228094724-d6c7fc83380a+incompatible
github.com/pingcap/tidb/parser v0.0.0-20240604073126-607c846f4ca9
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
Expand Down Expand Up @@ -233,7 +235,6 @@ require (
github.com/pingcap/badger v1.5.1-0.20220314162537-ab58fbf40580 // indirect
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67 // indirect
github.com/pingcap/tipb v0.0.0-20221123081521-2fb828910813 // indirect
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
Expand Down

0 comments on commit f26d336

Please sign in to comment.