
Commit a241934
This is an automated cherry-pick of pingcap#3172
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
liuzix authored and ti-chi-bot committed Nov 4, 2021
1 parent 8db4d1c commit a241934
Showing 8 changed files with 291 additions and 45 deletions.
24 changes: 16 additions & 8 deletions cdc/entry/schema_storage.go
@@ -652,7 +652,7 @@ func (s *schemaSnapshot) Tables() map[model.TableID]*model.TableInfo {
// SchemaStorage stores the schema information with multi-version
type SchemaStorage interface {
// GetSnapshot returns the snapshot which is valid at the specified ts
GetSnapshot(ctx context.Context, ts uint64) (*schemaSnapshot, error)
GetSnapshot(ctx context.Context, ts uint64) (*SingleSchemaSnapshot, error)
// GetLastSnapshot returns the last snapshot
GetLastSnapshot() *schemaSnapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
@@ -661,8 +661,9 @@ type SchemaStorage interface {
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
// DoGC removes snaps which of ts less than this specified ts
DoGC(ts uint64)
// DoGC removes snaps that are no longer needed at the specified TS.
// It returns the TS from which the oldest maintained snapshot is valid.
DoGC(ts uint64) (lastSchemaTs uint64)
}

type schemaStorageImpl struct {
@@ -796,7 +797,7 @@ func (s *schemaStorageImpl) ResolvedTs() uint64 {
}

// DoGC removes snaps that are no longer needed at the specified TS and returns the TS from which the oldest maintained snapshot is valid
func (s *schemaStorageImpl) DoGC(ts uint64) {
func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) {
s.snapsMu.Lock()
defer s.snapsMu.Unlock()
var startIdx int
@@ -807,17 +808,24 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
startIdx = i
}
if startIdx == 0 {
return
return s.snaps[0].currentTs
}
if log.GetLevel() == zapcore.DebugLevel {
log.Debug("Do GC in schema storage")
for i := 0; i < startIdx; i++ {
s.snaps[i].PrintStatus(log.Debug)
}
}
s.snaps = s.snaps[startIdx:]
atomic.StoreUint64(&s.gcTs, s.snaps[0].currentTs)
log.Info("finished gc in schema storage", zap.Uint64("gcTs", s.snaps[0].currentTs))

// copy the part of the slice that is needed instead of re-slicing it
// to maximize efficiency of Go runtime GC.
newSnaps := make([]*schemaSnapshot, len(s.snaps)-startIdx)
copy(newSnaps, s.snaps[startIdx:])
s.snaps = newSnaps

lastSchemaTs = s.snaps[0].currentTs
atomic.StoreUint64(&s.gcTs, lastSchemaTs)
return
}

// SkipJob skip the job should not be executed
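
The copy in the new DoGC body is the memory-management half of this change: re-slicing s.snaps keeps the original backing array (and every snapshot pointer in its dropped prefix) reachable, while copying into a right-sized slice lets the old snapshots be collected immediately. A minimal standalone sketch of that idiom (the snapshot type below is a stand-in for schemaSnapshot, not TiCDC code):

```go
package main

import "fmt"

// snapshot stands in for entry.schemaSnapshot; the payload represents the
// schema data that should become collectible once the snapshot is dropped.
type snapshot struct {
	currentTs uint64
	payload   [1 << 20]byte
}

// gcByReslice drops the prefix by re-slicing. The result still points into
// the original backing array, so the dropped *snapshot pointers, and their
// payloads, remain reachable until the whole slice goes away.
func gcByReslice(snaps []*snapshot, startIdx int) []*snapshot {
	return snaps[startIdx:]
}

// gcByCopy allocates a fresh, right-sized backing array, as the new DoGC
// does, so everything before startIdx becomes unreachable right away.
func gcByCopy(snaps []*snapshot, startIdx int) []*snapshot {
	kept := make([]*snapshot, len(snaps)-startIdx)
	copy(kept, snaps[startIdx:])
	return kept
}

func main() {
	snaps := []*snapshot{{currentTs: 100}, {currentTs: 140}, {currentTs: 160}}
	fmt.Println(len(gcByReslice(snaps, 1)), len(gcByCopy(snaps, 1))) // 2 2
}
```

The other half is the new return value: DoGC now reports lastSchemaTs, the currentTs of the oldest snapshot it keeps, which the processor uses below to drive a new gauge.
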
8 changes: 6 additions & 2 deletions cdc/entry/schema_storage_test.go
@@ -623,7 +623,9 @@ func (t *schemaSuite) TestMultiVersionStorage(c *check.C) {
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(0)
lastSchemaTs := storage.DoGC(0)
require.Equal(t, uint64(0), lastSchemaTs)

snap, err = storage.GetSnapshot(ctx, 100)
c.Assert(err, check.IsNil)
_, exist = snap.SchemaByID(1)
@@ -644,7 +646,9 @@ _, exist = snap.TableByID(3)
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(155)
lastSchemaTs = storage.DoGC(155)
require.Equal(t, uint64(140), lastSchemaTs)

storage.AdvanceResolvedTs(185)

snap, err = storage.GetSnapshot(ctx, 180)
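
The expected values in this test follow from how DoGC selects the boundary: it keeps the newest snapshot whose currentTs is still less than or equal to the GC ts (that snapshot is needed to serve reads at the GC ts), drops everything older, and reports the kept snapshot's currentTs. A small illustrative helper, using made-up snapshot timestamps rather than the ones this test actually builds:

```go
package main

import "fmt"

// lastValidSnapshotTs mirrors the selection loop in schemaStorageImpl.DoGC:
// find the newest snapshot with currentTs <= gcTs, drop everything older,
// and return that snapshot's currentTs.
func lastValidSnapshotTs(snapshotTs []uint64, gcTs uint64) uint64 {
	idx := 0
	for i, ts := range snapshotTs {
		if ts > gcTs {
			break
		}
		idx = i
	}
	return snapshotTs[idx]
}

func main() {
	// Hypothetical snapshot timestamps, for illustration only.
	snaps := []uint64{0, 100, 120, 140, 160}
	fmt.Println(lastValidSnapshotTs(snaps, 0))   // 0
	fmt.Println(lastValidSnapshotTs(snaps, 155)) // 140
}
```
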
12 changes: 0 additions & 12 deletions cdc/processor/manager.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
@@ -62,17 +61,6 @@ func NewManager() *Manager {
}
}

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, createTablePipeline)
}
return m
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// the Tick function of Manager create or remove processor instances according to the specified `state`, or pass the `state` to processor instances
14 changes: 13 additions & 1 deletion cdc/processor/manager_test.go
@@ -38,8 +38,20 @@ type managerSuite struct {

var _ = check.Suite(&managerSuite{})

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
c *check.C,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, c, createTablePipeline)
}
return m
}

func (s *managerSuite) resetSuit(ctx cdcContext.Context, c *check.C) {
s.manager = NewManager4Test(func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
s.manager = NewManager4Test(c, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
8 changes: 8 additions & 0 deletions cdc/processor/metrics.go
@@ -60,6 +60,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
)

// InitMetrics registers all metrics used in processor
@@ -70,4 +77,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(checkpointTsLagGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
}
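
The new gauge is labelled per changefeed and per capture, so once InitMetrics has run, each processor publishes one ticdc_processor_schema_storage_gc_ts series. A self-contained sketch (a standalone program, not TiCDC code) of what registering and exposing such a gauge looks like:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Same shape as processorSchemaStorageGcTsGauge in cdc/processor/metrics.go.
	gcTsGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "processor",
		Name:      "schema_storage_gc_ts",
		Help:      "the TS of the currently maintained oldest snapshot in SchemaStorage",
	}, []string{"changefeed", "capture"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(gcTsGauge)

	// The processor sets this to the physical part of lastSchemaTs; the
	// label values and the number below are placeholders.
	gcTsGauge.WithLabelValues("example-changefeed", "127.0.0.1:8300").Set(1636012800000)

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}
```
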
62 changes: 41 additions & 21 deletions cdc/processor/processor.go
@@ -45,8 +45,6 @@ import (
)

const (
schemaStorageGCLag = time.Minute * 20

backoffBaseDelayInMs = 5
maxTries = 3
)
@@ -59,6 +57,8 @@ type processor struct {
tables map[model.TableID]tablepipeline.TablePipeline

schemaStorage entry.SchemaStorage
lastSchemaTs model.Ts

filter *filter.Filter
mounter entry.Mounter
sinkManager *sink.Manager
@@ -71,12 +71,13 @@ type processor struct {
lazyInit func(ctx cdcContext.Context) error
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error)

metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
}

// newProcessor creates a new processor
@@ -90,18 +91,20 @@ func newProcessor(ctx cdcContext.Context) *processor {
captureInfo: ctx.GlobalVars().CaptureInfo,
cancel: func() {},

metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
return p
}

<<<<<<< HEAD
func newProcessor4Test(ctx cdcContext.Context,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *processor {
@@ -111,6 +114,8 @@ func newProcessor4Test(ctx cdcContext.Context,
return p
}

=======
>>>>>>> fd39bb2e9 (schema_storage: fix schema GC threshold & improve memory management (#3172))
// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
@@ -179,7 +184,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
p.handlePosition()
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage()
p.doGCSchemaStorage(ctx)
return p.changefeed, nil
}

@@ -750,16 +755,30 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
}

// doGCSchemaStorage trigger the schema storage GC
func (p *processor) doGCSchemaStorage() {
func (p *processor) doGCSchemaStorage(ctx cdcContext.Context) {
if p.schemaStorage == nil {
// schemaStorage is nil only in test
return
}
// Delay GC to accommodate pullers starting from a startTs that's too small
// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
gcTime := oracle.GetTimeFromTS(p.changefeed.Status.CheckpointTs).Add(-schemaStorageGCLag)
gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
p.schemaStorage.DoGC(gcTs)

if p.changefeed.Status == nil {
// This could happen if Etcd data is not complete.
return
}

// Please refer to `unmarshalAndMountRowChanged` in cdc/entry/mounter.go
// for why we need -1.
lastSchemaTs := p.schemaStorage.DoGC(p.changefeed.Status.CheckpointTs - 1)
if p.lastSchemaTs == lastSchemaTs {
return
}
p.lastSchemaTs = lastSchemaTs

log.Debug("finished gc in schema storage",
zap.Uint64("gcTs", lastSchemaTs),
cdcContext.ZapFieldChangefeed(ctx))
lastSchemaPhysicalTs := oracle.ExtractPhysical(lastSchemaTs)
p.metricSchemaStorageGcTsGauge.Set(float64(lastSchemaPhysicalTs))
}

func (p *processor) Close() error {
@@ -779,6 +798,7 @@ func (p *processor) Close() error {
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
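
Two things changed in the processor-side GC. First, the 20-minute schemaStorageGCLag is gone: doGCSchemaStorage now collects straight up to CheckpointTs-1 (the in-code comment points to unmarshalAndMountRowChanged in cdc/entry/mounter.go for why the -1 is needed). Second, the oldest retained TS is exported on the new gauge after oracle.ExtractPhysical strips the logical bits. A TSO packs the physical Unix-millisecond timestamp above an 18-bit logical counter, so compose/extract is plain bit arithmetic; the sketch below re-implements it for illustration instead of importing the oracle package:

```go
package main

import (
	"fmt"
	"time"
)

// A TSO keeps the physical timestamp (Unix milliseconds) in the high bits
// and an 18-bit logical counter in the low bits. These helpers mirror
// oracle.ComposeTS and oracle.ExtractPhysical; they are illustrative
// re-implementations, not the imported functions.
const logicalBits = 18

func composeTS(physicalMs, logical int64) uint64 {
	return uint64(physicalMs<<logicalBits + logical)
}

func extractPhysical(ts uint64) int64 {
	return int64(ts >> logicalBits)
}

func main() {
	nowMs := time.Now().UnixMilli()
	checkpointTs := composeTS(nowMs, 0)

	// Round-trips back to the wall clock in milliseconds.
	fmt.Println(extractPhysical(checkpointTs) == nowMs) // true

	// doGCSchemaStorage feeds a value of this form (the physical part of
	// lastSchemaTs rather than of the checkpoint) into the gauge.
	fmt.Println(float64(extractPhysical(checkpointTs)))
}
```
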