Skip to content

Commit

Permalink
Add VerifyTxConsistency to backend.
Browse files Browse the repository at this point in the history
Signed-off-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
siyuanfoundation committed Feb 2, 2024
1 parent 6d37790 commit 0719a26
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 0 deletions.
1 change: 1 addition & 0 deletions etcdutl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
Expand Down
21 changes: 21 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ func (s *EtcdServer) Cleanup() {
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
s.applySnapshot(ep, apply)
s.applyEntries(ep, apply)
s.verifyBackendConsistency(true)

proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
Expand Down Expand Up @@ -2251,6 +2252,25 @@ func (s *EtcdServer) monitorStorageVersion() {
}
}

// verifyBackendConsistency checks, for every bucket in schema.AllBuckets, that
// the backend's batch transaction and read transaction expose the same
// key/value pairs. When skipSafeRangeBucket is true, buckets whose
// IsSafeRangeBucket reports true are skipped.
//
// The whole check is handed to verify.Verify (presumably a no-op unless
// verification is enabled — confirm against the verify package). It returns
// without doing anything when the server has no backend.
func (s *EtcdServer) verifyBackendConsistency(skipSafeRangeBucket bool) {
	verify.Verify(func() {
		// Resolve the backend exactly once. Calling s.Backend() repeatedly
		// could yield different instances if the backend is replaced while
		// this check runs, which would lock one instance and unlock another.
		be := s.Backend()
		if be == nil {
			return
		}
		s.lg.Debug("verifyBackendConsistency", zap.Bool("skipSafeRangeBucket", skipSafeRangeBucket))

		// Hold the write-side lock first, then the read-side lock, so neither
		// view can change while the buckets are being compared.
		batchTx := be.BatchTx()
		batchTx.LockOutsideApply()
		defer batchTx.Unlock()

		readTx := be.ReadTx()
		readTx.RLock()
		defer readTx.RUnlock()

		for _, bucket := range schema.AllBuckets {
			if skipSafeRangeBucket && bucket.IsSafeRangeBucket() {
				continue
			}
			// Both locks are held above, so the lock-free variant is used.
			be.UnsafeVerifyTxConsistency(bucket)
		}
	})
}

