From 4b7c1f49ddbf50026da67f9da337b95d766edf23 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 4 Jul 2024 14:34:18 +0800 Subject: [PATCH 1/2] *: update tests for watch API when compact on tombstone revision Signed-off-by: Wei Fu (cherry picked from commit ee33652775842b96d1c0ab601ec3002078998c2a) Signed-off-by: Wei Fu --- tests/e2e/watch_test.go | 192 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 175 insertions(+), 17 deletions(-) diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index 34256573932..c2ea60a85cb 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -25,10 +25,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "go.etcd.io/etcd/clientv3" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" ) @@ -246,8 +249,7 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr // - COMPACT r5 // - WATCH rev=r5 // -// We should get the DELETE event (r5) followed by the PUT event (r6). However, currently we only -// get the PUT event with returned revision of r6 (key=k, val=v6). +// We should get the DELETE event (r5) followed by the PUT event (r6). func TestDeleteEventDrop_Issue18089(t *testing.T) { cfg := &etcdProcessClusterConfig{ clusterSize: 1, @@ -297,24 +299,180 @@ func TestDeleteEventDrop_Issue18089(t *testing.T) { watchChan := c.Watch(ctx, key, clientv3.WithRev(deleteResp.Header.Revision)) select { case watchResp := <-watchChan: - // TODO(MadhavJivrajani): update conditions once https://github.com/etcd-io/etcd/issues/18089 - // is resolved. The existing conditions do not mimic the desired behaviour and are there to - // test and reproduce etcd-io/etcd#18089. - if len(watchResp.Events) != 1 { - t.Fatalf("expected exactly one event in response, got: %d", len(watchResp.Events)) - } - if watchResp.Events[0].Type != mvccpb.PUT { - t.Fatalf("unexpected event type, expected: %s, got: %s", mvccpb.PUT, watchResp.Events[0].Type) - } - if string(watchResp.Events[0].Kv.Key) != key { - t.Fatalf("unexpected key, expected: %s, got: %s", key, string(watchResp.Events[0].Kv.Key)) - } - if string(watchResp.Events[0].Kv.Value) != v6 { - t.Fatalf("unexpected valye, expected: %s, got: %s", v6, string(watchResp.Events[0].Kv.Value)) - } + require.Len(t, watchResp.Events, 2) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, key, deletedKey) + + require.Equal(t, mvccpb.PUT, watchResp.Events[1].Type) + + updatedKey := string(watchResp.Events[1].Kv.Key) + require.Equal(t, key, updatedKey) + + require.Equal(t, v6, string(watchResp.Events[1].Kv.Value)) case <-time.After(100 * time.Millisecond): // we care only about the first response, but have an // escape hatch in case the watch response is delayed. t.Fatal("timed out getting watch response") } } + +func TestStartWatcherFromCompactedRevision(t *testing.T) { + t.Run("compaction on tombstone revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, true) + }) + t.Run("compaction on normal revision", func(t *testing.T) { + testStartWatcherFromCompactedRevision(t, false) + }) +} + +func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombstone bool) { + cfg := &etcdProcessClusterConfig{ + clusterSize: 1, + isClientAutoTLS: true, + clientTLS: clientTLS, + } + clus, err := newEtcdProcessCluster(t, cfg) + require.NoError(t, err) + defer clus.Close() + + c := newClient(t, clus.EndpointsGRPC(), cfg.clientTLS, cfg.isClientAutoTLS) + defer c.Close() + + ctx := context.Background() + key := "foo" + totalRev := 100 + + type valueEvent struct { + value string + typ mvccpb.Event_EventType + } + + var ( + // requestedValues records all requested change + requestedValues = make([]valueEvent, 0) + // revisionChan sends each compacted revision via this channel + compactionRevChan = make(chan int64) + // compactionStep means that client performs a compaction on every 7 operations + compactionStep = 7 + ) + + // This goroutine will submit changes on $key $totalRev times. It will + // perform compaction after every $compactedAfterChanges changes. + // Except for first time, the watcher always receives the compacted + // revision as start. + go func() { + defer close(compactionRevChan) + + lastRevision := int64(1) + + compactionRevChan <- lastRevision + for vi := 1; vi <= totalRev; vi++ { + var respHeader *etcdserverpb.ResponseHeader + + if vi%compactionStep == 0 && performCompactOnTombstone { + t.Logf("DELETE key=%s", key) + + resp, derr := c.KV.Delete(ctx, key) + require.NoError(t, derr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: "", typ: mvccpb.DELETE}) + } else { + value := fmt.Sprintf("%d", vi) + + t.Logf("PUT key=%s, val=%s", key, value) + resp, perr := c.KV.Put(ctx, key, value) + require.NoError(t, perr) + respHeader = resp.Header + + requestedValues = append(requestedValues, valueEvent{value: value, typ: mvccpb.PUT}) + } + + lastRevision = respHeader.Revision + + if vi%compactionStep == 0 { + compactionRevChan <- lastRevision + + t.Logf("COMPACT rev=%d", lastRevision) + _, err = c.KV.Compact(ctx, lastRevision, clientv3.WithCompactPhysical()) + require.NoError(t, err) + } + } + }() + + receivedEvents := make([]*clientv3.Event, 0) + + fromCompactedRev := false + for fromRev := range compactionRevChan { + watchChan := c.Watch(ctx, key, clientv3.WithRev(fromRev)) + + prevEventCount := len(receivedEvents) + + // firstReceived represents this is first watch response. + // Just in case that ETCD sends event one by one. + firstReceived := true + + t.Logf("Start to watch key %s starting from revision %d", key, fromRev) + watchLoop: + for { + currentEventCount := len(receivedEvents) + if currentEventCount-prevEventCount == compactionStep || currentEventCount == totalRev { + break + } + + select { + case watchResp := <-watchChan: + t.Logf("Receive the number of events: %d", len(watchResp.Events)) + for i := range watchResp.Events { + ev := watchResp.Events[i] + + // If the $fromRev is the compacted revision, + // the first event should be the same as the last event receives in last watch response. + if firstReceived && fromCompactedRev { + firstReceived = false + + last := receivedEvents[prevEventCount-1] + + assert.Equal(t, last.Type, ev.Type, + "last received event type %s, but got event type %s", last.Type, ev.Type) + assert.Equal(t, string(last.Kv.Key), string(ev.Kv.Key), + "last received event key %s, but got event key %s", string(last.Kv.Key), string(ev.Kv.Key)) + assert.Equal(t, string(last.Kv.Value), string(ev.Kv.Value), + "last received event value %s, but got event value %s", string(last.Kv.Value), string(ev.Kv.Value)) + continue + } + receivedEvents = append(receivedEvents, ev) + } + + if len(watchResp.Events) == 0 { + require.Equal(t, v3rpc.ErrCompacted, watchResp.Err()) + break watchLoop + } + + case <-time.After(10 * time.Second): + t.Fatal("timed out getting watch response") + } + } + + fromCompactedRev = true + } + + t.Logf("Received total number of events: %d", len(receivedEvents)) + require.Len(t, requestedValues, totalRev) + require.Len(t, receivedEvents, totalRev, "should receive %d events", totalRev) + for idx, expected := range requestedValues { + ev := receivedEvents[idx] + + require.Equal(t, expected.typ, ev.Type, "#%d expected event %s", idx, expected.typ) + + updatedKey := string(ev.Kv.Key) + + require.Equal(t, key, updatedKey) + if expected.typ == mvccpb.PUT { + updatedValue := string(ev.Kv.Value) + require.Equal(t, expected.value, updatedValue) + } + } +} From 2828a12ad6463043cd3da41858ab39cf8d26bf95 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 4 Jul 2024 14:35:51 +0800 Subject: [PATCH 2/2] *: keep tombstone if revision == compactAtRev Before this patch, the tombstone can be deleted if its revision is equal compacted revision. It causes that the watch subscriber won't get this DELETE event. Based on Compact API[1], we should keep tombstone revision if it's not less than the compaction revision. > CompactionRequest compacts the key-value store up to a given revision. > All superseded keys with a revision less than the compaction revision > will be removed. [1]: https://etcd.io/docs/latest/dev-guide/api_reference_v3/ Signed-off-by: Wei Fu (cherry picked from commit bbdc94181a6d67904b575ad936c20d1be10e220c) Signed-off-by: Wei Fu --- mvcc/index_test.go | 502 ++++++++++++++++++++++++++++++++++------- mvcc/key_index.go | 20 +- mvcc/key_index_test.go | 60 ++++- mvcc/kvstore.go | 14 ++ 4 files changed, 501 insertions(+), 95 deletions(-) diff --git a/mvcc/index_test.go b/mvcc/index_test.go index ac7a8b182f9..e61d4b3fe8b 100644 --- a/mvcc/index_test.go +++ b/mvcc/index_test.go @@ -18,8 +18,9 @@ import ( "reflect" "testing" - "github.com/google/btree" + "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) func TestIndexGet(t *testing.T) { @@ -290,98 +291,445 @@ func TestIndexRevision(t *testing.T) { func TestIndexCompactAndKeep(t *testing.T) { maxRev := int64(20) - tests := []struct { - key []byte - remove bool - rev revision - created revision - ver int64 + + // key: "foo" + // modified: 10 + // generations: + // {{10, 0}} + // {{1, 0}, {5, 0}, {9, 0}(t)} + // + // key: "foo1" + // modified: 10, 1 + // generations: + // {{10, 1}} + // {{2, 0}, {6, 0}, {7, 0}(t)} + // + // key: "foo2" + // modified: 8 + // generations: + // {empty} + // {{3, 0}, {4, 0}, {8, 0}(t)} + // + buildTreeIndex := func() index { + ti := newTreeIndex(zaptest.NewLogger(t)) + + ti.Put([]byte("foo"), revision{main: 1}) + ti.Put([]byte("foo1"), revision{main: 2}) + ti.Put([]byte("foo2"), revision{main: 3}) + ti.Put([]byte("foo2"), revision{main: 4}) + ti.Put([]byte("foo"), revision{main: 5}) + ti.Put([]byte("foo1"), revision{main: 6}) + require.NoError(t, ti.Tombstone([]byte("foo1"), revision{main: 7})) + require.NoError(t, ti.Tombstone([]byte("foo2"), revision{main: 8})) + require.NoError(t, ti.Tombstone([]byte("foo"), revision{main: 9})) + ti.Put([]byte("foo"), revision{main: 10}) + ti.Put([]byte("foo1"), revision{main: 10, sub: 1}) + return ti + } + + afterCompacts := []struct { + atRev int + keyIndexes []keyIndex + keep map[revision]struct{} + compacted map[revision]struct{} }{ - {[]byte("foo"), false, revision{main: 1}, revision{main: 1}, 1}, - {[]byte("foo1"), false, revision{main: 2}, revision{main: 2}, 1}, - {[]byte("foo2"), false, revision{main: 3}, revision{main: 3}, 1}, - {[]byte("foo2"), false, revision{main: 4}, revision{main: 3}, 2}, - {[]byte("foo"), false, revision{main: 5}, revision{main: 1}, 2}, - {[]byte("foo1"), false, revision{main: 6}, revision{main: 2}, 2}, - {[]byte("foo1"), true, revision{main: 7}, revision{}, 0}, - {[]byte("foo2"), true, revision{main: 8}, revision{}, 0}, - {[]byte("foo"), true, revision{main: 9}, revision{}, 0}, - {[]byte("foo"), false, revision{10, 0}, revision{10, 0}, 1}, - {[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1}, + { + atRev: 1, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 1}, {main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 2}, {main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 3}, {main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 1}: {}, + }, + compacted: map[revision]struct{}{ + {main: 1}: {}, + }, + }, + { + atRev: 2, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 1}, {main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 2}, {main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 3}, {main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + }, + compacted: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + }, + }, + { + atRev: 3, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 1}, {main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 2}, {main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 3}, {main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + {main: 3}: {}, + }, + compacted: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + {main: 3}: {}, + }, + }, + { + atRev: 4, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 1}, {main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 2}, {main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + {main: 4}: {}, + }, + compacted: map[revision]struct{}{ + {main: 1}: {}, + {main: 2}: {}, + {main: 4}: {}, + }, + }, + { + atRev: 5, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 2}, {main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 2}: {}, + {main: 4}: {}, + {main: 5}: {}, + }, + compacted: map[revision]struct{}{ + {main: 2}: {}, + {main: 4}: {}, + {main: 5}: {}, + }, + }, + { + atRev: 6, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 6}, {main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 6}: {}, + {main: 4}: {}, + {main: 5}: {}, + }, + compacted: map[revision]struct{}{ + {main: 6}: {}, + {main: 4}: {}, + {main: 5}: {}, + }, + }, + { + atRev: 7, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 3, created: revision{main: 2}, revs: []revision{{main: 7}}}, + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 4}, {main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 4}: {}, + {main: 5}: {}, + }, + compacted: map[revision]struct{}{ + {main: 7}: {}, + {main: 4}: {}, + {main: 5}: {}, + }, + }, + { + atRev: 8, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 5}, {main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + { + key: []byte("foo2"), + modified: revision{main: 8}, + generations: []generation{ + {ver: 3, created: revision{main: 3}, revs: []revision{{main: 8}}}, + {}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 5}: {}, + }, + compacted: map[revision]struct{}{ + {main: 8}: {}, + {main: 5}: {}, + }, + }, + { + atRev: 9, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 3, created: revision{main: 1}, revs: []revision{{main: 9}}}, + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + }, + keep: map[revision]struct{}{}, + compacted: map[revision]struct{}{ + {main: 9}: {}, + }, + }, + { + atRev: 10, + keyIndexes: []keyIndex{ + { + key: []byte("foo"), + modified: revision{main: 10}, + generations: []generation{ + {ver: 1, created: revision{main: 10}, revs: []revision{{main: 10}}}, + }, + }, + { + key: []byte("foo1"), + modified: revision{main: 10, sub: 1}, + generations: []generation{ + {ver: 1, created: revision{main: 10, sub: 1}, revs: []revision{{main: 10, sub: 1}}}, + }, + }, + }, + keep: map[revision]struct{}{ + {main: 10}: {}, + {main: 10, sub: 1}: {}, + }, + compacted: map[revision]struct{}{ + {main: 10}: {}, + {main: 10, sub: 1}: {}, + }, + }, } + ti := buildTreeIndex() // Continuous Compact and Keep - ti := newTreeIndex(zap.NewExample()) - for _, tt := range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } - } for i := int64(1); i < maxRev; i++ { + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 + } + am := ti.Compact(i) + require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i) + keep := ti.Keep(i) - if !(reflect.DeepEqual(am, keep)) { - t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) - } - wti := &treeIndex{tree: btree.New(32)} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } - } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) + require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i) + + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) } + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } // Once Compact and Keep for i := int64(1); i < maxRev; i++ { - ti := newTreeIndex(zap.NewExample()) - for _, tt := range tests { - if tt.remove { - ti.Tombstone(tt.key, tt.rev) - } else { - ti.Put(tt.key, tt.rev) - } + ti := buildTreeIndex() + + j := i - 1 + if i >= int64(len(afterCompacts)) { + j = int64(len(afterCompacts)) - 1 } + am := ti.Compact(i) + require.Equal(t, afterCompacts[j].compacted, am, "#%d: compact(%d) != expected", i, i) + keep := ti.Keep(i) - if !(reflect.DeepEqual(am, keep)) { - t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep) - } - wti := &treeIndex{tree: btree.New(32)} - for _, tt := range tests { - if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) { - if tt.remove { - wti.Tombstone(tt.key, tt.rev) - } else { - restore(wti, tt.key, tt.created, tt.rev, tt.ver) - } - } - } - if !ti.Equal(wti) { - t.Errorf("#%d: not equal ti", i) - } - } -} + require.Equal(t, afterCompacts[j].keep, keep, "#%d: keep(%d) != expected", i, i) -func restore(ti *treeIndex, key []byte, created, modified revision, ver int64) { - keyi := &keyIndex{key: key} + nti := newTreeIndex(zaptest.NewLogger(t)).(*treeIndex) + for k := range afterCompacts[j].keyIndexes { + ki := afterCompacts[j].keyIndexes[k] + nti.tree.ReplaceOrInsert(&ki) + } - ti.Lock() - defer ti.Unlock() - item := ti.tree.Get(keyi) - if item == nil { - keyi.restore(ti.lg, created, modified, ver) - ti.tree.ReplaceOrInsert(keyi) - return + require.True(t, ti.Equal(nti), "#%d: not equal ti", i) } - okeyi := item.(*keyIndex) - okeyi.put(ti.lg, modified.main, modified.sub) } diff --git a/mvcc/key_index.go b/mvcc/key_index.go index d8dc368c42a..c414fd0c5ae 100644 --- a/mvcc/key_index.go +++ b/mvcc/key_index.go @@ -66,7 +66,8 @@ var ( // compact(5): // generations: // -// {empty} -> key SHOULD be removed. +// {empty} +// {5.0(t)} // // compact(6): // generations: @@ -223,8 +224,7 @@ func (ki *keyIndex) since(lg *zap.Logger, rev int64) []revision { } // compact compacts a keyIndex by removing the versions with smaller or equal -// revision than the given atRev except the largest one (If the largest one is -// a tombstone, it will not be kept). +// revision than the given atRev except the largest one. // If a generation becomes empty during compaction, it will be removed. func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) { if ki.isEmpty() { @@ -246,11 +246,6 @@ func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision] if revIndex != -1 { g.revs = g.revs[revIndex:] } - // remove any tombstone - if len(g.revs) == 1 && genIdx != len(ki.generations)-1 { - delete(available, g.revs[0]) - genIdx++ - } } // remove the previous generations. @@ -266,7 +261,12 @@ func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) { genIdx, revIndex := ki.doCompact(atRev, available) g := &ki.generations[genIdx] if !g.isEmpty() { - // remove any tombstone + // If the given `atRev` is a tombstone, we need to skip it. + // + // Note that this s different from the `compact` function which + // keeps tombstone in such case. We need to stay consistent with + // existing versions, ensuring they always generate the same hash + // values. if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 { delete(available, g.revs[revIndex]) } @@ -287,7 +287,7 @@ func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (gen genIdx, g := 0, &ki.generations[0] // find first generation includes atRev or created after atRev for genIdx < len(ki.generations)-1 { - if tomb := g.revs[len(g.revs)-1].main; tomb > atRev { + if tomb := g.revs[len(g.revs)-1].main; tomb >= atRev { break } genIdx++ diff --git a/mvcc/key_index_test.go b/mvcc/key_index_test.go index f4e70ff3c02..4c38f10e236 100644 --- a/mvcc/key_index_test.go +++ b/mvcc/key_index_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -298,12 +299,15 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: revision{16, 0}, generations: []generation{ + {created: revision{main: 2}, ver: 3, revs: []revision{{main: 6}}}, {created: revision{8, 0}, ver: 3, revs: []revision{{main: 8}, {main: 10}, {main: 12}}}, {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 15, sub: 1}, {main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision]struct{}{ + {main: 6}: {}, + }, }, { 7, @@ -384,11 +388,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { key: []byte("foo"), modified: revision{16, 0}, generations: []generation{ + {created: revision{main: 8}, ver: 3, revs: []revision{{main: 12}}}, {created: revision{14, 0}, ver: 3, revs: []revision{{main: 14}, {main: 15, sub: 1}, {main: 16}}}, {}, }, }, - map[revision]struct{}{}, + map[revision]struct{}{ + {main: 12}: {}, + }, }, { 13, @@ -434,7 +441,21 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { 16, &keyIndex{ key: []byte("foo"), - modified: revision{16, 0}, + modified: revision{main: 16}, + generations: []generation{ + {created: revision{main: 14}, ver: 3, revs: []revision{{main: 16}}}, + {}, + }, + }, + map[revision]struct{}{ + {main: 16}: {}, + }, + }, + { + 17, + &keyIndex{ + key: []byte("foo"), + modified: revision{main: 16}, generations: []generation{ {}, }, @@ -443,18 +464,36 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { }, } + isTombstoneRevFn := func(ki *keyIndex, rev int64) bool { + for i := 0; i < len(ki.generations)-1; i++ { + g := ki.generations[i] + + if l := len(g.revs); l > 0 && g.revs[l-1].main == rev { + return true + } + } + return false + } + // Continuous Compaction and finding Keep ki := newTestKeyIndex() for i, tt := range tests { + isTombstone := isTombstoneRevFn(ki, tt.compact) + am := make(map[revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, am) if !reflect.DeepEqual(ki, kiclone) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone) } - if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + + if isTombstone { + assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki) + } else { + assert.Equal(t, tt.wam, am, + "#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki) } + am = make(map[revision]struct{}) ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { @@ -468,7 +507,7 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { // Jump Compaction and finding Keep ki = newTestKeyIndex() for i, tt := range tests { - if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) { + if !isTombstoneRevFn(ki, tt.compact) { am := make(map[revision]struct{}) kiclone := cloneKeyIndex(ki) ki.keep(tt.compact, am) @@ -498,9 +537,14 @@ func TestKeyIndexCompactAndKeep(t *testing.T) { if !reflect.DeepEqual(ki, kiClone) { t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone) } - if !reflect.DeepEqual(am, tt.wam) { - t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam) + + if isTombstoneRevFn(ki, tt.compact) { + assert.Equal(t, 0, len(am), "#%d: ki = %d, keep result wants empty because tombstone", i, ki) + } else { + assert.Equal(t, tt.wam, am, + "#%d: ki = %d, compact keep should be equal to keep keep if it's not tombstone", i, ki) } + am = make(map[revision]struct{}) ki.compact(zap.NewExample(), tt.compact, am) if !reflect.DeepEqual(ki, tt.wki) { diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 098e684f6e8..cc51385ef89 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -228,6 +228,9 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev if !upper.GreaterThan(kr) { return nil } + + isTombstoneRev := isTombstone(k) + // skip revisions that are scheduled for deletion // due to compacting; don't skip if there isn't one. if lower.GreaterThan(kr) && len(keep) > 0 { @@ -235,6 +238,17 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev return nil } } + + // When performing compaction, if the compacted revision is a + // tombstone, older versions (<= 3.5.15 or <= 3.4.33) will delete + // the tombstone. But newer versions (> 3.5.15 or > 3.4.33) won't + // delete it. So we should skip the tombstone in such cases when + // computing the hash to ensure that both older and newer versions + // can always generate the same hash values. + if kr.main == compactRev && isTombstoneRev { + return nil + } + h.Write(k) h.Write(v) return nil