Skip to content

Commit

Permalink
Merge pull request #12055 from tangcong/automated-cherry-pick-of-#118…
Browse files Browse the repository at this point in the history
…50-origin-release-3.4

Automated cherry pick of #11850
  • Loading branch information
gyuho authored Jun 25, 2020
2 parents 45192cf + 36452a1 commit 83fc96d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
21 changes: 20 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -616,7 +617,10 @@ func (w *watchGrpcStream) run() {
},
}
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
lg.Info("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
if err := wc.Send(req); err != nil {
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
}
}

// watch client failed on Recv; spawn another if possible
Expand All @@ -637,6 +641,21 @@ func (w *watchGrpcStream) run() {
return

case ws := <-w.closingc:
if ws.id != -1 {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
cancelSet[ws.id] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: ws.id,
},
}
req := &pb.WatchRequest{RequestUnion: cr}
lg.Info("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
if err := wc.Send(req); err != nil {
lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
}
}
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown
Expand Down
32 changes: 32 additions & 0 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,3 +1211,35 @@ func TestV3WatchWithPrevKV(t *testing.T) {
}
}
}

// TestV3WatchCancellation ensures that watch cancellation frees up server resources.
func TestV3WatchCancellation(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

cli := clus.RandClient()

// increment watcher total count and keep a stream open
cli.Watch(ctx, "/foo")

for i := 0; i < 1000; i++ {
ctx, cancel := context.WithCancel(ctx)
cli.Watch(ctx, "/foo")
cancel()
}

// Wait a little for cancellations to take hold
time.Sleep(3 * time.Second)

minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
if err != nil {
t.Fatal(err)
}

if minWatches != "1" {
t.Fatalf("expected one watch, got %s", minWatches)
}
}

0 comments on commit 83fc96d

Please sign in to comment.