Skip to content

Commit

Permalink
lease: Add LessorConfig and wire zap logger into Lessor
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Jul 17, 2018
1 parent 75ac18c commit bbe2d77
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 25 deletions.
2 changes: 1 addition & 1 deletion clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (s *v3Manager) saveDB() error {
be := backend.NewDefaultBackend(dbpath)

// a lessor never timeouts leases
lessor := lease.NewLessor(be, math.MaxInt64)
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit))
txn := mvs.Write()
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.lessor = lease.NewLessor(srv.getLogger(), srv.be, lease.LessorConfig{MinLeaseTTL: int64(math.Ceil(minTTL.Seconds()))})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
Expand Down
10 changes: 7 additions & 3 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import (

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)

func TestRenewHTTP(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand All @@ -51,11 +53,12 @@ func TestRenewHTTP(t *testing.T) {
}

func TestTimeToLiveHTTP(t *testing.T) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down Expand Up @@ -92,11 +95,12 @@ func TestTimeToLiveHTTPTimeout(t *testing.T) {
}

func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
lg := zap.NewNop()
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)

// NoLease is a special LeaseID representing the absence of a lease.
Expand Down Expand Up @@ -144,23 +145,30 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}

lg *zap.Logger
}

type LessorConfig struct {
MinLeaseTTL int64
}

func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
return newLessor(b, minLeaseTTL)
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
return newLessor(lg, b, cfg)
}

func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: minLeaseTTL,
minLeaseTTL: cfg.MinLeaseTTL,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
}
l.initAndRecover()

Expand Down
13 changes: 9 additions & 4 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)

func BenchmarkLessorFindExpired1(b *testing.B) { benchmarkLessorFindExpired(1, b) }
Expand Down Expand Up @@ -54,8 +55,9 @@ func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000,
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }

func benchmarkLessorFindExpired(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
le.Promote(0)
Expand All @@ -71,8 +73,9 @@ func benchmarkLessorFindExpired(size int, b *testing.B) {
}

func benchmarkLessorGrant(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand All @@ -85,8 +88,9 @@ func benchmarkLessorGrant(size int, b *testing.B) {
}

func benchmarkLessorRevoke(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand All @@ -102,8 +106,9 @@ func benchmarkLessorRevoke(size int, b *testing.B) {
}

func benchmarkLessorRenew(size int, b *testing.B) {
lg := zap.NewNop()
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
Expand Down
35 changes: 23 additions & 12 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/coreos/etcd/mvcc/backend"
"go.uber.org/zap"
)

const (
Expand All @@ -37,11 +38,12 @@ const (
// The granted lease should have a unique ID with a term
// that is greater than minLeaseTTL.
func TestLessorGrant(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -98,11 +100,12 @@ func TestLessorGrant(t *testing.T) {
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
// from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -146,11 +149,12 @@ func TestLeaseConcurrentKeys(t *testing.T) {
// the backend.
// The revoked lease cannot be got from Lessor again.
func TestLessorRevoke(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
Expand Down Expand Up @@ -198,11 +202,12 @@ func TestLessorRevoke(t *testing.T) {

// TestLessorRenew ensures Lessor can renew an existing lease.
func TestLessorRenew(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer be.Close()
defer os.RemoveAll(dir)

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)

Expand Down Expand Up @@ -234,12 +239,13 @@ func TestLessorRenew(t *testing.T) {
func TestLessorRenewExtendPileup(t *testing.T) {
oldRevokeRate := leaseRevokeRate
defer func() { leaseRevokeRate = oldRevokeRate }()
lg := zap.NewNop()
leaseRevokeRate = 10

dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
Expand All @@ -258,7 +264,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
le = newLessor(be, minLeaseTTL)
le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()

// extend after recovery should extend expiration on lease pile-up
Expand All @@ -283,11 +289,12 @@ func TestLessorRenewExtendPileup(t *testing.T) {
}

func TestLessorDetach(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

Expand Down Expand Up @@ -323,11 +330,12 @@ func TestLessorDetach(t *testing.T) {
// TestLessorRecover ensures Lessor recovers leases from
// persist backend.
func TestLessorRecover(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
Expand All @@ -336,7 +344,7 @@ func TestLessorRecover(t *testing.T) {
}

// Create a new lessor with the same backend
nle := newLessor(be, minLeaseTTL)
nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
Expand All @@ -350,13 +358,14 @@ func TestLessorRecover(t *testing.T) {
}

func TestLessorExpire(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

testMinTTL := int64(1)

le := newLessor(be, testMinTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -402,13 +411,14 @@ func TestLessorExpire(t *testing.T) {
}

func TestLessorExpireAndDemote(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

testMinTTL := int64(1)

le := newLessor(be, testMinTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()

le.Promote(1 * time.Second)
Expand Down Expand Up @@ -452,11 +462,12 @@ func TestLessorExpireAndDemote(t *testing.T) {
}

func TestLessorMaxTTL(t *testing.T) {
lg := zap.NewNop()
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
defer be.Close()

le := newLessor(be, minLeaseTTL)
le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()

_, err := le.Grant(1, MaxLeaseTTL+1)
Expand Down

0 comments on commit bbe2d77

Please sign in to comment.