Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple (ticdc): support send all tables bootstrap message at changefeed start #11239

Merged
merged 10 commits into from
Jun 20, 2024
9 changes: 9 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.SendBootstrapToAllPartition != nil {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition)
}

if c.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -792,6 +796,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition)
}

if cloned.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart)
}

if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
Expand Down Expand Up @@ -957,6 +965,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
DebeziumConfig: &DebeziumConfig{OutputOldValue: true},
Expand Down
5 changes: 5 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,8 @@ func (s *SchemaTestHelper) Close() {
s.domain.Close()
s.storage.Close() //nolint:errcheck
}

// SchemaStorage exposes the schema storage held by the test helper so that
// tests can hand it to the component under test.
func (s *SchemaTestHelper) SchemaStorage() SchemaStorage {
	storage := s.schemaStorage
	return storage
}
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,9 @@ LOOP2:
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode))
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
)

// create scheduler
cfg := *c.cfg
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type mockDDLSink struct {
// whether to record the DDL history, only for rename table
recordDDLHistory bool
// a slice of DDL history, only for rename table
ddlHistory []string
ddlHistory []*model.DDLEvent
mu struct {
sync.Mutex
checkpointTs model.Ts
Expand Down Expand Up @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
}
}()
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, ddl.Query)
m.ddlHistory = append(m.ddlHistory, ddl)
} else {
m.ddlHistory = nil
}
Expand Down Expand Up @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

// emitBootstrap is the mock counterpart of the bootstrap emitter. When
// recordDDLHistory is set, the bootstrap event is appended to ddlHistory;
// otherwise the event is dropped. It never fails.
func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
	if !m.recordDDLHistory {
		return nil
	}
	m.ddlHistory = append(m.ddlHistory, bootstrap)
	return nil
}

type mockScheduler struct {
currentTables []model.TableID
}
Expand Down
76 changes: 62 additions & 14 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type ddlManager struct {

BDRMode bool
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
}

func newDDLManager(
Expand All @@ -143,6 +146,7 @@ func newDDLManager(
redoManager redo.DDLManager,
redoMetaManager redo.MetaManager,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
) *ddlManager {
log.Info("owner create ddl manager",
zap.String("namespace", changefeedID.Namespace),
Expand All @@ -152,19 +156,56 @@ func newDDLManager(
zap.Bool("bdrMode", bdrMode))

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
changfeedID: changefeedID,
ddlSink: ddlSink,
filter: filter,
ddlPuller: ddlPuller,
schema: schema,
redoDDLManager: redoManager,
redoMetaManager: redoMetaManager,
startTs: startTs,
checkpointTs: checkpointTs,
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
}
}

func (m *ddlManager) checkAndSendBootstarpMsgs(ctx context.Context) (bool, error) {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
start := time.Now()
defer func() {
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}
}
m.bootstraped = true
return true, nil
}

// tick the ddlHandler, it does the following things:
Expand All @@ -183,6 +224,14 @@ func (m *ddlManager) tick(
m.justSentDDL = nil
m.checkpointTs = checkpointTs

ok, err := m.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, errors.Trace(err)
}
if !ok {
return nil, nil, nil
}

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -483,8 +532,7 @@ func (m *ddlManager) barrier() *schedulepb.BarrierWithMinTs {
return barrier
}

// allTables returns all tables in the schema that
// less or equal than the checkpointTs.
// allTables returns all tables in the schema at the current checkpointTs.
func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) {
if m.tableInfoCache == nil {
ts := m.getSnapshotTs()
Expand Down
51 changes: 48 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
false)
false,
false,
)
return res
}

Expand Down Expand Up @@ -246,9 +248,9 @@
}
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
mockDDLSink.ddlHistory[0].Query)
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])
mockDDLSink.ddlHistory[1].Query)

// mock all rename table statements have been done
mockDDLSink.resetDDLDone = false
Expand Down Expand Up @@ -459,3 +461,46 @@
require.Equal(t, c.ret, isGlobalDDL(c.ddl))
}
}

