Skip to content

Commit

Permalink
etcdutil: fix ctx in watch loop (tikv#6445)
Browse files Browse the repository at this point in the history
close tikv#6439

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 7fa24f3 commit 9daa465
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -427,19 +427,19 @@ func (lw *LoopWatcher) StartWatchLoop() {
defer logutil.LogPanic()
defer lw.wg.Done()

ctx, cancel := context.WithTimeout(lw.ctx, lw.loadTimeout)
ctx, cancel := context.WithCancel(lw.ctx)
defer cancel()
watchStartRevision := lw.initFromEtcd(ctx)

log.Info("start to watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
for {
select {
case <-lw.ctx.Done():
case <-ctx.Done():
log.Info("server is closed, exit watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
return
default:
}
nextRevision, err := lw.watch(lw.ctx, watchStartRevision)
nextRevision, err := lw.watch(ctx, watchStartRevision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while for watch loop",
zap.String("name", lw.name),
Expand All @@ -463,6 +463,8 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
)
ticker := time.NewTicker(defaultLoadFromEtcdRetryInterval)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(ctx, lw.loadTimeout)
defer cancel()

for i := 0; i < lw.loadRetryTimes; i++ {
failpoint.Inject("loadTemporaryFail", func(val failpoint.Value) {
Expand All @@ -489,6 +491,8 @@ func (lw *LoopWatcher) initFromEtcd(ctx context.Context) int64 {
}
if err != nil {
log.Warn("meet error when loading in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
} else {
log.Info("load finished in watch loop", zap.String("name", lw.name), zap.String("key", lw.key))
}
lw.isLoadedCh <- err
return watchStartRevision
Expand All @@ -500,8 +504,12 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision

for {
WatchChan:
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
watchChanCtx, watchChanCancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
defer watchChanCancel()
opts := append(lw.opts, clientv3.WithRev(revision))
watchChan := watcher.Watch(ctx, lw.key, opts...)
watchChan := watcher.Watch(watchChanCtx, lw.key, opts...)
select {
case <-ctx.Done():
return revision, nil
Expand All @@ -511,13 +519,15 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
log.Warn("force load key failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
watchChanCancel()
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision in watch loop",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
watchChanCancel()
goto WatchChan
} else if wresp.Err() != nil { // wresp.Err() is also non-nil when CompactRevision != 0, so the compaction case is handled first
log.Error("watcher is canceled in watch loop",
Expand All @@ -532,11 +542,16 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
log.Error("put failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("put in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key),
zap.ByteString("value", event.Kv.Value))
case clientv3.EventTypeDelete:
if err := lw.deleteFn(event.Kv); err != nil {
log.Error("delete failed in watch loop", zap.String("name", lw.name),
zap.String("key", lw.key), zap.Error(err))
}
log.Debug("delete in watch loop", zap.String("name", lw.name),
zap.ByteString("key", event.Kv.Key))
}
}
if err := lw.postEventFn(); err != nil {
Expand All @@ -545,6 +560,7 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision
}
revision = wresp.Header.Revision + 1
}
watchChanCancel()
}
}

Expand Down Expand Up @@ -580,6 +596,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err))
}
}
// Note: if there are no keys in etcd, resp.More is false, which also means the load is finished.
if !resp.More {
if err := lw.postEventFn(); err != nil {
log.Error("run post event failed in watch loop", zap.String("name", lw.name),
Expand Down

0 comments on commit 9daa465

Please sign in to comment.