Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): fix redo initialization block the owner (#9887) #9992

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -578,32 +584,21 @@ LOOP2:
}
c.observerLastTick = atomic.NewTime(time.Time{})

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoDDLMgr.Run(stdCtx))
ctx.Throw(c.redoDDLMgr.Run(cancelCtx))
}()
}

c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx,
c.state.Info.Config.Consistent, checkpointTs)
if err != nil {
return err
}
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ctx.Throw(c.redoMetaMgr.Run(stdCtx))
ctx.Throw(c.redoMetaMgr.Run(cancelCtx))
}()
}
log.Info("owner creates redo manager",
Expand Down Expand Up @@ -765,15 +760,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,10 +670,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
}
p.changefeed.Info.Config.Sink.TiDBSourceID = sourceID

p.redo.r, err = redo.NewDMLManager(stdCtx, p.changefeed.Info.Config.Consistent)
if err != nil {
return err
}
p.redo.r = redo.NewDMLManager(p.changefeedID, p.changefeed.Info.Config.Consistent)
p.redo.name = "RedoManager"
p.redo.spawn(stdCtx)

Expand Down
9 changes: 3 additions & 6 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -64,20 +63,18 @@ func newProcessor4Test(
return nil
}

stdCtx := contextutil.PutChangefeedIDInCtx(ctx, changefeedID)
if !enableRedo {
p.redo.r = redo.NewDisabledDMLManager()
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr, err := redo.NewDMLManager(ctx, &config.ConsistentConfig{
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
require.NoError(t, err)
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
Expand All @@ -87,9 +84,9 @@ func newProcessor4Test(
p.sinkManager.r, p.sourceManager.r, _ = sinkmanager.NewManagerWithMemEngine(
t, changefeedID, state.Info, p.redo.r)
p.sinkManager.name = "SinkManager"
p.sinkManager.spawn(stdCtx)
p.sinkManager.spawn(ctx)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)
p.sourceManager.spawn(ctx)

// NOTICE: we have to bind the sourceManager to the sinkManager
// otherwise the sinkManager will not receive the resolvedTs.
Expand Down
98 changes: 50 additions & 48 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -66,19 +65,17 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
ctx context.Context, cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
span := spanz.TableIDToComparableSpan(0)
logManager.AddTable(span, ddlStartTs)
m.AddTable(span, ddlStartTs)
return &ddlManager{
logManager: logManager,
// The current fakeSpan is meaningless, find a meaningful sapn in the future.
logManager: m,
// The current fakeSpan is meaningless, find a meaningful span in the future.
fakeSpan: span,
}, nil
}
}

type ddlManager struct {
Expand Down Expand Up @@ -115,12 +112,12 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(ctx context.Context, cfg *config.ConsistentConfig) (*dmlManager, error) {
logManager, err := newLogManager(ctx, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
func NewDMLManager(
changefeedID model.ChangeFeedID, cfg *config.ConsistentConfig,
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -226,28 +223,22 @@ type logManager struct {
}

func newLogManager(
ctx context.Context, cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
m := &logManager{
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: contextutil.CaptureAddrFromCtx(ctx),
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
},
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
Expand All @@ -260,21 +251,30 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
if m.Enabled() {
defer m.close()
return m.bgUpdateLog(ctx)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}
return nil

defer m.close()
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -546,11 +546,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
Loading
Loading