func TestCheckAndSendBootstarpMsgs(t *testing.T) {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")

ctx := context.Background()
dm := createDDLManagerForTest(t)
dm.schema = helper.SchemaStorage()
sendBootstrapIntervalInSec := int64(1)
sendBootstrapInMsgCount := int32(2)
protocol := "simple"
dm.sinkConfig.SendBootstrapIntervalInSec = &sendBootstrapIntervalInSec

Check failure on line 477 in cdc/owner/ddl_manager_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

dm.sinkConfig undefined (type *ddlManager has no field or method sinkConfig)
dm.sinkConfig.SendBootstrapInMsgCount = &sendBootstrapInMsgCount

Check failure on line 478 in cdc/owner/ddl_manager_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

dm.sinkConfig undefined (type *ddlManager has no field or method sinkConfig)
dm.sinkConfig.Protocol = &protocol

Check failure on line 479 in cdc/owner/ddl_manager_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

dm.sinkConfig undefined (type *ddlManager has no field or method sinkConfig)
dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs

mockDDLSink := dm.ddlSink.(*mockDDLSink)
mockDDLSink.recordDDLHistory = true

// do not send all bootstrap messages
sendAllBootstrapAtStart := false
dm.sinkConfig.SendAllBootstrapAtStart = &sendAllBootstrapAtStart

Check failure on line 487 in cdc/owner/ddl_manager_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

dm.sinkConfig undefined (type *ddlManager has no field or method sinkConfig)
send, err := dm.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, err)
require.True(t, send)
require.False(t, dm.bootstraped)
require.Equal(t, 0, len(mockDDLSink.ddlHistory))

// send all bootstrap messages -> tb1 and tb2
sendAllBootstrapAtStart = true
dm.sinkConfig.SendAllBootstrapAtStart = &sendAllBootstrapAtStart

Check failure on line 496 in cdc/owner/ddl_manager_test.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

dm.sinkConfig undefined (type *ddlManager has no field or method sinkConfig) (typecheck)
send, err = dm.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, err)
require.True(t, send)
require.True(t, dm.bootstraped)
require.Equal(t, 2, len(mockDDLSink.ddlHistory))
require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
}
12 changes: 8 additions & 4 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DDLSink interface {
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
}
Expand Down Expand Up @@ -121,10 +122,6 @@ func ddlSinkInitializer(ctx context.Context, a *ddlSinkImpl) error {
return errors.Trace(err)
}
a.sink = s

if !util.GetOrZero(a.info.Config.EnableSyncPoint) {
return nil
}
return nil
}

Expand Down Expand Up @@ -472,3 +469,10 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {

return result, nil
}

// emitBootstrap writes a bootstrap event through the underlying sink.
// makeSinkReady is called first; if it fails, its error is traced and
// returned without writing. Otherwise the result of WriteDDLEvent is returned
// unchanged.
func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
	err := s.makeSinkReady(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	return s.sink.WriteDDLEvent(ctx, bootstrap)
}
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -337,6 +338,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -511,6 +513,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
Debezium: &DebeziumConfig{OutputOldValue: true},
Expand Down
16 changes: 15 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ const (
// DefaultSendBootstrapToAllPartition is the default value of
// whether to send bootstrap message to all partitions.
DefaultSendBootstrapToAllPartition = true
// DefaultSendAllBootstrapAtStart is the default for whether bootstrap
// messages for all tables are sent when a changefeed starts.
DefaultSendAllBootstrapAtStart = false

// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
Expand Down Expand Up @@ -188,7 +191,8 @@ type SinkConfig struct {
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

// SendAllBootstrapAtStart determines whether to send bootstrap messages for all tables when the changefeed starts.
// Default value is false.
SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

Expand Down Expand Up @@ -227,6 +231,16 @@ func (s *SinkConfig) ShouldSendBootstrapMsg() bool {
util.GetOrZero(s.SendBootstrapInMsgCount) > 0
}

// ShouldSendAllBootstrapAtStart reports whether bootstrap messages for all
// tables should be sent when the changefeed starts. It is true only when
// ShouldSendBootstrapMsg is true and SendAllBootstrapAtStart is set to true;
// a nil receiver yields false. The decision is logged on every call with a
// non-nil receiver.
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
	if s == nil {
		return false
	}
	enabled := false
	if s.ShouldSendBootstrapMsg() {
		enabled = util.GetOrZero(s.SendAllBootstrapAtStart)
	}
	log.Info("should send all bootstrap at start", zap.Bool("should", enabled))
	return enabled
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields, it can be 1 character or at most 2 characters
Expand Down
Loading
Loading