Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc, pkg/txnutil: skip resolve lock during puller initialization #910

Merged
merged 10 commits into from
Sep 4, 2020
169 changes: 56 additions & 113 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package kv

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -35,11 +34,11 @@ import (
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/txnutil"
"github.com/pingcap/ticdc/pkg/util"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -359,14 +358,23 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
return
}

// PullerInitialization exposes the initialization state of a puller to the
// kv client. It is declared here as an interface to work around a cyclic
// import.
type PullerInitialization interface {
// IsInitialized reports whether the puller has been initialized. Within this
// file a false result makes the event feed skip lock resolution and log new
// requests at debug level instead of info level.
IsInitialized() bool
}

// EventFeed divides an EventFeed request on range boundaries and establishes
// an EventFeed to each individual region. It streams results back on the
// provided channel.
// The `Start` and `End` fields in the input span must be memcomparable encoded.
func (c *CDCClient) EventFeed(
ctx context.Context, span regionspan.ComparableSpan, ts uint64, enableOldValue bool, eventCh chan<- *model.RegionFeedEvent,
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
enableOldValue bool,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
eventCh chan<- *model.RegionFeedEvent,
) error {
s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, enableOldValue, eventCh)
s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, lockResolver, isPullerInit, enableOldValue, eventCh)
return s.eventFeed(ctx, ts)
}

Expand All @@ -381,6 +389,9 @@ type eventFeedSession struct {
regionCache *tikv.RegionCache
kvStorage tikv.Storage

lockResolver txnutil.LockResolver
isPullerInit PullerInitialization

// The whole range that is being subscribed.
totalSpan regionspan.ComparableSpan

Expand Down Expand Up @@ -414,6 +425,8 @@ func newEventFeedSession(
regionCache *tikv.RegionCache,
kvStorage tikv.Storage,
totalSpan regionspan.ComparableSpan,
lockResolver txnutil.LockResolver,
isPullerInit PullerInitialization,
enableOldValue bool,
eventCh chan<- *model.RegionFeedEvent,
) *eventFeedSession {
Expand All @@ -429,6 +442,8 @@ func newEventFeedSession(
requestRangeCh: make(chan rangeRequestTask, 16),
rangeLock: regionspan.NewRegionRangeLock(),
enableOldValue: enableOldValue,
lockResolver: lockResolver,
isPullerInit: isPullerInit,
id: strconv.FormatUint(allocID(), 10),
regionChSizeGauge: clientChannelSize.WithLabelValues(id, "region"),
errChSizeGauge: clientChannelSize.WithLabelValues(id, "err"),
Expand Down Expand Up @@ -712,7 +727,12 @@ MainLoop:
})
}

log.Info("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))
logReq := log.Debug
if s.isPullerInit.IsInitialized() {
logReq = log.Info
}
logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))

err = stream.Send(req)

