Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed(ticdc): send bootstrap message asynchronously to prevent block other changefeeds (#11573) #11588

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
if err != nil {
return errors.Trace(err)
}
// bootstrap not finished yet, cannot send any event.
if !c.ddlManager.isBootstrapped() {
return nil
}

err = c.handleBarrier(ctx, barrier)
if err != nil {
Expand Down Expand Up @@ -687,7 +691,9 @@ LOOP2:
c.redoMetaMgr,
downstreamType,
util.GetOrZero(info.Config.BDRMode),
info.Config.Sink.ShouldSendAllBootstrapAtStart())
info.Config.Sink.ShouldSendAllBootstrapAtStart(),
ctx.Throw,
)

// create scheduler
cfg := *c.cfg
Expand Down
88 changes: 65 additions & 23 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type mockDDLSink struct {
syncPoint model.Ts
syncPointHis []model.Ts

bootstrapError bool

wg sync.WaitGroup
}

Expand Down Expand Up @@ -146,16 +148,15 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo
return m.mu.checkpointTs, m.mu.currentTables
}

func (m *mockDDLSink) close(ctx context.Context) error {
func (m *mockDDLSink) close(_ context.Context) error {
m.wg.Wait()
return nil
}

func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
func (m *mockDDLSink) emitBootstrap(_ context.Context, bootstrap *model.DDLEvent) error {
if m.bootstrapError {
return errors.New("emit bootstrap error")
}
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
Expand Down Expand Up @@ -196,8 +197,24 @@ func (m *mockScheduler) DrainCapture(target model.CaptureID) (int, error) {
// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx context.Context) {}

func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
) (
func newMockDDLSink(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
}

func newMockDDLSinkWithBootstrapError(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
bootstrapError: true,
}
}

func createChangefeed4Test(ctx cdcContext.Context,
newMockDDLSink func(model.ChangeFeedID, *model.ChangeFeedInfo, func(error), func(error)) DDLSink,
t *testing.T) (
*changefeed, map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester,
) {
up := upstream.NewUpstream4Test(&gc.MockPDClient{
Expand Down Expand Up @@ -228,12 +245,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
}
},
newMockDDLSink,
// new scheduler
func(
ctx cdcContext.Context, up *upstream.Upstream, epoch uint64,
Expand Down Expand Up @@ -263,7 +275,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,

func TestPreCheck(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.Tick(ctx, captures)
tester.MustApplyPatches()
require.NotNil(t, cf.state.Status)
Expand All @@ -283,7 +295,7 @@ func TestPreCheck(t *testing.T) {

func TestInitialize(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, captures)
Expand All @@ -298,7 +310,7 @@ func TestInitialize(t *testing.T) {

func TestChangefeedHandleError(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, captures)
Expand All @@ -316,6 +328,36 @@ func TestChangefeedHandleError(t *testing.T) {
require.Equal(t, cf.state.Info.Error.Message, "fake error")
}

func TestTrySendBootstrapMeetError(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
_ = helper.DDL2Event("create table test.t(id int primary key, b int)")

ctx := cdcContext.NewContext4Test(context.Background(), true)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSinkWithBootstrapError, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)

// pre check
cf.Tick(ctx, captures)
tester.MustApplyPatches()

// initialize
cf.state.Info.Config.Sink.Protocol = util.AddressOf("simple")
cf.state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true)
cf.Tick(ctx, captures)
tester.MustApplyPatches()

require.Eventually(t, func() bool {
cf.Tick(ctx, captures)
tester.MustApplyPatches()
if cf.state.Info.Error != nil {
return cf.state.Info.State == model.StatePending
}
return false
}, 5*time.Second, 100*time.Millisecond)
}

func TestExecDDL(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand All @@ -328,7 +370,7 @@ func TestExecDDL(t *testing.T) {
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()
defer cf.Close(ctx)
tickThreeTime := func() {
Expand Down Expand Up @@ -410,7 +452,7 @@ func TestEmitCheckpointTs(t *testing.T) {
ctx := cdcContext.NewContext4Test(context.Background(), true)
ctx.ChangefeedVars().Info.StartTs = startTs

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
cf.upstream.KVStorage = helper.Storage()

defer cf.Close(ctx)
Expand Down Expand Up @@ -470,7 +512,7 @@ func TestSyncPoint(t *testing.T) {
ctx.ChangefeedVars().Info.Config.SyncPointInterval = util.AddressOf(1 * time.Second)
// SyncPoint option is only available for MySQL compatible database.
ctx.ChangefeedVars().Info.SinkURI = "mysql://"
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -500,7 +542,7 @@ func TestSyncPoint(t *testing.T) {
func TestFinished(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// pre check
Expand Down Expand Up @@ -573,7 +615,7 @@ func testChangefeedReleaseResource(
redoLogDir string,
expectedInitialized bool,
) {
cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)

// pre check
cf.Tick(ctx, captures)
Expand Down Expand Up @@ -615,7 +657,7 @@ func TestBarrierAdvance(t *testing.T) {
}
ctx.ChangefeedVars().Info.SinkURI = "mysql://"

cf, captures, tester := createChangefeed4Test(ctx, t)
cf, captures, tester := createChangefeed4Test(ctx, newMockDDLSink, t)
defer cf.Close(ctx)

// The changefeed load the info from etcd.
Expand Down
117 changes: 76 additions & 41 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -134,8 +135,24 @@ type ddlManager struct {
sinkType model.DownstreamType
ddlResolvedTs model.Ts

shouldSendAllBootstrapAtStart bool
bootstraped bool
bootstrapState bootstrapState
reportError func(err error)
}

type bootstrapState int32

const (
bootstrapNotStarted bootstrapState = iota
bootstrapInProgress
bootstrapFinished
)

func storeBootstrapState(addr *bootstrapState, state bootstrapState) {
atomic.StoreInt32((*int32)(addr), int32(state))
}

func loadBootstrapState(addr *bootstrapState) bootstrapState {
return bootstrapState(atomic.LoadInt32((*int32)(addr)))
}

func newDDLManager(
Expand All @@ -151,6 +168,7 @@ func newDDLManager(
sinkType model.DownstreamType,
bdrMode bool,
shouldSendAllBootstrapAtStart bool,
reportError func(err error),
) *ddlManager {
log.Info("create ddl manager",
zap.String("namaspace", changefeedID.Namespace),
Expand All @@ -160,6 +178,11 @@ func newDDLManager(
zap.Bool("bdrMode", bdrMode),
zap.Stringer("sinkType", sinkType))

bootstrap := bootstrapFinished
if shouldSendAllBootstrapAtStart {
bootstrap = bootstrapNotStarted
}

return &ddlManager{
changfeedID: changefeedID,
ddlSink: ddlSink,
Expand All @@ -173,47 +196,61 @@ func newDDLManager(
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart,
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
bootstrapState: bootstrap,
reportError: reportError,
}
}

func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) {
if !m.shouldSendAllBootstrapAtStart || m.bootstraped {
return true, nil
}
func (m *ddlManager) isBootstrapped() bool {
return loadBootstrapState(&m.bootstrapState) == bootstrapFinished
}

// return true if bootstrapped
func (m *ddlManager) trySendBootstrap(ctx context.Context, currentTables []*model.TableInfo) bool {
bootstrap := loadBootstrapState(&m.bootstrapState)
switch bootstrap {
case bootstrapFinished:
return true
case bootstrapInProgress:
return false
case bootstrapNotStarted:
}
storeBootstrapState(&m.bootstrapState, bootstrapInProgress)
start := time.Now()
defer func() {
go func() {
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)))
for idx, table := range currentTables {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
log.Error("send bootstrap message failed",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)),
zap.Int("emitted", idx+1),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
m.reportError(err)
return
}
}
storeBootstrapState(&m.bootstrapState, bootstrapFinished)
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(currentTables)),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstrap(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}
}
m.bootstraped = true
return true, nil
return m.isBootstrapped()
}

// tick the ddlHandler, it does the following things:
Expand All @@ -233,19 +270,17 @@ func (m *ddlManager) tick(
m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

ok, err := m.checkAndSendBootstrapMsgs(ctx)
currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}

// before bootstrap finished, cannot send any event.
ok := m.trySendBootstrap(ctx, currentTables)
if !ok {
return nil, nil, nil
}

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
}

if m.executingDDL == nil {
m.ddlSink.emitCheckpointTs(m.checkpointTs, currentTables)
}
Expand Down
Loading
Loading