diff --git a/tests/e2e/hashkv_test.go b/tests/e2e/hashkv_test.go new file mode 100644 index 00000000000..e48fca32395 --- /dev/null +++ b/tests/e2e/hashkv_test.go @@ -0,0 +1,234 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !cluster_proxy + +package e2e + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/client/pkg/v3/fileutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/config" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +func TestVerifyHashKVAfterCompact(t *testing.T) { + scenarios := []struct { + clusterVersion e2e.ClusterVersion + keys []string // used for data generators + }{ + { + clusterVersion: e2e.CurrentVersion, + keys: []string{"key0"}, + }, + { + clusterVersion: e2e.CurrentVersion, + keys: []string{"key0", "key1"}, + }, + { + clusterVersion: e2e.QuorumLastVersion, + keys: []string{"key0"}, + }, + { + clusterVersion: e2e.QuorumLastVersion, + keys: []string{"key0", "key1"}, + }, + } + + for _, compactedOnTombstoneRev := range []bool{false, true} { + for _, scenario := range scenarios { + t.Run(fmt.Sprintf("compactedOnTombstone=%v - %s - Keys=%v", compactedOnTombstoneRev, scenario.clusterVersion, scenario.keys), func(t *testing.T) { + e2e.BeforeTest(t) + + if scenario.clusterVersion != e2e.CurrentVersion { + if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) { + t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease) + } + } + + ctx := context.Background() + + cfg := e2e.NewConfigClientTLS() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(3), + e2e.WithVersion(scenario.clusterVersion)) + require.NoError(t, err) + + t.Cleanup(func() { clus.Close() }) + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, cfg.Client, scenario.keys) + + compactedOnRev := tombstoneRevs[0] + + // If compaction revision isn't a tombstone, select a revision in the middle of two tombstones. + if !compactedOnTombstoneRev { + compactedOnRev = (tombstoneRevs[0] + tombstoneRevs[1]) / 2 + require.Greater(t, tombstoneRevs[1], compactedOnRev) + } + + cli, err := e2e.NewEtcdctl(cfg.Client, clus.EndpointsGRPC()) + require.NoError(t, err) + + t.Logf("COMPACT on rev=%d", compactedOnRev) + _, err = cli.Compact(ctx, compactedOnRev, config.CompactOption{Physical: true}) + require.NoError(t, err) + + for rev := compactedOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } + }) + } + } +} + +func TestVerifyHashKVAfterTwoCompactionsOnTombstone_MixVersions(t *testing.T) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) { + t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease) + } + + ctx := context.Background() + + cfg := e2e.NewConfigClientTLS() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(3), + e2e.WithVersion(e2e.QuorumLastVersion)) + require.NoError(t, err) + t.Cleanup(func() { clus.Close() }) + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, cfg.Client, []string{"key0"}) + + cli, err := e2e.NewEtcdctl(cfg.Client, clus.EndpointsGRPC()) + require.NoError(t, err) + + firstCompactOnRev := tombstoneRevs[0] + t.Logf("COMPACT rev=%d", firstCompactOnRev) + _, err = cli.Compact(ctx, firstCompactOnRev, config.CompactOption{Physical: true}) + require.NoError(t, err) + + secondCompactOnRev := tombstoneRevs[1] + t.Logf("COMPACT rev=%d", secondCompactOnRev) + _, err = cli.Compact(ctx, secondCompactOnRev, config.CompactOption{Physical: true}) + require.NoError(t, err) + + for rev := secondCompactOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } +} + +func TestVerifyHashKVAfterCompactionOnLastTombstone_MixVersions(t *testing.T) { + e2e.BeforeTest(t) + + if !fileutil.Exist(e2e.BinPath.EtcdLastRelease) { + t.Skipf("%q does not exist", e2e.BinPath.EtcdLastRelease) + } + + for _, keys := range [][]string{ + []string{"key0"}, + []string{"key0", "key1"}, + } { + t.Run(fmt.Sprintf("#%v", keys), func(t *testing.T) { + ctx := context.Background() + + cfg := e2e.NewConfigClientTLS() + clus, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(3), + e2e.WithVersion(e2e.QuorumLastVersion)) + require.NoError(t, err) + t.Cleanup(func() { clus.Close() }) + + tombstoneRevs, lastestRev := populateDataForHashKV(t, clus, cfg.Client, keys) + + cli, err := e2e.NewEtcdctl(cfg.Client, clus.EndpointsGRPC()) + require.NoError(t, err) + + compactOnRev := tombstoneRevs[len(tombstoneRevs)-1] + t.Logf("COMPACT rev=%d", compactOnRev) + _, err = cli.Compact(ctx, compactOnRev, config.CompactOption{Physical: true}) + require.NoError(t, err) + + for rev := compactOnRev; rev <= lastestRev; rev++ { + verifyConsistentHashKVAcrossAllMembers(t, cli, rev) + } + + }) + } +} + +// populateDataForHashKV populates some sample data, and return a slice of tombstone +// revisions and the latest revision +func populateDataForHashKV(t *testing.T, clus *e2e.EtcdProcessCluster, clientCfg e2e.ClientConfig, keys []string) ([]int64, int64) { + c := newClient(t, clus.EndpointsGRPC(), clientCfg) + defer c.Close() + + ctx := context.Background() + totalOperations := 40 + + var ( + tombStoneRevs []int64 + latestRev int64 + ) + + deleteStep := 10 // submit a delete operation on every 10 operations + for i := 1; i <= totalOperations; i++ { + if i%deleteStep == 0 { + t.Logf("Deleting key=%s", keys[0]) // Only delete the first key for simplicity + resp, derr := c.Delete(ctx, keys[0]) + require.NoError(t, derr) + latestRev = resp.Header.Revision + tombStoneRevs = append(tombStoneRevs, resp.Header.Revision) + continue + } + + value := fmt.Sprintf("%d", i) + var ops []clientv3.Op + for _, key := range keys { + ops = append(ops, clientv3.OpPut(key, value)) + } + + t.Logf("Writing keys: %v, value: %s", keys, value) + resp, terr := c.Txn(ctx).Then(ops...).Commit() + require.NoError(t, terr) + require.True(t, resp.Succeeded) + require.Len(t, resp.Responses, len(ops)) + latestRev = resp.Header.Revision + } + return tombStoneRevs, latestRev +} + +func verifyConsistentHashKVAcrossAllMembers(t *testing.T, cli *e2e.EtcdctlV3, hashKVOnRev int64) { + ctx := context.Background() + + t.Logf("HashKV on rev=%d", hashKVOnRev) + resp, err := cli.HashKV(ctx, hashKVOnRev) + require.NoError(t, err) + + require.Greater(t, len(resp), 1) + require.True(t, resp[0].Hash != 0) + t.Logf("One Hash value is %d", resp[0].Hash) + + for i := 1; i < len(resp); i++ { + require.Equal(t, resp[0].Hash, resp[i].Hash) + } +}