diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index 2d3bf593..f8919dcf 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -81,8 +81,11 @@ func (w *watchQ) start() { if err != nil { w.done = true logrus.Infof("Watch cb for key %v returned err: %v", key, err) - // Indicate the caller that watch has been canceled - _ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped) + if err != kvdb.ErrWatchStopped { + // The caller returned an error. Indicate the caller + // that the watch has been stopped + _ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped) + } // else we stopped the watch and the caller has been notified // Indicate that watch is returning. close(w.watchRet) break @@ -902,8 +905,10 @@ func (et *etcdKV) watchStart( } sessionChan := make(chan int, 1) var ( - session *concurrency.Session - err error + session *concurrency.Session + err error + watchStopLock sync.Mutex + watchStopped bool ) go func() { session, err = concurrency.NewSession( @@ -925,7 +930,7 @@ func (et *etcdKV) watchStart( _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) return } - ctx, watchCancel := context.WithCancel(context.Background()) + ctx, watchCancel := context.WithCancel(getContextWithLeaderRequirement()) watchRet := make(chan error) watchChan := et.kvClient.Watch(ctx, key, opts...) watchQ := newWatchQ(opaque, cb, watchRet) @@ -961,14 +966,26 @@ func (et *etcdKV) watchStart( } } logrus.Errorf("Watch on key %v closed without a Cancel response.", key) - watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopLock.Lock() + // Stop the watch only if it has not been stopped already + if !watchStopped { + watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopped = true + } + watchStopLock.Unlock() }() select { case <-session.Done(): // closed by etcd // Indicate the caller that watch has been canceled logrus.Errorf("Watch closing session for key: %v", key) - watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopLock.Lock() + // Stop the watch only if it has not been stopped already + if !watchStopped { + watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopped = true + } + watchStopLock.Unlock() watchCancel() case <-watchRet: // error in watcher // Close the context