Skip to content

Commit 06ad533

Browse files
committedFeb 29, 2020
*: fix auth revision corruption bug
1 parent 085b19b commit 06ad533

11 files changed

+141
-45
lines changed
 

‎auth/store.go

+41-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ type AuthenticateParamIndex struct{}
9191
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
9292
type AuthenticateParamSimpleTokenPrefix struct{}
9393

94+
// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
95+
type saveConsistentIndexFunc func(tx backend.BatchTx)
96+
9497
// AuthStore defines auth storage interface.
9598
type AuthStore interface {
9699
// AuthEnable turns on the authentication feature
@@ -183,6 +186,9 @@ type AuthStore interface {
183186

184187
// HasRole checks that user has role
185188
HasRole(user, role string) bool
189+
190+
// SetConsistentIndexSyncer sets consistentIndex syncer
191+
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
186192
}
187193

188194
type TokenProvider interface {
@@ -206,10 +212,14 @@ type authStore struct {
206212

207213
rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions
208214

209-
tokenProvider TokenProvider
210-
bcryptCost int // the algorithm cost / strength for hashing auth passwords
215+
tokenProvider TokenProvider
216+
syncConsistentIndex saveConsistentIndexFunc
217+
bcryptCost int // the algorithm cost / strength for hashing auth passwords
211218
}
212219

220+
func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
221+
as.syncConsistentIndex = syncer
222+
}
213223
func (as *authStore) AuthEnable() error {
214224
as.enabledMu.Lock()
215225
defer as.enabledMu.Unlock()
@@ -258,6 +268,7 @@ func (as *authStore) AuthDisable() {
258268
tx.Lock()
259269
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
260270
as.commitRevision(tx)
271+
as.saveConsistentIndex(tx)
261272
tx.Unlock()
262273
b.ForceCommit()
263274

@@ -403,6 +414,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
403414
putUser(as.lg, tx, newUser)
404415

405416
as.commitRevision(tx)
417+
as.saveConsistentIndex(tx)
406418

407419
as.lg.Info("added a user", zap.String("user-name", r.Name))
408420
return &pb.AuthUserAddResponse{}, nil
@@ -426,6 +438,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
426438
delUser(tx, r.Name)
427439

428440
as.commitRevision(tx)
441+
as.saveConsistentIndex(tx)
429442

430443
as.invalidateCachedPerm(r.Name)
431444
as.tokenProvider.invalidateUser(r.Name)
@@ -470,6 +483,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
470483
putUser(as.lg, tx, updatedUser)
471484

472485
as.commitRevision(tx)
486+
as.saveConsistentIndex(tx)
473487

474488
as.invalidateCachedPerm(r.Name)
475489
as.tokenProvider.invalidateUser(r.Name)
@@ -518,6 +532,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
518532
as.invalidateCachedPerm(r.User)
519533

520534
as.commitRevision(tx)
535+
as.saveConsistentIndex(tx)
521536

522537
as.lg.Info(
523538
"granted a role to a user",
@@ -596,6 +611,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
596611
as.invalidateCachedPerm(r.Name)
597612

598613
as.commitRevision(tx)
614+
as.saveConsistentIndex(tx)
599615

600616
as.lg.Info(
601617
"revoked a role from a user",
@@ -666,6 +682,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
666682
as.clearCachedPerm()
667683

668684
as.commitRevision(tx)
685+
as.saveConsistentIndex(tx)
669686

670687
as.lg.Info(
671688
"revoked a permission on range",
@@ -717,6 +734,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
717734
}
718735

719736
as.commitRevision(tx)
737+
as.saveConsistentIndex(tx)
720738

721739
as.lg.Info("deleted a role", zap.String("role-name", r.Role))
722740
return &pb.AuthRoleDeleteResponse{}, nil
@@ -743,6 +761,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
743761
putRole(as.lg, tx, newRole)
744762

745763
as.commitRevision(tx)
764+
as.saveConsistentIndex(tx)
746765

747766
as.lg.Info("created a role", zap.String("role-name", r.Name))
748767
return &pb.AuthRoleAddResponse{}, nil
@@ -781,6 +800,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
781800
})
782801

783802
if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) {
803+
if role.KeyPermission[idx].PermType == r.Perm.PermType {
804+
as.lg.Warn(
805+
"ignored grant permission request to a role, existing permission",
806+
zap.String("role-name", r.Name),
807+
zap.ByteString("key", r.Perm.Key),
808+
zap.ByteString("range-end", r.Perm.RangeEnd),
809+
zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]),
810+
)
811+
return &pb.AuthRoleGrantPermissionResponse{}, nil
812+
}
784813
// update existing permission
785814
role.KeyPermission[idx].PermType = r.Perm.PermType
786815
} else {
@@ -802,6 +831,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
802831
as.clearCachedPerm()
803832

804833
as.commitRevision(tx)
834+
as.saveConsistentIndex(tx)
805835

806836
as.lg.Info(
807837
"granted/updated a permission to a user",
@@ -1035,6 +1065,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
10351065

10361066
if as.Revision() == 0 {
10371067
as.commitRevision(tx)
1068+
as.saveConsistentIndex(tx)
10381069
}
10391070

10401071
tx.Unlock()
@@ -1279,3 +1310,11 @@ func (as *authStore) HasRole(user, role string) bool {
12791310
func (as *authStore) BcryptCost() int {
12801311
return as.bcryptCost
12811312
}
1313+
1314+
func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
1315+
if as.syncConsistentIndex != nil {
1316+
as.syncConsistentIndex(tx)
1317+
} else {
1318+
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
1319+
}
1320+
}

‎auth/store_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,55 @@ func TestListUsers(t *testing.T) {
414414
}
415415
}
416416

417+
func TestRoleGrantPermissionRevision(t *testing.T) {
418+
as, tearDown := setupAuthStore(t)
419+
defer tearDown(t)
420+
421+
_, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"})
422+
if err != nil {
423+
t.Fatal(err)
424+
}
425+
426+
perm := &authpb.Permission{
427+
PermType: authpb.WRITE,
428+
Key: []byte("Keys"),
429+
RangeEnd: []byte("RangeEnd"),
430+
}
431+
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
432+
Name: "role-test-1",
433+
Perm: perm,
434+
})
435+
436+
if err != nil {
437+
t.Fatal(err)
438+
}
439+
440+
r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"})
441+
if err != nil {
442+
t.Fatal(err)
443+
}
444+
445+
if !reflect.DeepEqual(perm, r.Perm[0]) {
446+
t.Errorf("expected %v, got %v", perm, r.Perm[0])
447+
}
448+
449+
oldRevision := as.Revision()
450+
451+
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
452+
Name: "role-test-1",
453+
Perm: perm,
454+
})
455+
456+
if err != nil {
457+
t.Error(err)
458+
}
459+
newRevision := as.Revision()
460+
461+
if oldRevision != newRevision {
462+
t.Errorf("expected revision diff is 0, got %d", newRevision-oldRevision)
463+
}
464+
}
465+
417466
func TestRoleGrantPermission(t *testing.T) {
418467
as, tearDown := setupAuthStore(t)
419468
defer tearDown(t)

‎etcdserver/backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
9595
// case, replace the db with the snapshot db sent by the leader.
9696
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
9797
var cIndex consistentIndex
98-
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
98+
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
9999
defer kv.Close()
100100
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
101101
return oldbe, nil

‎etcdserver/server.go

+14-11
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
514514
CheckpointInterval: cfg.LeaseCheckpointInterval,
515515
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
516516
})
517-
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
517+
518+
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
519+
func(index uint64) <-chan struct{} {
520+
return srv.applyWait.Wait(index)
521+
},
522+
)
523+
if err != nil {
524+
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
525+
return nil, err
526+
}
527+
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
528+
529+
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
518530
if beExist {
519531
kvindex := srv.kv.ConsistentIndex()
520532
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
@@ -539,16 +551,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
539551
}()
540552

