Skip to content

Commit

Permalink
*: refactor consistent index
Browse files Browse the repository at this point in the history
  • Loading branch information
tangcong committed Mar 20, 2020
1 parent 6d1982e commit b976866
Show file tree
Hide file tree
Showing 18 changed files with 315 additions and 274 deletions.
25 changes: 9 additions & 16 deletions auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.etcd.io/etcd/auth/authpb"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/backend"

Expand Down Expand Up @@ -91,9 +92,6 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}

// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)

// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
Expand Down Expand Up @@ -186,9 +184,6 @@ type AuthStore interface {

// HasRole checks that user has role
HasRole(user, role string) bool

// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}

type TokenProvider interface {
Expand All @@ -212,14 +207,11 @@ type authStore struct {

rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions

tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
ci cindex.ConsistentIndexer
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
Expand Down Expand Up @@ -1018,7 +1010,7 @@ func (as *authStore) IsAuthEnabled() bool {
}

// NewAuthStore creates a new AuthStore.
func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore {
func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore {
if lg == nil {
lg = zap.NewNop()
}
Expand Down Expand Up @@ -1053,6 +1045,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
revision: getRevision(tx),
lg: lg,
be: be,
ci: ci,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
tokenProvider: tp,
Expand Down Expand Up @@ -1314,10 +1307,10 @@ func (as *authStore) BcryptCost() int {
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
if as.ci != nil {
as.ci.UnsafeSave(tx)
} else {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
as.lg.Error("failed to save consistentIndex,consistentIndexer is nil")
}
}

Expand Down
16 changes: 8 additions & 8 deletions auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand All @@ -63,7 +63,7 @@ func TestNewAuthStoreRevision(t *testing.T) {

// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost)
as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
new := as.Revision()
as.Close()
b2.Close()
Expand All @@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {

invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, tp, invalidCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
}
Expand All @@ -102,7 +102,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -703,7 +703,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()

donec := make(chan struct{})
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost)
as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
defer func(a *authStore) {
a.Close()
}(as2)
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestRolesOrder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -906,7 +906,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()

if err = enableAuthAndCreateRoot(as); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion clientv3/snapshot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package snapshot

import "encoding/binary"
import (
"encoding/binary"
"go.etcd.io/etcd/mvcc/backend"
)

type revision struct {
main int64
Expand All @@ -33,3 +36,4 @@ func bytesToRev(bytes []byte) revision {
type initIndex int

func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
func (i *initIndex) Save(backend.BatchTx) {}
5 changes: 3 additions & 2 deletions etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
Expand Down Expand Up @@ -94,8 +95,8 @@ func openBackend(cfg ServerConfig) backend.Backend {
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, ci, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
Expand Down
118 changes: 118 additions & 0 deletions etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cindex

import (
"encoding/binary"
"sync"
"sync/atomic"

"go.etcd.io/etcd/mvcc/backend"
)

var (
metaBucketName = []byte("meta")

consistentIndexKeyName = []byte("consistent_index")
)

// ConsistentIndexer is an interface that wraps the Get/Save method.
// Consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexer interface {

// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)

// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
SetBatchTx(tx backend.BatchTx)
}

// consistentIndex represents the offset of an entry in a consistent replica log.
// It implements the ConsistentIndexer interface.
// It is always set to the offset of current entry before executing the entry,
// so ConsistentWatchableKV could get the consistent index from it.

type consistentIndex struct {
tx backend.BatchTx
// consistentIndex caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
mutex sync.Mutex
}

func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
}

func (ci *consistentIndex) ConsistentIndex() uint64 {

if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx.Lock()
defer ci.tx.Unlock()
_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&ci.consistentIndex, v)
return v
}

func (ci *consistentIndex) SetConsistentIndex(v uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
bs := ci.bytesBuf8
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx = tx
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}

type fakeConsistentIndex struct{ index uint64 }

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
atomic.StoreUint64(&f.index, index)
}

func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
15 changes: 3 additions & 12 deletions etcdserver/consistent_index_test.go → etcdserver/cindex/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 The etcd Authors
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,14 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import "testing"

func TestConsistentIndex(t *testing.T) {
var i consistentIndex
i.setConsistentIndex(10)
if g := i.ConsistentIndex(); g != 10 {
t.Errorf("value = %d, want 10", g)
}
}
// Package cindex provides an interface and implementation for getting/saving consistentIndex.
package cindex
33 changes: 0 additions & 33 deletions etcdserver/consistent_index.go

This file was deleted.

Loading

0 comments on commit b976866

Please sign in to comment.