diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index ccb6c116791..8013399a669 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -145,10 +145,6 @@ type serverWatchStream struct { // records fragmented watch IDs fragment map[mvcc.WatchID]bool - // indicates whether we have an outstanding global progress - // notification to send - deferredProgress bool - // closec indicates the stream is closed. closec chan struct{} @@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { prevKV: make(map[mvcc.WatchID]bool), fragment: make(map[mvcc.WatchID]bool), - deferredProgress: false, - closec: make(chan struct{}), } @@ -375,14 +369,7 @@ func (sws *serverWatchStream) recvLoop() error { case *pb.WatchRequest_ProgressRequest: if uv.ProgressRequest != nil { sws.mu.Lock() - // Ignore if deferred progress notification is already in progress - if !sws.deferredProgress { - // Request progress for all watchers, - // force generation of a response - if !sws.watchStream.RequestProgressAll() { - sws.deferredProgress = true - } - } + sws.watchStream.RequestProgressAll() sws.mu.Unlock() } default: @@ -498,11 +485,6 @@ func (sws *serverWatchStream) sendLoop() { // elide next progress update if sent a key update sws.progress[wresp.WatchID] = false } - if sws.deferredProgress { - if sws.watchStream.RequestProgressAll() { - sws.deferredProgress = false - } - } sws.mu.Unlock() case c, ok := <-sws.ctrlStream: diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 6d710758830..405505636dd 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1391,8 +1391,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { wch := client.Watch(ctx, "foo", clientv3.WithRev(1)) // Immediately request a progress notification. As the client - // is unsynchronised, the server will have to defer the - // notification internally. + // is unsynchronised, the server will not sent any notification, + //as client can infer progress from events. err := client.RequestProgress(ctx) require.NoError(t, err) @@ -1412,8 +1412,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { } event_count += len(wr.Events) } - - // ... followed by the requested progress notification + // client needs to request progress notification again + err = client.RequestProgress(ctx) + require.NoError(t, err) wr2 := <-wch if wr2.Err() != nil { t.Fatal(fmt.Errorf("watch error: %w", wr2.Err())) @@ -1425,3 +1426,46 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { t.Fatal("Wrong revision in progress notification!") } } + +func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) { + if ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := client.Put(ctx, "bar", "1") + require.NoError(t, err) + + wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision)) + // Request the progress notification on newly created watch that was not yet synced. + err = client.RequestProgress(ctx) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + require.NoError(t, err) + gotProgressNotification := false + for { + select { + case <-ticker.C: + err := client.RequestProgress(ctx) + require.NoError(t, err) + case resp := <-wch: + if resp.Err() != nil { + t.Fatal(fmt.Errorf("watch error: %w", resp.Err())) + } + if resp.IsProgressNotify() { + gotProgressNotification = true + } + } + if gotProgressNotification { + break + } + } + require.True(t, gotProgressNotification, "Expected to get progress notification") +}