// If Send fails, the receiver should have received the error too or will receive it soon. So we don't need
Expand Down Expand Up @@ -1105,9 +1125,9 @@ func (s *eventFeedSession) receiveFromStream(

// singleEventFeed handles events of a single EventFeed stream.
// Results will be sent to eventCh
// EventFeed RPC will not return checkpoint event directly
// EventFeed RPC will not return resolved event directly
// A resolved event is generated when there is no unmatched pre-write
// Return the maximum checkpoint
// Return the maximum resolved
func (s *eventFeedSession) singleEventFeed(
ctx context.Context,
regionID uint64,
Expand All @@ -1134,7 +1154,7 @@ func (s *eventFeedSession) singleEventFeed(
defer advanceCheckTicker.Stop()
lastReceivedEventTime := time.Now()
startFeedTime := time.Now()
checkpointTs := startTs
lastResolvedTs := startTs
select {
case s.eventCh <- &model.RegionFeedEvent{
RegionID: regionID,
Expand All @@ -1144,18 +1164,22 @@ func (s *eventFeedSession) singleEventFeed(
},
}:
case <-ctx.Done():
return checkpointTs, errors.Trace(ctx.Err())
return lastResolvedTs, errors.Trace(ctx.Err())
}
for {
var event *cdcpb.Event
var ok bool
select {
case <-ctx.Done():
return checkpointTs, ctx.Err()
return lastResolvedTs, ctx.Err()
case <-advanceCheckTicker.C:
if time.Since(startFeedTime) < 20*time.Second {
continue
}
if !s.isPullerInit.IsInitialized() {
// Initializing a puller may take a long time; skip resolving locks to avoid unnecessary overhead.
continue
}
sinceLastEvent := time.Since(lastReceivedEventTime)
if sinceLastEvent > time.Second*20 {
log.Warn("region not receiving event from tikv for too long time",
Expand All @@ -1167,14 +1191,14 @@ func (s *eventFeedSession) singleEventFeed(
continue
}
currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(checkpointTs))
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs))
if sinceLastResolvedTs > time.Second*20 && initialized {
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", regionID), zap.Stringer("span", span),
zap.Duration("duration", sinceLastResolvedTs),
zap.Uint64("checkpointTs", checkpointTs))
zap.Uint64("resolvedTs", lastResolvedTs))
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
err = s.resolveLock(ctx, regionID, maxVersion)
err = s.lockResolver.Resolve(ctx, regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock", zap.Uint64("regionID", regionID), zap.Error(err))
continue
Expand All @@ -1185,12 +1209,12 @@ func (s *eventFeedSession) singleEventFeed(
}
if !ok {
log.Debug("singleEventFeed receiver closed")
return checkpointTs, nil
return lastResolvedTs, nil
}

if event == nil {
log.Debug("singleEventFeed closed by error")
return checkpointTs, errors.New("single event feed aborted")
return lastResolvedTs, errors.New("single event feed aborted")
}
lastReceivedEventTime = time.Now()

Expand Down Expand Up @@ -1221,13 +1245,13 @@ func (s *eventFeedSession) singleEventFeed(

revent, err := assembleCommitEvent(regionID, entry, value)
if err != nil {
return checkpointTs, errors.Trace(err)
return lastResolvedTs, errors.Trace(err)
}
select {
case s.eventCh <- revent:
metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return checkpointTs, errors.Trace(ctx.Err())
return lastResolvedTs, errors.Trace(ctx.Err())
}
}
matcher.clearCacheCommit()
Expand All @@ -1240,7 +1264,7 @@ func (s *eventFeedSession) singleEventFeed(
case cdcpb.Event_Row_PUT:
opType = model.OpTypePut
default:
return checkpointTs, errors.Errorf("unknown tp: %v", entry.GetOpType())
return lastResolvedTs, errors.Errorf("unknown tp: %v", entry.GetOpType())
}

revent := &model.RegionFeedEvent{
Expand All @@ -1256,29 +1280,29 @@ func (s *eventFeedSession) singleEventFeed(
},
}

if entry.CommitTs <= checkpointTs {
if entry.CommitTs <= lastResolvedTs {
log.Fatal("The CommitTs must be greater than the resolvedTs",
zap.String("Event Type", "COMMITTED"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", checkpointTs),
zap.Uint64("resolvedTs", lastResolvedTs),
zap.Uint64("regionID", regionID))
}
select {
case s.eventCh <- revent:
metricSendEventCommittedCounter.Inc()
case <-ctx.Done():
return checkpointTs, errors.Trace(ctx.Err())
return lastResolvedTs, errors.Trace(ctx.Err())
}
case cdcpb.Event_PREWRITE:
metricPullEventPrewriteCounter.Inc()
matcher.putPrewriteRow(entry)
case cdcpb.Event_COMMIT:
metricPullEventCommitCounter.Inc()
if entry.CommitTs <= checkpointTs {
if entry.CommitTs <= lastResolvedTs {
log.Fatal("The CommitTs must be greater than the resolvedTs",
zap.String("Event Type", "COMMIT"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", checkpointTs),
zap.Uint64("resolvedTs", lastResolvedTs),
zap.Uint64("regionID", regionID))
}
// emit a value
Expand All @@ -1288,21 +1312,21 @@ func (s *eventFeedSession) singleEventFeed(
matcher.cacheCommitRow(entry)
continue
}
return checkpointTs,
return lastResolvedTs,
errors.Errorf("prewrite not match, key: %b, start-ts: %d",
entry.GetKey(), entry.GetStartTs())
}

revent, err := assembleCommitEvent(regionID, entry, value)
if err != nil {
return checkpointTs, errors.Trace(err)
return lastResolvedTs, errors.Trace(err)
}

select {
case s.eventCh <- revent:
metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return checkpointTs, errors.Trace(ctx.Err())
return lastResolvedTs, errors.Trace(ctx.Err())
}
case cdcpb.Event_ROLLBACK:
metricPullEventRollbackCounter.Inc()
Expand All @@ -1312,16 +1336,16 @@ func (s *eventFeedSession) singleEventFeed(
case *cdcpb.Event_Admin_:
log.Info("receive admin event", zap.Stringer("event", event))
case *cdcpb.Event_Error:
return checkpointTs, errors.Trace(&eventError{err: x.Error})
return lastResolvedTs, errors.Trace(&eventError{err: x.Error})
case *cdcpb.Event_ResolvedTs:
if !initialized {
continue
}
if x.ResolvedTs < checkpointTs {
if x.ResolvedTs < lastResolvedTs {
log.Warn("The resolvedTs is fallen back in kvclient",
zap.String("Event Type", "RESOLVED"),
zap.Uint64("resolvedTs", x.ResolvedTs),
zap.Uint64("lastResolvedTs", checkpointTs),
zap.Uint64("lastResolvedTs", lastResolvedTs),
zap.Uint64("regionID", regionID))
continue
}
Expand All @@ -1333,98 +1357,17 @@ func (s *eventFeedSession) singleEventFeed(
ResolvedTs: x.ResolvedTs,
},
}
checkpointTs = x.ResolvedTs
lastResolvedTs = x.ResolvedTs

select {
case s.eventCh <- revent:
metricSendEventResolvedCounter.Inc()
case <-ctx.Done():
return checkpointTs, errors.Trace(ctx.Err())
}
}

}
}

const scanLockLimit = 1024

func (s *eventFeedSession) resolveLock(ctx context.Context, regionID uint64, maxVersion uint64) error {
// TODO test whether this function will kill active transaction
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: scanLockLimit,
})

bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
var loc *tikv.KeyLocation
var key []byte
flushRegion := func() error {
var err error
loc, err = s.kvStorage.GetRegionCache().LocateRegionByID(bo, regionID)
if err != nil {
return err
}
key = loc.StartKey
return nil
}
if err := flushRegion(); err != nil {
return errors.Trace(err)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
req.ScanLock().StartKey = key
resp, err := s.kvStorage.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
return lastResolvedTs, errors.Trace(ctx.Err())
}
if err := flushRegion(); err != nil {
return errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}
locksResp := resp.Resp.(*kvrpcpb.ScanLockResponse)
if locksResp.GetError() != nil {
return errors.Errorf("unexpected scanlock error: %s", locksResp)
}
locksInfo := locksResp.GetLocks()
locks := make([]*tikv.Lock, len(locksInfo))
for i := range locksInfo {
locks[i] = tikv.NewLock(locksInfo[i])
}

_, _, err1 := s.kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
if err1 != nil {
return errors.Trace(err1)
}
if len(locks) < scanLockLimit {
key = loc.EndKey
} else {
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(loc.EndKey) != 0 && bytes.Compare(key, loc.EndKey) >= 0) {
break
}
bo = tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
}
log.Info("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Uint64("maxVersion", maxVersion))
return nil
}

func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pendingValue) (*model.RegionFeedEvent, error) {
Expand All @@ -1435,7 +1378,7 @@ func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pending
case cdcpb.Event_Row_PUT:
opType = model.OpTypePut
default:
return nil, errors.Errorf("unknow tp: %v", entry.GetOpType())
return nil, errors.Errorf("unknow tp: %v, %v", entry.GetOpType(), entry)
overvenus marked this conversation as resolved.
Show resolved Hide resolved
}

revent := &model.RegionFeedEvent{
Expand Down
Loading