From a89b24041290bbfb047e80ff8f3c067a518f95f1 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 3 Jul 2024 14:23:38 +0800 Subject: [PATCH] *: keep tombstone if revision == compactAtRev Before this patch, the tombstone can be deleted if its revision is equal compacted revision. It causes that the watch subscriber won't get this DELETE event. Based on Compact API[1], we should keep tombstone revision if it's not less than the compaction revision. > CompactionRequest compacts the key-value store up to a given revision. > All superseded keys with a revision less than the compaction revision > will be removed. [1]: https://etcd.io/docs/latest/dev-guide/api_reference_v3/ Signed-off-by: Wei Fu --- server/storage/mvcc/index_test.go | 456 +++++++++++++++++++++----- server/storage/mvcc/key_index.go | 33 +- server/storage/mvcc/key_index_test.go | 28 +- tests/e2e/watch_test.go | 27 +- 4 files changed, 438 insertions(+), 106 deletions(-) diff --git a/server/storage/mvcc/index_test.go b/server/storage/mvcc/index_test.go index 7ac27c9608ed..9cb5a90adb14 100644 --- a/server/storage/mvcc/index_test.go +++ b/server/storage/mvcc/index_test.go @@ -18,7 +18,7 @@ import ( "reflect" "testing" - "github.com/google/btree" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -235,101 +235,409 @@ func TestIndexRevision(t *testing.T) { func TestIndexCompactAndKeep(t *testing.T) { maxRev := int64(20) - tests := []struct { - key []byte - remove bool - rev Revision - created Revision - ver int64 - }{ - {[]byte("foo"), false, Revision{Main: 1}, Revision{Main: 1}, 1}, - {[]byte("foo1"), false, Revision{Main: 2}, Revision{Main: 2}, 1}, - {[]byte("foo2"), false, Revision{Main: 3}, Revision{Main: 3}, 1}, - {[]byte("foo2"), false, Revision{Main: 4}, Revision{Main: 3}, 2}, - {[]byte("foo"), false, Revision{Main: 5}, Revision{Main: 1}, 2}, - {[]byte("foo1"), false, Revision{Main: 6}, Revision{Main: 2}, 2}, - {[]byte("foo1"), true, Revision{Main: 7}, Revision{}, 0}, - {[]byte("foo2"), true, Revision{Main: 8}, Revision{}, 0}, - {[]byte("foo"), true, Revision{Main: 9}, Revision{}, 0}, - {[]byte("foo"), false, Revision{Main: 10}, Revision{Main: 10}, 1}, - {[]byte("foo1"), false, Revision{Main: 10, Sub: 1}, Revision{Main: 10, Sub: 1}, 1}, + + // FIXME(fuweid): For key foo, compact([2,3,4]) won't delete revision(1, 0). + // + // key: "foo" + // modified: 10 + // generations: + // {{10, 0}} + // {{1, 0}, {5, 0}, {9, 0}(t)} + // + // key: "foo1" + // modified: 10, 1 + // generations: + // {{10, 1}} + // {{2, 0}, {6, 0}, {7, 0}(t)} + // + // key: "foo2" + // modified: 8 + // generations: + // {empty} + // {{3, 0}, {4, 0}, {8, 0}(t)} + // + buildTreeIndex := func() index { + ti := newTreeIndex(zaptest.NewLogger(t)) + + ti.Put([]byte("foo"), Revision{Main: 1}) + ti.Put([]byte("foo1"), Revision{Main: 2}) + ti.Put([]byte("foo2"), Revision{Main: 3}) + ti.Put([]byte("foo2"), Revision{Main: 4}) + ti.Put([]byte("foo"), Revision{Main: 5}) + ti.Put([]byte("foo1"), Revision{Main: 6}) + require.NoError(t, ti.Tombstone([]byte("foo1"), Revision{Main: 7})) + require.NoError(t, ti.Tombstone([]byte("foo2"), Revision{Main: 8})) + require.NoError(t, ti.Tombstone([]byte("foo"), Revision{Main: 9})) + ti.Put([]byte("foo"), Revision{Main: 10}) + ti.Put([]byte("foo1"), Revision{Main: 10, Sub: 1}) + return ti } - // Continuous Compact and Keep - ti := newTreeIndex(zaptest.NewLogger(t)) - for _, tt := range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } + afterCompacts := []struct { + atRev int + keyIndexes []keyIndex + keep map[Revision]struct{} + }{ + { + atRev: 1, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 1}: {}, + }, + }, + { + atRev: 2, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 1}: {}, + Revision{Main: 2}: {}, + }, + }, + { + atRev: 3, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 3}, Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 1}: {}, + Revision{Main: 2}: {}, + Revision{Main: 3}: {}, + }, + }, + { + atRev: 4, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 1}, Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 1}: {}, + Revision{Main: 2}: {}, + Revision{Main: 4}: {}, + }, + }, + { + atRev: 5, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 2}, Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 2}: {}, + Revision{Main: 4}: {}, + Revision{Main: 5}: {}, + }, + }, + { + atRev: 6, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 6}, Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 6}: {}, + Revision{Main: 4}: {}, + Revision{Main: 5}: {}, + }, + }, + { + atRev: 7, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 3, created: Revision{Main: 2}, revs: []Revision{Revision{Main: 7}}}, + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 4}, Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 7}: {}, + Revision{Main: 4}: {}, + Revision{Main: 5}: {}, + }, + }, + { + atRev: 8, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 5}, Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: Revision{Main: 8}, + generations: []generation{ + {ver: 3, created: Revision{Main: 3}, revs: []Revision{Revision{Main: 8}}}, + {}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 8}: {}, + Revision{Main: 5}: {}, + }, + }, + { + atRev: 9, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 3, created: Revision{Main: 1}, revs: []Revision{Revision{Main: 9}}}, + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 9}: {}, + }, + }, + { + atRev: 10, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: Revision{Main: 10}, + generations: []generation{ + {ver: 1, created: Revision{Main: 10}, revs: []Revision{Revision{Main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: Revision{Main: 10, Sub: 1}, + generations: []generation{ + {ver: 1, created: Revision{Main: 10, Sub: 1}, revs: []Revision{Revision{Main: 10, Sub: 1}}}, + }, + }, + }, + keep: map[Revision]struct{}{ + Revision{Main: 10}: {}, + Revision{Main: 10, Sub: 1}: {}, + }, + }, } + + ti := buildTreeIndex() for i := int64(1); i < maxRev; i++ { am := ti.Compact(i) keep := ti.Keep(i) if !(reflect.DeepEqual(am, keep)) { t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) } - wti := &treeIndex{tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool { - return aki.Less(bki) - })} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } + + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) + require.Equal(t, afterCompacts[j].keep, keep, "#%d: compact(%d) keep != expected", i, i) + + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) } + + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } // Once Compact and Keep for i := int64(1); i < maxRev; i++ { - ti := newTreeIndex(zaptest.NewLogger(t)) - for _, tt := range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } - } + ti := buildTreeIndex() + am := ti.Compact(i) keep := ti.Keep(i) if !(reflect.DeepEqual(am, keep)) { t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) } - wti := &treeIndex{tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool { - return aki.Less(bki) - })} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(Revision{Main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } - } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) + + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 } - } -} + require.Equal(t, afterCompacts[j].keep, keep, "#%d: compact(%d) keep != expected", i, i) -func restore(ti *treeIndex, key []byte, created, modified Revision, ver int64) { - keyi := &keyIndex{key: key} + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) + } - ti.Lock() - defer ti.Unlock() - okeyi, _ := ti.tree.Get(keyi) - if okeyi == nil { - keyi.restore(ti.lg, created, modified, ver) - ti.tree.ReplaceOrInsert(keyi) - return + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } - okeyi.put(ti.lg, modified.Main, modified.Sub) } diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index 07ad930c18a1..97c753e8c377 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -65,7 +65,8 @@ var ( // compact(5): // generations: // -// {empty} -> key SHOULD be removed. +// {empty} +// {5.0(t)} // // compact(6): // generations: @@ -205,6 +206,18 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []Revision { // revision than the given atRev except the largest one (If the largest one is // a tombstone, it will not be kept). // If a generation becomes empty during compaction, it will be removed. +// +// FIXME(fuweid): There is an exception that compact function won't remove +// smaller version. For example, the compact(2) on key "foo" won't delete +// revision{1,0}. +// +// key: "foo" +// modified: 11 +// generations: +// +// {empty} +// {{10,0}, {11,0}(t)} +// {{1,0}, {3,0}, {5,0}(t)} func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision]struct{}) { if ki.isEmpty() { lg.Panic( @@ -221,11 +234,6 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[Revision] if revIndex != -1 { g.revs = g.revs[revIndex:] } - // remove any tombstone - if len(g.revs) == 1 && genIdx != len(ki.generations)-1 { - delete(available, g.revs[0]) - genIdx++ - } } // remove the previous generations. @@ -238,20 +246,15 @@ func (ki *keyIndex) keep(atRev int64, available map[Revision]struct{}) { return } - genIdx, revIndex := ki.doCompact(atRev, available) - g := &ki.generations[genIdx] - if !g.isEmpty() { - // remove any tombstone - if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { - delete(available, g.revs[revIndex]) - } - } + ki.doCompact(atRev, available) } func (ki *keyIndex) doCompact(atRev int64, available map[Revision]struct{}) (genIdx int, revIndex int) { // walk until reaching the first revision smaller or equal to "atRev", // and add the revision to the available map f := func(rev Revision) bool { + // NOTE: Duplicate keys in one txn isn't allowed. So, there is + // no need to check Sub revision here. if rev.Main <= atRev { available[rev] = struct{}{} return false @@ -262,7 +265,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[Revision]struct{}) (gen genIdx, g := 0, &ki.generations[0] // find first generation includes atRev or created after atRev for genIdx < len(ki.generations)-1 { - if tomb := g.revs[len(g.revs)-1].Main; tomb > atRev { + if tomb := g.revs[len(g.revs)-1].Main; tomb >= atRev { break } genIdx++ diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index d607ec091098..4f323a58e455 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -307,12 +307,15 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: Revision{Main: 16}, generations: []generation{ + {created: Revision{Main: 2}, ver: 3, revs: []Revision{Revision{Main: 6}}}, {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 8}, Revision{Main: 10}, Revision{Main: 12}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}}, {}, }, }, - map[Revision]struct{}{}, + map[Revision]struct{}{ + Revision{Main: 6}: {}, + }, }, { 7, @@ -393,11 +396,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: Revision{Main: 16}, generations: []generation{ + {created: Revision{Main: 8}, ver: 3, revs: []Revision{Revision{Main: 12}}}, {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 14}, Revision{Main: 14, Sub: 1}, Revision{Main: 16}}}, {}, }, }, - map[Revision]struct{}{}, + map[Revision]struct{}{ + Revision{Main: 12}: {}, + }, }, { 13, @@ -441,6 +447,20 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { }, { 16, + &keyIndex{ + key: []byte("foo"), + modified: Revision{Main: 16}, + generations: []generation{ + {created: Revision{Main: 14}, ver: 3, revs: []Revision{Revision{Main: 16}}}, + {}, + }, + }, + map[Revision]struct{}{ + Revision{Main: 16}: {}, + }, + }, + { + 17, &keyIndex{ key: []byte("foo"), modified: Revision{Main: 16}, @@ -687,6 +707,10 @@ func TestGenerationWalk(t *testing.T) { } } +// TODO(fuweid): +// +// Replace {14, 1} with {15, 1} because ETCD server doesn't allow duplicate +// keys in one txn. func newTestKeyIndex(lg *zap.Logger) *keyIndex { // key: "foo" // modified: 16 diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index c69fcac1a798..f02021fc3aa2 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -311,21 +311,18 @@ func TestDeleteEventDrop_Issue18089(t *testing.T) { watchChan := c.Watch(ctx, key, clientv3.WithRev(deleteResp.Header.Revision)) select { case watchResp := <-watchChan: - // TODO(MadhavJivrajani): update conditions once https://github.com/etcd-io/etcd/issues/18089 - // is resolved. The existing conditions do not mimic the desired behaviour and are there to - // test and reproduce etcd-io/etcd#18089. - if len(watchResp.Events) != 1 { - t.Fatalf("expected exactly one event in response, got: %d", len(watchResp.Events)) - } - if watchResp.Events[0].Type != mvccpb.PUT { - t.Fatalf("unexpected event type, expected: %s, got: %s", mvccpb.PUT, watchResp.Events[0].Type) - } - if string(watchResp.Events[0].Kv.Key) != key { - t.Fatalf("unexpected key, expected: %s, got: %s", key, string(watchResp.Events[0].Kv.Key)) - } - if string(watchResp.Events[0].Kv.Value) != v6 { - t.Fatalf("unexpected valye, expected: %s, got: %s", v6, string(watchResp.Events[0].Kv.Value)) - } + require.Len(t, watchResp.Events, 2) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, key, deletedKey) + + require.Equal(t, mvccpb.PUT, watchResp.Events[1].Type) + + updatedKey := string(watchResp.Events[1].Kv.Key) + require.Equal(t, key, updatedKey) + + require.Equal(t, v6, string(watchResp.Events[1].Kv.Value)) case <-time.After(100 * time.Millisecond): // we care only about the first response, but have an // escape hatch in case the watch response is delayed.