From b6373f16255f88314a29932c58b0c143e4eeae77 Mon Sep 17 00:00:00 2001 From: Iwasaki Yudai Date: Mon, 5 Feb 2018 17:08:24 -0800 Subject: [PATCH 1/2] mvcc: restore unsynced watchers In case syncWatchersLoop() starts before Restore() is called, watchers already added by that moment are moved to s.synced by the loop. However, there is a broken logic that moves watchers from s.synced to s.uncyned without setting keyWatchers of the watcherGroup. Eventually syncWatchers() fails to pickup those watchers from s.unsynced and no events are sent to the watchers, because newWatcherBatch() called in the function uses wg.watcherSetByKey() internally that requires a proper keyWatchers value. --- mvcc/watchable_store.go | 2 +- mvcc/watchable_store_test.go | 65 ++++++++++++++++++++---------------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 028e05b9ad8..7be9641d402 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -267,7 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error { } for wa := range s.synced.watchers { - s.unsynced.watchers.add(wa) + s.unsynced.add(wa) } s.synced = newWatcherGroup() return nil diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 42d2ae57151..25639fa8a70 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) { } func TestWatchRestore(t *testing.T) { - b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) - defer cleanup(s, b, tmpPath) - - testKey := []byte("foo") - testValue := []byte("bar") - rev := s.Put(testKey, testValue, lease.NoLease) - - newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) - defer cleanup(newStore, newBackend, newPath) - - w := newStore.NewWatchStream() - w.Watch(testKey, nil, rev-1) - - newStore.Restore(b) - select { - case resp := <-w.Chan(): - if resp.Revision != rev { - t.Fatalf("rev = %d, want %d", resp.Revision, rev) - } - if len(resp.Events) != 1 { - t.Fatalf("failed to get events from the response") - } - if resp.Events[0].Kv.ModRevision != rev { - t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + test := func(delay time.Duration) func(t *testing.T) { + return func(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + defer cleanup(s, b, tmpPath) + + testKey := []byte("foo") + testValue := []byte("bar") + rev := s.Put(testKey, testValue, lease.NoLease) + + newBackend, newPath := backend.NewDefaultTmpBackend() + newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) + defer cleanup(newStore, newBackend, newPath) + + w := newStore.NewWatchStream() + w.Watch(0, testKey, nil, rev-1) + + time.Sleep(delay) + + newStore.Restore(b) + select { + case resp := <-w.Chan(): + if resp.Revision != rev { + t.Fatalf("rev = %d, want %d", resp.Revision, rev) + } + if len(resp.Events) != 1 { + t.Fatalf("failed to get events from the response") + } + if resp.Events[0].Kv.ModRevision != rev { + t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second.") + } } - case <-time.After(time.Second): - t.Fatal("failed to receive event in 1 second.") } + + t.Run("Normal", test(0)) + t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } // TestWatchBatchUnsynced tests batching on unsynced watchers From 087b9aa3dc9e16f3232b0894ecb637162e78a49b Mon Sep 17 00:00:00 2001 From: Joe Betz Date: Wed, 7 Feb 2018 15:57:34 -0800 Subject: [PATCH 2/2] mvcc: fix watchable store test for 3.2 cherrypick of #9281 --- mvcc/watchable_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 25639fa8a70..cef00d41f57 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) { defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() - w.Watch(0, testKey, nil, rev-1) + w.Watch(testKey, nil, rev-1) time.Sleep(delay)