541553
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
542-
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
543-
func(index uint64) <-chan struct{} {
544-
return srv.applyWait.Wait(index)
545-
},
546-
)
547-
if err != nil {
548-
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
549-
return nil, err
550-
}
551-
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
554+
552555
if num := cfg.AutoCompactionRetention; num != 0 {
553556
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
554557
if err != nil {

‎etcdserver/server_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,7 @@ func TestSnapshot(t *testing.T) {
983983
r: *r,
984984
v2store: st,
985985
}
986-
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
986+
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
987987
srv.be = be
988988

989989
ch := make(chan struct{}, 2)
@@ -1064,7 +1064,7 @@ func TestSnapshotOrdering(t *testing.T) {
10641064

10651065
be, tmpPath := backend.NewDefaultTmpBackend()
10661066
defer os.RemoveAll(tmpPath)
1067-
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
1067+
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
10681068
s.be = be
10691069

10701070
s.start()
@@ -1125,7 +1125,7 @@ func TestTriggerSnap(t *testing.T) {
11251125
}
11261126
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
11271127

1128-
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
1128+
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
11291129
srv.be = be
11301130

11311131
srv.start()
@@ -1197,7 +1197,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
11971197
defer func() {
11981198
os.RemoveAll(tmpPath)
11991199
}()
1200-
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
1200+
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
12011201
s.be = be
12021202

12031203
s.start()

‎mvcc/kv_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) {
712712

713713
func TestWatchableKVWatch(t *testing.T) {
714714
b, tmpPath := backend.NewDefaultTmpBackend()
715-
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
715+
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
716716
defer cleanup(s, b, tmpPath)
717717

718718
w := s.NewWatchStream()

‎mvcc/watchable_store.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package mvcc
1616

1717
import (
18+
"go.etcd.io/etcd/auth"
1819
"sync"
1920
"time"
2021

@@ -69,11 +70,11 @@ type watchableStore struct {
6970
// cancel operations.
7071
type cancelFunc func()
7172

72-
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
73-
return newWatchableStore(lg, b, le, ig, cfg)
73+
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
74+
return newWatchableStore(lg, b, le, as, ig, cfg)
7475
}
7576

76-
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
77+
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
7778
if lg == nil {
7879
lg = zap.NewNop()
7980
}
@@ -90,6 +91,10 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
9091
// use this store as the deleter so revokes trigger watch events
9192
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
9293
}
94+
if as != nil {
95+
// TODO: encapsulating consistentindex into a separate package
96+
as.SetConsistentIndexSyncer(s.store.saveIndex)
97+
}
9398
s.wg.Add(2)
9499
go s.syncWatchersLoop()
95100
go s.syncVictimsLoop()

‎mvcc/watchable_store_bench_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828

2929
func BenchmarkWatchableStorePut(b *testing.B) {
3030
be, tmpPath := backend.NewDefaultTmpBackend()
31-
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
31+
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
3232
defer cleanup(s, be, tmpPath)
3333

3434
// arbitrary number of bytes
@@ -49,7 +49,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
4949
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
5050
var i fakeConsistentIndex
5151
be, tmpPath := backend.NewDefaultTmpBackend()
52-
s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{})
52+
s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
5353
defer cleanup(s, be, tmpPath)
5454

5555
// arbitrary number of bytes
@@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {
8080

8181
func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
8282
be, tmpPath := backend.NewDefaultTmpBackend()
83-
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
83+
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
8484
defer cleanup(s, be, tmpPath)
8585

8686
k := []byte("testkey")
@@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
180180

181181
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
182182
be, tmpPath := backend.NewDefaultTmpBackend()
183-
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{})
183+
s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
184184

185185
defer func() {
186186
s.store.Close()

0 commit comments

Comments
 (0)
Please sign in to comment.