From 0719a262e508581aba5315fc7a1ea3ced86de5e1 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 31 Jan 2024 17:20:04 -0800 Subject: [PATCH] Add VerifyTxConsistency to backend. Signed-off-by: Siyuan Zhang --- etcdutl/go.mod | 1 + server/etcdserver/server.go | 21 +++++++++++++++++++ server/storage/backend/backend.go | 31 +++++++++++++++++++++++++++++ server/storage/mvcc/kvstore_test.go | 2 ++ server/storage/schema/bucket.go | 2 ++ server/storage/schema/membership.go | 4 ++++ 6 files changed, 61 insertions(+) diff --git a/etcdutl/go.mod b/etcdutl/go.mod index 72bb0587b5d5..f46e39fcbc04 100644 --- a/etcdutl/go.mod +++ b/etcdutl/go.mod @@ -44,6 +44,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a1b51a171c2e..bba6c2c31e37 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -946,6 +946,7 @@ func (s *EtcdServer) Cleanup() { func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) + s.verifyBackendConsistency(true) proposalsApplied.Set(float64(ep.appliedi)) s.applyWait.Trigger(ep.appliedi) @@ -2251,6 +2252,25 @@ func (s *EtcdServer) monitorStorageVersion() { } } +func (s *EtcdServer) verifyBackendConsistency(skipSafeRangeBucket bool) { + verify.Verify(func() { + if s.Backend() == nil { + return + } + s.lg.Debug("verifyBackendConsistency", zap.Bool("skipSafeRangeBucket", skipSafeRangeBucket)) + s.Backend().BatchTx().LockOutsideApply() + defer s.Backend().BatchTx().Unlock() + s.Backend().ReadTx().RLock() + defer s.Backend().ReadTx().RUnlock() + for _, bucket := range schema.AllBuckets { + if skipSafeRangeBucket && bucket.IsSafeRangeBucket() { + continue + } + s.Backend().UnsafeVerifyTxConsistency(bucket) + } + }) +} + func (s *EtcdServer) monitorKVHash() { t := s.Cfg.CorruptCheckTime if t == 0 { @@ -2272,6 +2292,7 @@ func (s *EtcdServer) monitorKVHash() { return case <-checkTicker.C: } + s.verifyBackendConsistency(false) if !s.isLeader() { continue } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index 7aa4f846987f..4be03137ad37 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -25,9 +25,11 @@ import ( "time" humanize "github.com/dustin/go-humanize" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/client/pkg/v3/verify" ) var ( @@ -71,6 +73,9 @@ type Backend interface { // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. SetTxPostLockInsideApplyHook(func()) + // VerifyTxConsistency verifies data in ReadTx and BatchTx are consistent. + VerifyTxConsistency(bucket Bucket) + UnsafeVerifyTxConsistency(bucket Bucket) } type Snapshot interface { @@ -411,6 +416,32 @@ func (b *backend) SizeInUse() int64 { return atomic.LoadInt64(&b.sizeInUse) } +func (b *backend) VerifyTxConsistency(bucket Bucket) { + verify.Verify(func() { + b.batchTx.LockOutsideApply() + defer b.batchTx.Unlock() + b.readTx.RLock() + defer b.readTx.RUnlock() + b.UnsafeVerifyTxConsistency(bucket) + }) +} + +func (b *backend) UnsafeVerifyTxConsistency(bucket Bucket) { + kvsFromRead := map[string]string{} + kvsFromBbolt := map[string]string{} + b.batchTx.UnsafeForEach(bucket, func(k, v []byte) error { + kvsFromBbolt[string(k)] = string(v) + return nil + }) + b.readTx.UnsafeForEach(bucket, func(k, v []byte) error { + kvsFromRead[string(k)] = string(v) + return nil + }) + if diff := cmp.Diff(kvsFromBbolt, kvsFromRead); diff != "" { + panic(fmt.Sprintf("bucket %s kvs mismatch\nbbolt: %v\nreadbuf: %v\ndiff: %s", bucket.String(), kvsFromBbolt, kvsFromRead, diff)) + } +} + func (b *backend) run() { defer close(b.donec) t := time.NewTimer(b.batchInterval) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 58e23249dc9a..6829f7439512 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -997,6 +997,8 @@ func (b *fakeBackend) ForceCommit() func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} +func (b *fakeBackend) VerifyTxConsistency(bucket backend.Bucket) {} +func (b *fakeBackend) UnsafeVerifyTxConsistency(bucket backend.Bucket) {} type indexGetResp struct { rev Revision diff --git a/server/storage/schema/bucket.go b/server/storage/schema/bucket.go index 5472af3c3b47..06da660df5ee 100644 --- a/server/storage/schema/bucket.go +++ b/server/storage/schema/bucket.go @@ -54,6 +54,8 @@ var ( AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false}) Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false}) + + AllBuckets = []backend.Bucket{Key, Meta, Lease, Alarm, Cluster, Members, MembersRemoved, Auth, AuthUsers, AuthRoles} ) type bucket struct { diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 44e2af1cd7f7..2296ae6b4d3b 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -82,6 +82,8 @@ func (s *membershipBackend) MustReadMembersFromBackend() (map[types.ID]*membersh if err != nil { s.lg.Panic("couldn't read members from backend", zap.Error(err)) } + s.be.VerifyTxConsistency(Members) + s.be.VerifyTxConsistency(MembersRemoved) return members, removed } @@ -185,6 +187,7 @@ func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { // ClusterVersionFromBackend reads cluster version from backend. // The field is populated since etcd v3.5. func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version { + s.be.VerifyTxConsistency(Cluster) ckey := ClusterClusterVersionKeyName tx := s.be.ReadTx() tx.RLock() @@ -205,6 +208,7 @@ func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version { // DowngradeInfoFromBackend reads downgrade info from backend. // The field is populated since etcd v3.5. func (s *membershipBackend) DowngradeInfoFromBackend() *version.DowngradeInfo { + s.be.VerifyTxConsistency(Cluster) dkey := ClusterDowngradeKeyName tx := s.be.ReadTx() tx.RLock()