schema_storage: fix schema GC threshold & improve memory management (#…
ti-chi-bot authored Jan 27, 2022
1 parent 87e93fa commit 3883f52
Showing 8 changed files with 155 additions and 48 deletions.
24 changes: 16 additions & 8 deletions cdc/entry/schema_storage.go
@@ -652,7 +652,7 @@ func (s *schemaSnapshot) Tables() map[model.TableID]*model.TableInfo {
// SchemaStorage stores the schema information with multi-version
type SchemaStorage interface {
// GetSnapshot returns the snapshot whose ts is specified
GetSnapshot(ctx context.Context, ts uint64) (*schemaSnapshot, error)
GetSnapshot(ctx context.Context, ts uint64) (*SingleSchemaSnapshot, error)
// GetLastSnapshot returns the last snapshot
GetLastSnapshot() *schemaSnapshot
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
@@ -661,8 +661,9 @@ type SchemaStorage interface {
AdvanceResolvedTs(ts uint64)
// ResolvedTs returns the resolved ts of the schema storage
ResolvedTs() uint64
// DoGC removes snaps which of ts less than this specified ts
DoGC(ts uint64)
// DoGC removes snaps that are no longer needed at the specified TS.
// It returns the TS from which the oldest maintained snapshot is valid.
DoGC(ts uint64) (lastSchemaTs uint64)
}

type schemaStorageImpl struct {
@@ -796,7 +797,7 @@ func (s *schemaStorageImpl) ResolvedTs() uint64 {
}

// DoGC removes snaps whose ts is less than the specified ts
func (s *schemaStorageImpl) DoGC(ts uint64) {
func (s *schemaStorageImpl) DoGC(ts uint64) (lastSchemaTs uint64) {
s.snapsMu.Lock()
defer s.snapsMu.Unlock()
var startIdx int
@@ -807,17 +808,24 @@ func (s *schemaStorageImpl) DoGC(ts uint64) {
startIdx = i
}
if startIdx == 0 {
return
return s.snaps[0].currentTs
}
if log.GetLevel() == zapcore.DebugLevel {
log.Debug("Do GC in schema storage")
for i := 0; i < startIdx; i++ {
s.snaps[i].PrintStatus(log.Debug)
}
}
s.snaps = s.snaps[startIdx:]
atomic.StoreUint64(&s.gcTs, s.snaps[0].currentTs)
log.Info("finished gc in schema storage", zap.Uint64("gcTs", s.snaps[0].currentTs))

// copy the part of the slice that is needed instead of re-slicing it
// to maximize efficiency of Go runtime GC.
newSnaps := make([]*schemaSnapshot, len(s.snaps)-startIdx)
copy(newSnaps, s.snaps[startIdx:])
s.snaps = newSnaps

lastSchemaTs = s.snaps[0].currentTs
atomic.StoreUint64(&s.gcTs, lastSchemaTs)
return
}

// SkipJob skips jobs that should not be executed
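
The copy-instead-of-reslice change above is a standard Go memory-management trick: re-slicing s.snaps[startIdx:] keeps the whole old backing array reachable, so the pruned *schemaSnapshot values cannot be reclaimed until the slice is reallocated. A minimal, hypothetical sketch of the pattern (not the actual schemaStorageImpl code):

func pruneSnapshots(snaps []*schemaSnapshot, startIdx int) []*schemaSnapshot {
	// Re-slicing (return snaps[startIdx:]) would pin the dropped snapshots in
	// memory along with the old backing array; copying the tail into a fresh
	// slice lets the Go runtime GC reclaim them.
	kept := make([]*schemaSnapshot, len(snaps)-startIdx)
	copy(kept, snaps[startIdx:])
	return kept
}
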
8 changes: 6 additions & 2 deletions cdc/entry/schema_storage_test.go
@@ -623,7 +623,9 @@ func (t *schemaSuite) TestMultiVersionStorage(c *check.C) {
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(0)
lastSchemaTs := storage.DoGC(0)
c.Assert(lastSchemaTs, check.Equals, uint64(0))

snap, err = storage.GetSnapshot(ctx, 100)
c.Assert(err, check.IsNil)
_, exist = snap.SchemaByID(1)
@@ -644,7 +646,9 @@ func (t *schemaSuite) TestMultiVersionStorage(c *check.C) {
_, exist = snap.TableByID(3)
c.Assert(exist, check.IsFalse)

storage.DoGC(155)
lastSchemaTs = storage.DoGC(155)
c.Assert(lastSchemaTs, check.Equals, uint64(140))

storage.AdvanceResolvedTs(185)

snap, err = storage.GetSnapshot(ctx, 180)
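
As the assertions above show, DoGC(ts) keeps the newest snapshot created at or before the given ts (plus everything newer) and returns that snapshot's ts, so reads at or after the returned lastSchemaTs remain answerable. A hedged caller-side sketch reusing the test's storage; the error handling is illustrative and not part of the commit:

lastSchemaTs := storage.DoGC(155)                   // drops snapshots older than the one current at ts=155
snap, err := storage.GetSnapshot(ctx, lastSchemaTs) // 140 here, still retrievable after GC
if err != nil {
	log.Error("oldest retained snapshot missing", zap.Error(err))
}
_ = snap
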
12 changes: 0 additions & 12 deletions cdc/processor/manager.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -62,17 +61,6 @@ func NewManager() *Manager {
}
}

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, createTablePipeline)
}
return m
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// the Tick function of Manager create or remove processor instances according to the specified `state`, or pass the `state` to processor instances
14 changes: 13 additions & 1 deletion cdc/processor/manager_test.go
@@ -38,8 +38,20 @@ type managerSuite struct {

var _ = check.Suite(&managerSuite{})

// NewManager4Test creates a new processor manager for test
func NewManager4Test(
c *check.C,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *Manager {
m := NewManager()
m.newProcessor = func(ctx cdcContext.Context) *processor {
return newProcessor4Test(ctx, c, createTablePipeline)
}
return m
}

func (s *managerSuite) resetSuit(ctx cdcContext.Context, c *check.C) {
s.manager = NewManager4Test(func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
s.manager = NewManager4Test(c, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
8 changes: 8 additions & 0 deletions cdc/processor/metrics.go
@@ -74,6 +74,13 @@ var (
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed", "capture"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
)

// InitMetrics registers all metrics used in processor
@@ -86,4 +93,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(checkpointTsMinTableIDGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
}
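
Given the Namespace, Subsystem, and Name above, the new gauge is exported as ticdc_processor_schema_storage_gc_ts, labeled by changefeed and capture. A hedged sketch of exercising it with prometheus/client_golang's testutil helpers; the label values and the helper function are made up for illustration:

func readGcTsGaugeSketch() float64 {
	// testutil is github.com/prometheus/client_golang/prometheus/testutil
	g := processorSchemaStorageGcTsGauge.WithLabelValues("changefeed-test", "127.0.0.1:8300")
	g.Set(1643241600000) // a physical timestamp in milliseconds
	return testutil.ToFloat64(g)
}
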
52 changes: 29 additions & 23 deletions cdc/processor/processor.go
@@ -20,7 +20,6 @@ import (
"math"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -45,8 +44,6 @@
)

const (
schemaStorageGCLag = time.Minute * 20

backoffBaseDelayInMs = 5
maxTries = 3
)
Expand All @@ -59,9 +56,11 @@ type processor struct {
tables map[model.TableID]tablepipeline.TablePipeline

schemaStorage entry.SchemaStorage
filter *filter.Filter
mounter entry.Mounter
sinkManager *sink.Manager
lastSchemaTs model.Ts

filter *filter.Filter
mounter entry.Mounter
sinkManager *sink.Manager

initialized bool
errCh chan error
@@ -78,6 +77,7 @@
metricCheckpointTsLagGauge prometheus.Gauge
metricMinCheckpointTableIDGuage prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
}

@@ -100,22 +100,13 @@ func newProcessor(ctx cdcContext.Context) *processor {
metricMinCheckpointTableIDGuage: checkpointTsMinTableIDGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
return p
}

func newProcessor4Test(ctx cdcContext.Context,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.createTablePipeline = createTablePipeline
p.sinkManager = &sink.Manager{}
return p
}

// Tick implements the `orchestrator.State` interface
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
@@ -191,7 +182,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
p.handlePosition(oracle.GetPhysical(pdTime))
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage()
p.doGCSchemaStorage(ctx)
return p.changefeed, nil
}

Expand Down Expand Up @@ -767,16 +758,30 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
}

// doGCSchemaStorage trigger the schema storage GC
func (p *processor) doGCSchemaStorage() {
func (p *processor) doGCSchemaStorage(ctx cdcContext.Context) {
if p.schemaStorage == nil {
// schemaStorage is nil only in test
return
}
// Delay GC to accommodate pullers starting from a startTs that's too small
// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
gcTime := oracle.GetTimeFromTS(p.changefeed.Status.CheckpointTs).Add(-schemaStorageGCLag)
gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
p.schemaStorage.DoGC(gcTs)

if p.changefeed.Status == nil {
// This could happen if Etcd data is not complete.
return
}

// Please refer to `unmarshalAndMountRowChanged` in cdc/entry/mounter.go
// for why we need -1.
lastSchemaTs := p.schemaStorage.DoGC(p.changefeed.Status.CheckpointTs - 1)
if p.lastSchemaTs == lastSchemaTs {
return
}
p.lastSchemaTs = lastSchemaTs

log.Debug("finished gc in schema storage",
zap.Uint64("gcTs", lastSchemaTs),
cdcContext.ZapFieldChangefeed(ctx))
lastSchemaPhysicalTs := oracle.ExtractPhysical(lastSchemaTs)
p.metricSchemaStorageGcTsGauge.Set(float64(lastSchemaPhysicalTs))
}

func (p *processor) Close() error {
@@ -796,6 +801,7 @@ func (p *processor) Close() error {
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
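
doGCSchemaStorage above reports oracle.ExtractPhysical(lastSchemaTs) so the gauge carries a wall-clock millisecond value rather than a raw 64-bit TSO. A hedged reminder of the TSO layout this assumes (physical milliseconds in the high bits, an 18-bit logical counter in the low bits); the helper below is illustrative only and not part of the commit:

func tsoPhysicalSketch() int64 {
	ts := oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0) // physical<<18 | logical
	return oracle.ExtractPhysical(ts)                         // recovers the millisecond part for the gauge
}
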
83 changes: 82 additions & 1 deletion cdc/processor/processor_test.go
@@ -15,16 +15,22 @@ package processor

import (
"context"
"encoding/json"
"fmt"
"math"
"sync/atomic"
"testing"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/util/testleak"
)
@@ -35,8 +41,21 @@

var _ = check.Suite(&processorSuite{})

func newProcessor4Test(
ctx cdcContext.Context,
c *check.C,
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error),
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.createTablePipeline = createTablePipeline
p.sinkManager = &sink.Manager{}
p.schemaStorage = &mockSchemaStorage{c: c}
return p
}

func initProcessor4Test(ctx cdcContext.Context, c *check.C) (*processor, *orchestrator.ReactorStateTester) {
p := newProcessor4Test(ctx, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
p := newProcessor4Test(ctx, c, func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error) {
return &mockTablePipeline{
tableID: tableID,
name: fmt.Sprintf("`test`.`table%d`", tableID),
@@ -109,6 +128,25 @@ func (m *mockTablePipeline) Wait() {
// do nothing
}

type mockSchemaStorage struct {
// dummy to provide default versions of unimplemented interface methods,
// as we only need ResolvedTs() and DoGC() in unit tests.
entry.SchemaStorage

c *check.C
lastGcTs uint64
}

func (s *mockSchemaStorage) ResolvedTs() uint64 {
return math.MaxUint64
}

func (s *mockSchemaStorage) DoGC(ts uint64) uint64 {
s.c.Assert(s.lastGcTs, check.LessEqual, ts)
atomic.StoreUint64(&s.lastGcTs, ts)
return ts
}

func (s *processorSuite) TestCheckTablesNum(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
@@ -664,6 +702,32 @@ func (s *processorSuite) TestPositionDeleted(c *check.C) {
})
}

func (s *processorSuite) TestSchemaGC(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
p, tester := initProcessor4Test(ctx, c)
p.changefeed.PatchTaskStatus(p.captureInfo.ID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
status.Tables[1] = &model.TableReplicaInfo{StartTs: 30}
status.Tables[2] = &model.TableReplicaInfo{StartTs: 40}
return status, true, nil
})

var err error
// init tick
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()

updateChangeFeedPosition(c, tester, "changefeed-id-test", 50, 50)
_, err = p.Tick(ctx, p.changefeed)
c.Assert(err, check.IsNil)
tester.MustApplyPatches()

// GC Ts should be (checkpoint - 1).
c.Assert(p.schemaStorage.(*mockSchemaStorage).lastGcTs, check.Equals, uint64(49))
c.Assert(p.lastSchemaTs, check.Equals, uint64(49))
}

func cleanUpFinishedOpOperation(state *model.ChangefeedReactorState, captureID model.CaptureID, tester *orchestrator.ReactorStateTester) {
state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
if status == nil || status.Operation == nil {
@@ -678,3 +742,20 @@ func cleanUpFinishedOpOperation(state *model.ChangefeedReactorState, captureID m
})
tester.MustApplyPatches()
}

func updateChangeFeedPosition(c *check.C, tester *orchestrator.ReactorStateTester, cfID model.ChangeFeedID, resolvedTs, checkpointTs model.Ts) {
key := etcd.CDCKey{
Tp: etcd.CDCKeyTypeChangeFeedStatus,
ChangefeedID: cfID,
}
keyStr := key.String()

cfStatus := &model.ChangeFeedStatus{
ResolvedTs: resolvedTs,
CheckpointTs: checkpointTs,
}
valueBytes, err := json.Marshal(cfStatus)
c.Assert(err, check.IsNil)

tester.MustUpdate(keyStr, valueBytes)
}
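
The mockSchemaStorage above uses a common Go testing idiom: embedding the interface gives the struct a default (nil) implementation of every method, so the test only stubs what it actually calls and any call to an unstubbed method panics, surfacing the missing stub immediately. A hypothetical minimal example of the same pattern (not part of the commit):

type partialStorage struct {
	entry.SchemaStorage // nil embedded interface; calling an unstubbed method panics
}

func (partialStorage) ResolvedTs() uint64 { return 42 } // the only method this fake needs
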
2 changes: 1 addition & 1 deletion metrics/grafana/ticdc.json
@@ -10759,4 +10759,4 @@
"title": "Test-Cluster-TiCDC",
"uid": "YiGL8hBZ1",
"version": 32
}
}
