Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple (ticdc): support send all tables bootstrap message at changefeed start (#11239) #11439

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.SendBootstrapToAllPartition != nil {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition)
}

if c.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -786,6 +790,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition)
}

if cloned.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart)
}

if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
Expand Down Expand Up @@ -951,6 +959,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
DebeziumConfig: &DebeziumConfig{OutputOldValue: true},
Expand Down
5 changes: 5 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,8 @@
s.domain.Close()
s.storage.Close() //nolint:errcheck
}

// SchemaStorage returns the schema storage
func (s *SchemaTestHelper) SchemaStorage() SchemaStorage {
return s.schemaStorage

Check warning on line 303 in cdc/entry/schema_test_helper.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/schema_test_helper.go#L302-L303

Added lines #L302 - L303 were not covered by tests
}
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,9 @@ LOOP2:
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode))
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
)

// create scheduler
cfg := *c.cfg
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type mockDDLSink struct {
// whether to record the DDL history, only for rename table
recordDDLHistory bool
// a slice of DDL history, only for rename table
ddlHistory []string
ddlHistory []*model.DDLEvent
mu struct {
sync.Mutex
checkpointTs model.Ts
Expand Down Expand Up @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
}
}()
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, ddl.Query)
m.ddlHistory = append(m.ddlHistory, ddl)
} else {
m.ddlHistory = nil
}
Expand Down Expand Up @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
return nil
}

type mockScheduler struct {
currentTables []model.TableID
}
Expand Down
76 changes: 62 additions & 14 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@

BDRMode bool
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
}

func newDDLManager(
Expand All @@ -143,6 +146,7 @@
redoManager redo.DDLManager,
redoMetaManager redo.MetaManager,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
) *ddlManager {
log.Info("owner create ddl manager",
zap.String("namespace", changefeedID.Namespace),
Expand All @@ -152,19 +156,56 @@
zap.Bool("bdrMode", bdrMode))

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
}
}

func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
start := time.Now()
defer func() {
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}

Check warning on line 189 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L188-L189

Added lines #L188 - L189 were not covered by tests
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue

Check warning on line 196 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L196

Added line #L196 was not covered by tests
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}

Check warning on line 205 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}
m.bootstraped = true
return true, nil
}

// tick the ddlHandler, it does the following things:
Expand All @@ -183,6 +224,14 @@
m.justSentDDL = nil
m.checkpointTs = checkpointTs

ok, err := m.checkAndSendBootstrapMsgs(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}

Check warning on line 230 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L229-L230

Added lines #L229 - L230 were not covered by tests
if !ok {
return nil, nil, nil
}

Check warning on line 233 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L232-L233

Added lines #L232 - L233 were not covered by tests

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -483,8 +532,7 @@
return barrier
}

// allTables returns all tables in the schema that
// less or equal than the checkpointTs.
// allTables returns all tables in the schema in current checkpointTs.
func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) {
if m.tableInfoCache == nil {
ts := m.getSnapshotTs()
Expand Down
42 changes: 39 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
false)
false,
false,
)
return res
}

Expand Down Expand Up @@ -246,9 +248,9 @@ func TestExecRenameTablesDDL(t *testing.T) {
}
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
mockDDLSink.ddlHistory[0].Query)
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])
mockDDLSink.ddlHistory[1].Query)

// mock all rename table statements have been done
mockDDLSink.resetDDLDone = false
Expand Down Expand Up @@ -459,3 +461,37 @@ func TestIsGlobalDDL(t *testing.T) {
require.Equal(t, c.ret, isGlobalDDL(c.ddl))
}
}

func TestCheckAndSendBootstrapMsgs(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")

ctx := context.Background()
dm := createDDLManagerForTest(t)
dm.schema = helper.SchemaStorage()
dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs

mockDDLSink := dm.ddlSink.(*mockDDLSink)
mockDDLSink.recordDDLHistory = true

// do not send all bootstrap messages
send, err := dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.False(t, dm.bootstraped)
require.Equal(t, 0, len(mockDDLSink.ddlHistory))

// send all bootstrap messages -> tb1 and tb2
dm.shouldSendAllBootstrapAtStart = true
send, err = dm.checkAndSendBootstrapMsgs(ctx)
require.Nil(t, err)
require.True(t, send)
require.True(t, dm.bootstraped)
require.Equal(t, 2, len(mockDDLSink.ddlHistory))
require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
}
14 changes: 10 additions & 4 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// emitBootstrap emits the table bootstrap event in a blocking way.
// It will return after the bootstrap event is sent.
emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
}
Expand Down Expand Up @@ -121,10 +124,6 @@
return errors.Trace(err)
}
a.sink = s

if !util.GetOrZero(a.info.Config.EnableSyncPoint) {
return nil
}
return nil
}

Expand Down Expand Up @@ -472,3 +471,10 @@

return result, nil
}

func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if err := s.makeSinkReady(ctx); err != nil {
return errors.Trace(err)
}
return s.sink.WriteDDLEvent(ctx, bootstrap)

Check warning on line 479 in cdc/owner/ddl_sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_sink.go#L475-L479

Added lines #L475 - L479 were not covered by tests
}
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -337,6 +338,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -511,6 +513,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
Debezium: &DebeziumConfig{OutputOldValue: true},
Expand Down
16 changes: 15 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@
// DefaultSendBootstrapToAllPartition is the default value of
// whether to send bootstrap message to all partitions.
DefaultSendBootstrapToAllPartition = true
// DefaultSendAllBootstrapAtStart is the default value of whether
// to send all tables bootstrap message at changefeed start.
DefaultSendAllBootstrapAtStart = false

// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
Expand Down Expand Up @@ -190,7 +193,8 @@
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

// SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.
SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

Expand Down Expand Up @@ -228,6 +232,16 @@
util.GetOrZero(s.SendBootstrapInMsgCount) > 0
}

// ShouldSendAllBootstrapAtStart returns whether the should send all bootstrap message at changefeed start.
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
if s == nil {
return false
}

Check warning on line 239 in pkg/config/sink.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/sink.go#L238-L239

Added lines #L238 - L239 were not covered by tests
should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart)
log.Info("should send all bootstrap at start", zap.Bool("should", should))
return should
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields, it can be 1 character or at most 2 characters
Expand Down
Loading
Loading