func (s *EtcdServer) monitorKVHash() {
t := s.Cfg.CorruptCheckTime
if t == 0 {
Expand All @@ -2272,6 +2292,7 @@ func (s *EtcdServer) monitorKVHash() {
return
case <-checkTicker.C:
}
s.verifyBackendConsistency(false)
if !s.isLeader() {
continue
}
Expand Down
31 changes: 31 additions & 0 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"time"

humanize "github.com/dustin/go-humanize"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/client/pkg/v3/verify"
)

var (
Expand Down Expand Up @@ -71,6 +73,9 @@ type Backend interface {

// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
SetTxPostLockInsideApplyHook(func())
// VerifyTxConsistency verifies data in ReadTx and BatchTx are consistent.
// The backend implementation in this package takes the BatchTx lock and the
// ReadTx read lock itself before comparing.
VerifyTxConsistency(bucket Bucket)
// UnsafeVerifyTxConsistency performs the same comparison for bucket without
// acquiring any locks; callers are expected to already hold the BatchTx lock
// and the ReadTx read lock.
UnsafeVerifyTxConsistency(bucket Bucket)
}

type Snapshot interface {
Expand Down Expand Up @@ -411,6 +416,32 @@ func (b *backend) SizeInUse() int64 {
return atomic.LoadInt64(&b.sizeInUse)
}

// VerifyTxConsistency checks that the batch transaction and the read
// transaction hold the same data for bucket. It acquires the batchTx lock
// (outside-apply variant) and the readTx read lock, then delegates to
// UnsafeVerifyTxConsistency. The check is executed through verify.Verify.
func (b *backend) VerifyTxConsistency(bucket Bucket) {
	lockedCheck := func() {
		b.batchTx.LockOutsideApply()
		defer b.batchTx.Unlock()

		b.readTx.RLock()
		defer b.readTx.RUnlock()

		b.UnsafeVerifyTxConsistency(bucket)
	}
	verify.Verify(lockedCheck)
}

// UnsafeVerifyTxConsistency collects every key/value pair of bucket as seen
// through batchTx and through readTx and panics if the two views differ.
//
// It acquires no locks; the caller must already hold the batchTx lock and the
// readTx read lock (VerifyTxConsistency does this).
//
// It panics when either iteration reports an error, because a partially
// collected view would otherwise produce a misleading diff or a false pass.
func (b *backend) UnsafeVerifyTxConsistency(bucket Bucket) {
	kvsFromBbolt := map[string]string{}
	err := b.batchTx.UnsafeForEach(bucket, func(k, v []byte) error {
		kvsFromBbolt[string(k)] = string(v)
		return nil
	})
	if err != nil {
		panic(fmt.Sprintf("bucket %s: iterating batch tx: %v", bucket.String(), err))
	}

	kvsFromRead := map[string]string{}
	err = b.readTx.UnsafeForEach(bucket, func(k, v []byte) error {
		kvsFromRead[string(k)] = string(v)
		return nil
	})
	if err != nil {
		panic(fmt.Sprintf("bucket %s: iterating read tx: %v", bucket.String(), err))
	}

	if diff := cmp.Diff(kvsFromBbolt, kvsFromRead); diff != "" {
		panic(fmt.Sprintf("bucket %s kvs mismatch\nbbolt: %v\nreadbuf: %v\ndiff: %s", bucket.String(), kvsFromBbolt, kvsFromRead, diff))
	}
}

func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
Expand Down
2 changes: 2 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,8 @@ func (b *fakeBackend) ForceCommit()
// The fakeBackend methods below are no-op stubs: the ones returning an error
// always return nil, and the rest have empty bodies.
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}

// VerifyTxConsistency and UnsafeVerifyTxConsistency perform no verification
// on the fake backend.
func (b *fakeBackend) VerifyTxConsistency(bucket backend.Bucket) {}
func (b *fakeBackend) UnsafeVerifyTxConsistency(bucket backend.Bucket) {}

type indexGetResp struct {
rev Revision
Expand Down
2 changes: 2 additions & 0 deletions server/storage/schema/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var (
AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})

Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})

// AllBuckets lists the buckets that EtcdServer.verifyBackendConsistency
// iterates over. The Test bucket is not part of this list.
AllBuckets = []backend.Bucket{Key, Meta, Lease, Alarm, Cluster, Members, MembersRemoved, Auth, AuthUsers, AuthRoles}
)

type bucket struct {
Expand Down
4 changes: 4 additions & 0 deletions server/storage/schema/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (s *membershipBackend) MustReadMembersFromBackend() (map[types.ID]*membersh
if err != nil {
s.lg.Panic("couldn't read members from backend", zap.Error(err))
}
s.be.VerifyTxConsistency(Members)
s.be.VerifyTxConsistency(MembersRemoved)
return members, removed
}

Expand Down Expand Up @@ -185,6 +187,7 @@ func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
// ClusterVersionFromBackend reads cluster version from backend.
// The field is populated since etcd v3.5.
func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version {
s.be.VerifyTxConsistency(Cluster)
ckey := ClusterClusterVersionKeyName
tx := s.be.ReadTx()
tx.RLock()
Expand All @@ -205,6 +208,7 @@ func (s *membershipBackend) ClusterVersionFromBackend() *semver.Version {
// DowngradeInfoFromBackend reads downgrade info from backend.
// The field is populated since etcd v3.5.
func (s *membershipBackend) DowngradeInfoFromBackend() *version.DowngradeInfo {
s.be.VerifyTxConsistency(Cluster)
dkey := ClusterDowngradeKeyName
tx := s.be.ReadTx()
tx.RLock()
Expand Down

0 comments on commit 0719a26

Please sign in to comment.