Skip to content

Commit

Permalink
add debug log
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed May 18, 2020
1 parent bf26ac6 commit b8be781
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
1 change: 1 addition & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
snap, err := m.schemaStorage.GetSnapshot(ctx, raw.CRTs)
if err != nil {
log.Error("get schema snapshot failed", zap.Any("base info of kv", baseInfo))
return nil, errors.Trace(err)
}
row, err := func() (*model.RowChangedEvent, error) {
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (t *tableInfo) loadResolvedTS() uint64 {
}

func (t *tableInfo) storeResolvedTS(ts uint64) {
log.Info("store resolved ts of table", zap.Uint64("ts", ts), zap.Int64("tableID", t.id))
atomic.StoreUint64(&t.resolvedTS, ts)
}

Expand Down Expand Up @@ -312,12 +313,13 @@ func (p *processor) positionWorker(ctx context.Context) error {
p.stateMu.Lock()
for _, table := range p.tables {
ts := table.loadResolvedTS()

log.Info("load resolved ts of table", zap.Uint64("ts", ts), zap.Int64("tableID", table.id))
if ts < minResolvedTs {
minResolvedTs = ts
}
}
p.stateMu.Unlock()
log.Info("min resolved ts", zap.Uint64("ts", minResolvedTs))

if minResolvedTs == p.position.ResolvedTs {
continue
Expand Down

0 comments on commit b8be781

Please sign in to comment.