schema_storage: fix schema GC threshold & improve memory management #3172

Merged · 34 commits · Nov 4, 2021

Commits (34)
2a41b00
schema_storage: make runtime GC snapshots in time
liuzix Oct 29, 2021
96d31ec
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Oct 29, 2021
28aa013
processor: remove schema GC lag
liuzix Oct 29, 2021
34b0512
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Oct 29, 2021
3a6fe19
processor: fix schema GC TS
liuzix Oct 29, 2021
db6eb2d
Merge branch 'zixiong-fix-schema-gc' of github.com:liuzix/ticdc into …
liuzix Oct 29, 2021
1898bf6
processor: fix schema GC TS
liuzix Oct 29, 2021
4fb7b0f
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Oct 29, 2021
c3d1ad5
add test
liuzix Nov 2, 2021
20fb58e
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 2, 2021
4a60654
adjust code
liuzix Nov 2, 2021
3a57fd5
Merge branch 'zixiong-fix-schema-gc' of github.com:liuzix/ticdc into …
liuzix Nov 2, 2021
e2902d6
fix bug
liuzix Nov 2, 2021
848b1fa
fix ut
liuzix Nov 2, 2021
388f25c
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 2, 2021
2448247
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 2, 2021
465c218
add metrics for GC Ts
liuzix Nov 2, 2021
ad4d71d
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 2, 2021
dd24ed9
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 3, 2021
3c8898f
fix metrics
liuzix Nov 3, 2021
c588cc0
add Grafana panel
liuzix Nov 3, 2021
697a497
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 3, 2021
0c6ebb5
fix Grafana panel
liuzix Nov 3, 2021
e462f4e
Merge branch 'zixiong-fix-schema-gc' of github.com:liuzix/ticdc into …
liuzix Nov 3, 2021
0092257
fix update processor lastSchemaTs
liuzix Nov 3, 2021
0d8458d
fix Grafana panel instant = true
liuzix Nov 3, 2021
5c10065
add metrics prefix
liuzix Nov 3, 2021
797bd05
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 3, 2021
a7127e6
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 3, 2021
e274e64
add nil check for changefeed status
liuzix Nov 3, 2021
d50d33a
Merge branch 'zixiong-fix-schema-gc' of github.com:liuzix/ticdc into …
liuzix Nov 3, 2021
16fe2f5
fix Grafana pluginVersion
liuzix Nov 4, 2021
082a66c
fix Grafana value name
liuzix Nov 4, 2021
3105724
Merge branch 'master' into zixiong-fix-schema-gc
liuzix Nov 4, 2021
Files changed
24 changes: 16 additions & 8 deletions cdc/entry/schema_storage.go
@@ -653,7 +653,7 @@ func (s *schemaSnapshot) Tables() map[model.TableID]*model.TableInfo {
// SchemaStorage stores the schema information with multi-version
type SchemaStorage interface {
// GetSnapshot returns the snapshot which of ts is specified
GetSnapshot(ctx context.Context, ts uint64) (*schemaSnapshot, error)
GetSnapshot(ctx context.Context, ts uint64) (*SingleSchemaSnapshot, error)
// GetLastSnapshot returns the last snapshot
GetLastSnapshot() *schemaSnapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
@@ -662,8 +662,9 @@ type SchemaStorage interface {
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
// DoGC removes snaps which of ts less than this specified ts
DoGC(ts uint64)
// DoGC removes snaps that are no longer needed at the specified TS.
// It returns the TS from which the oldest maintained snapshot is valid.
DoGC(ts uint64) (lastSchemaTs uint64)
}

type schemaStorageImpl struct {
@@ -797,7 +798,7 @@ func (s *schemaStorageImpl) ResolvedTs() uint64 {
}

// DoGC removes snaps which of ts less than this specified ts
func (s *schemaStorageImpl) DoGC(ts uint64) {
func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) {
s.snapsMu.Lock()
defer s.snapsMu.Unlock()
var startIdx int
@@ -808,17 +809,24 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
startIdx = i
}
if startIdx == 0 {
return
return s.snaps[0].currentTs
}
if log.GetLevel() == zapcore.DebugLevel {
log.Debug("Do GC in schema storage")
for i := 0; i < startIdx; i++ {
s.snaps[i].PrintStatus(log.Debug)
}
}
s.snaps = s.snaps[startIdx:]
atomic.StoreUint64(&s.gcTs, s.snaps[0].currentTs)
log.Info("finished gc in schema storage", zap.Uint64("gcTs", s.snaps[0].currentTs))

// copy the part of the slice that is needed instead of re-slicing it
// to maximize efficiency of Go runtime GC.
newSnaps := make([]*schemaSnapshot, len(s.snaps)-startIdx)
copy(newSnaps, s.snaps[startIdx:])
s.snaps = newSnaps

lastSchemaTs = s.snaps[0].currentTs
atomic.StoreUint64(&s.gcTs, lastSchemaTs)
return
}

// SkipJob skip the job should not be executed
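A note on the memory-management half of this change: the inline comment about copying "the part of the slice that is needed instead of re-slicing it" is what actually lets the dropped snapshots be reclaimed. The standalone sketch below (hypothetical types and helpers, not code from this repository) contrasts the two approaches; with re-slicing, the retained slice still shares the original backing array, so the pointers to the old snapshots stay reachable, while copying into a right-sized slice makes the old array and everything it references collectible.

```go
// Sketch of why DoGC copies surviving snapshots instead of re-slicing.
// All names here are hypothetical stand-ins, not the real schemaSnapshot type.
package main

import "fmt"

type snapshot struct {
	currentTs uint64
	payload   []byte // stands in for the multi-version schema data
}

// gcByReslice drops snaps[:startIdx] logically, but the returned slice shares
// the original backing array, so the dropped *snapshot pointers stay reachable
// and the Go runtime GC cannot reclaim them.
func gcByReslice(snaps []*snapshot, startIdx int) []*snapshot {
	return snaps[startIdx:]
}

// gcByCopy mirrors the approach taken in this PR: allocate a right-sized slice
// and copy only the snapshots that are still needed, so the old backing array
// (and the snapshots it references) becomes garbage.
func gcByCopy(snaps []*snapshot, startIdx int) []*snapshot {
	kept := make([]*snapshot, len(snaps)-startIdx)
	copy(kept, snaps[startIdx:])
	return kept
}

func main() {
	var snaps []*snapshot
	for ts := uint64(100); ts <= 400; ts += 100 {
		snaps = append(snaps, &snapshot{currentTs: ts, payload: make([]byte, 1<<20)})
	}
	fmt.Println(len(gcByReslice(snaps, 2)), len(gcByCopy(snaps, 2))) // 2 2
}
```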
8 changes: 6 additions & 2 deletions cdc/entry/schema_storage_test.go
@@ -606,7 +606,9 @@ func TestMultiVersionStorage(t *testing.T) {
_, exist = snap.TableByID(3)
require.False(t, exist)

storage.DoGC(0)
lastSchemaTs := storage.DoGC(0)
require.Equal(t, uint64(0), lastSchemaTs)

snap, err = storage.GetSnapshot(ctx, 100)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
@@ -627,7 +629,9 @@
_, exist = snap.TableByID(3)
require.False(t, exist)

storage.DoGC(155)
lastSchemaTs = storage.DoGC(155)
require.Equal(t, uint64(140), lastSchemaTs)

storage.AdvanceResolvedTs(185)

snap, err = storage.GetSnapshot(ctx, 180)
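For readers following the new assertions: DoGC(155) is expected to return 140 because GC has to keep the newest snapshot whose currentTs is at or below the GC threshold, so that a read at any ts >= 155 can still be resolved. A rough sketch of that threshold rule (a hypothetical helper, not the real implementation) follows.

```go
// Sketch of the GC threshold rule exercised by TestMultiVersionStorage.
package main

import "fmt"

// gcStartIndex returns the index of the newest snapshot whose ts is at or
// below gcTs; every snapshot before that index can be dropped.
func gcStartIndex(snapTs []uint64, gcTs uint64) int {
	start := 0
	for i, ts := range snapTs {
		if ts > gcTs {
			break
		}
		start = i
	}
	return start
}

func main() {
	snapTs := []uint64{100, 120, 140, 160, 180} // hypothetical snapshot TSs
	i := gcStartIndex(snapTs, 155)
	fmt.Println(snapTs[i]) // 140: the TS from which the oldest retained snapshot is valid
}
```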
12 changes: 0 additions & 12 deletions cdc/processor/manager.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
@@ -62,17 +61,6 @@ func NewManager() *Manager {
}
}

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, createTablePipeline)
}
return m
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// the Tick function of Manager create or remove processor instances according to the specified `state`, or pass the `state` to processor instances
14 changes: 13 additions & 1 deletion cdc/processor/manager_test.go
@@ -38,8 +38,20 @@ type managerSuite struct {

var _ = check.Suite(&managerSuite{})

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
c *check.C,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, c, createTablePipeline)
}
return m
}

func (s *managerSuite) resetSuit(ctx cdcContext.Context, c *check.C) {
s.manager = NewManager4Test(func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
s.manager = NewManager4Test(c, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
8 changes: 8 additions & 0 deletions cdc/processor/metrics.go
@@ -60,6 +60,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
)

// InitMetrics registers all metrics used in processor
Expand All @@ -70,4 +77,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(checkpointTsLagGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
}
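Because GaugeOpts sets Namespace, Subsystem, and Name separately, the exported series should be named ticdc_processor_schema_storage_gc_ts, labeled by changefeed and capture, which is presumably what the Grafana panel added in this PR queries. A quick way to confirm the fully-qualified name (not part of the PR):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// BuildFQName joins namespace, subsystem, and name with underscores,
	// matching how the GaugeVec above is exposed to Prometheus.
	fmt.Println(prometheus.BuildFQName("ticdc", "processor", "schema_storage_gc_ts"))
	// Output: ticdc_processor_schema_storage_gc_ts
}
```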
71 changes: 38 additions & 33 deletions cdc/processor/processor.go
@@ -47,8 +47,6 @@ import (
)

const (
schemaStorageGCLag = time.Minute * 20

backoffBaseDelayInMs = 5
maxTries = 3
)
@@ -61,6 +59,8 @@ type processor struct {
tables map[model.TableID]tablepipeline.TablePipeline

schemaStorage entry.SchemaStorage
lastSchemaTs model.Ts

filter *filter.Filter
mounter entry.Mounter
sinkManager *sink.Manager
@@ -75,12 +75,13 @@ type processor struct {
lazyInit func(ctx cdcContext.Context) error
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error)

metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
}

// newProcessor creates a new processor
@@ -95,30 +96,19 @@ func newProcessor(ctx cdcContext.Context) *processor {
cancel: func() {},
lastRedoFlush: time.Now(),

metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
return p
}

func newProcessor4Test(ctx cdcContext.Context,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error {
p.redoManager = redo.NewDisabledManager()
return nil
}
p.createTablePipeline = createTablePipeline
return p
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
@@ -190,7 +180,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
p.handlePosition()
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage()
p.doGCSchemaStorage(ctx)
return p.changefeed, nil
}

@@ -771,16 +761,30 @@ func (p *processor) removeTable(table tablepipeline.TablePipeline, tableID model
}

// doGCSchemaStorage trigger the schema storage GC
func (p *processor) doGCSchemaStorage() {
func (p *processor) doGCSchemaStorage(ctx cdcContext.Context) {
if p.schemaStorage == nil {
// schemaStorage is nil only in test
return
}
// Delay GC to accommodate pullers starting from a startTs that's too small
// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
gcTime := oracle.GetTimeFromTS(p.changefeed.Status.CheckpointTs).Add(-schemaStorageGCLag)
gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
p.schemaStorage.DoGC(gcTs)
Review comment on lines -781 to -783:

@amyangfei (Contributor), Nov 2, 2021:
What was the 20-minute GC delay used for originally? I found the earliest issue behind this fix, #1069, which said that a table puller starting from a startTs that is too early could cause some other potential problems.

@liuzix (Contributor, Author):
It was used to accommodate some buggy behavior in the old processor, where a puller could be started from a timestamp less than the global checkpoint. The new owner does not have this problem.


if p.changefeed.Status == nil {
// This could happen if Etcd data is not complete.
return
}

// Please refer to `unmarshalAndMountRowChanged` in cdc/entry/mounter.go
// for why we need -1.
lastSchemaTs := p.schemaStorage.DoGC(p.changefeed.Status.CheckpointTs - 1)
if p.lastSchemaTs == lastSchemaTs {
return
}
p.lastSchemaTs = lastSchemaTs

log.Debug("finished gc in schema storage",
zap.Uint64("gcTs", lastSchemaTs),
cdcContext.ZapFieldChangefeed(ctx))
lastSchemaPhysicalTs := oracle.ExtractPhysical(lastSchemaTs)
p.metricSchemaStorageGcTsGauge.Set(float64(lastSchemaPhysicalTs))
}

// flushRedoLogMeta flushes redo log meta, including resolved-ts and checkpoint-ts
Expand Down Expand Up @@ -814,6 +818,7 @@ func (p *processor) Close() error {
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
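One detail worth calling out in doGCSchemaStorage: the gauge is set to oracle.ExtractPhysical(lastSchemaTs) rather than the raw TS because a TiDB TSO packs a physical millisecond timestamp in its high bits and a logical counter in its low 18 bits, and the physical part is what reads naturally on a time-based Grafana panel. The sketch below assumes that 18-bit layout; composeTS and extractPhysical are hypothetical stand-ins for the oracle package helpers.

```go
// Sketch of the TSO <-> wall-clock relationship behind the new gauge value.
package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // assumption: matches TiDB's TSO layout

func composeTS(physicalMs int64, logical int64) uint64 {
	return uint64((physicalMs << logicalBits) + logical)
}

func extractPhysical(ts uint64) int64 {
	return int64(ts >> logicalBits)
}

func main() {
	now := time.Now()
	ts := composeTS(now.UnixMilli(), 0)

	// This mirrors what doGCSchemaStorage reports: the physical part of the
	// oldest maintained snapshot's TS, which Grafana renders as a timestamp.
	physicalMs := extractPhysical(ts)
	fmt.Println(physicalMs == now.UnixMilli()) // true
}
```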