From 21634a98c68e7041b5cf3d0779ceaa21b304f441 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jasionowski?= Date: Fri, 19 Nov 2021 14:52:30 +0000 Subject: [PATCH 1/3] lease,integration: add checkpoint scheduling after leader change Current checkpointing mechanism is buggy. New checkpoints for any lease are scheduled only until the first leader change. Added fix for that and a test that will check it. --- server/lease/lessor.go | 1 + server/lease/lessor_test.go | 5 +- tests/integration/v3_lease_test.go | 128 +++++++++++++++++++---------- 3 files changed, 87 insertions(+), 47 deletions(-) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 7236515f2b3..6687529742a 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -446,6 +446,7 @@ func (le *lessor) Promote(extend time.Duration) { l.refresh(extend) item := &LeaseWithTime{id: l.ID, time: l.expiry} le.leaseExpiredNotifier.RegisterOrUpdate(item) + le.scheduleCheckpointIfNeeded(l) } if len(le.leaseMap) < leaseRevokeRate { diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 58a4ad29086..5d6d28782f3 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -531,6 +531,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer be.Close() le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { @@ -543,13 +544,11 @@ func TestLessorCheckpointScheduling(t *testing.T) { t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) } }) - defer le.Stop() - le.Promote(0) - _, err := le.Grant(1, 2) if err != nil { t.Fatal(err) } + le.Promote(0) // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds. select { diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 1727da65cce..6adc2f2b3b1 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -229,56 +229,96 @@ func TestV3LeaseKeepAlive(t *testing.T) { // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted // across leader elections. func TestV3LeaseCheckpoint(t *testing.T) { - BeforeTest(t) - - var ttl int64 = 300 - leaseInterval := 2 * time.Second - clus := NewClusterV3(t, &ClusterConfig{ - Size: 3, - EnableLeaseCheckpoint: true, - LeaseCheckpointInterval: leaseInterval, - UseBridge: true, - }) - defer clus.Terminate(t) - - // create lease - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - c := toGRPC(clus.RandClient()) - lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl}) - if err != nil { - t.Fatal(err) - } + tcs := []struct { + name string + checkpointingEnabled bool + ttl time.Duration + checkpointingInterval time.Duration + leaderChanges int + expectTTLIsGT time.Duration + expectTTLIsLT time.Duration + }{ + { + name: "Checkpointing disabled, lease TTL is reset", + ttl: 300 * time.Second, + leaderChanges: 1, + expectTTLIsGT: 298 * time.Second, + }, + { + name: "Checkpointing enabled 10s, lease TTL is preserved after leader change", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + expectTTLIsLT: 290 * time.Second, + }, + { + // Checking if checkpointing continues after the first leader change. + name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 2, + expectTTLIsLT: 280 * time.Second, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + BeforeTest(t) + config := &ClusterConfig{ + Size: 3, + EnableLeaseCheckpoint: tc.checkpointingEnabled, + LeaseCheckpointInterval: tc.checkpointingInterval, + } + clus := NewClusterV3(t, config) + defer clus.Terminate(t) + + // create lease + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := toGRPC(clus.RandClient()) + lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())}) + if err != nil { + t.Fatal(err) + } - // wait for a checkpoint to occur - time.Sleep(leaseInterval + 1*time.Second) + for i := 0; i < tc.leaderChanges; i++ { + // wait for a checkpoint to occur + time.Sleep(tc.checkpointingInterval + 1*time.Second) - // Force a leader election - leaderId := clus.WaitLeader(t) - leader := clus.Members[leaderId] - leader.Stop(t) - time.Sleep(time.Duration(3*electionTicks) * tickDuration) - leader.Restart(t) - newLeaderId := clus.WaitLeader(t) - c2 := toGRPC(clus.Client(newLeaderId)) + // Force a leader election + leaderId := clus.WaitLeader(t) + leader := clus.Members[leaderId] + leader.Stop(t) + time.Sleep(time.Duration(3*electionTicks) * tickDuration) + leader.Restart(t) + } - time.Sleep(250 * time.Millisecond) + newLeaderId := clus.WaitLeader(t) + c2 := toGRPC(clus.Client(newLeaderId)) + + time.Sleep(250 * time.Millisecond) + + // Check the TTL of the new leader + var ttlresp *pb.LeaseTimeToLiveResponse + for i := 0; i < 10; i++ { + if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { + time.Sleep(time.Millisecond * 250) + } else { + t.Fatal(err) + } + } + } - // Check the TTL of the new leader - var ttlresp *pb.LeaseTimeToLiveResponse - for i := 0; i < 10; i++ { - if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { - if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { - time.Sleep(time.Millisecond * 250) - } else { - t.Fatal(err) + if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT { + t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT) } - } - } - expectedTTL := ttl - int64(leaseInterval.Seconds()) - if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL { - t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL) + if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT { + t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT) + } + }) } } From eddfb4232f09cbaeaf1627452611f912eb98e335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jasionowski?= Date: Fri, 19 Nov 2021 15:02:25 +0000 Subject: [PATCH 2/3] etcdserver,integration: Store remaining TTL on checkpoint To extend lease checkpointing mechanism to cases when the whole etcd cluster is restarted. --- server/lease/lessor.go | 8 +++++--- tests/integration/v3_lease_test.go | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 6687529742a..3249ef5ed2f 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -351,6 +351,7 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL + l.persistTo(le.b) if le.isPrimary() { // schedule the next checkpoint as needed le.scheduleCheckpointIfNeeded(l) @@ -790,9 +791,10 @@ func (le *lessor) initAndRecover() { ttl: lpb.TTL, // itemSet will be filled in when recover key-value pairs // set expiry to forever, refresh when promoted - itemSet: make(map[LeaseItem]struct{}), - expiry: forever, - revokec: make(chan struct{}), + itemSet: make(map[LeaseItem]struct{}), + expiry: forever, + revokec: make(chan struct{}), + remainingTTL: lpb.RemainingTTL, } } le.leaseExpiredNotifier.Init() diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 6adc2f2b3b1..9fd0964d532 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -235,6 +235,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { ttl time.Duration checkpointingInterval time.Duration leaderChanges int + clusterSize int expectTTLIsGT time.Duration expectTTLIsLT time.Duration }{ @@ -242,6 +243,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { name: "Checkpointing disabled, lease TTL is reset", ttl: 300 * time.Second, leaderChanges: 1, + clusterSize: 3, expectTTLIsGT: 298 * time.Second, }, { @@ -250,6 +252,16 @@ func TestV3LeaseCheckpoint(t *testing.T) { checkpointingEnabled: true, checkpointingInterval: 10 * time.Second, leaderChanges: 1, + clusterSize: 3, + expectTTLIsLT: 290 * time.Second, + }, + { + name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + clusterSize: 1, expectTTLIsLT: 290 * time.Second, }, { @@ -259,6 +271,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { checkpointingEnabled: true, checkpointingInterval: 10 * time.Second, leaderChanges: 2, + clusterSize: 3, expectTTLIsLT: 280 * time.Second, }, } @@ -266,7 +279,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { t.Run(tc.name, func(t *testing.T) { BeforeTest(t) config := &ClusterConfig{ - Size: 3, + Size: tc.clusterSize, EnableLeaseCheckpoint: tc.checkpointingEnabled, LeaseCheckpointInterval: tc.checkpointingInterval, } From d00e89db2edec433bff0f6e8bb726058bb66ad2c Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Wed, 1 Dec 2021 15:24:25 +0100 Subject: [PATCH 3/3] server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL To avoid inconsistant behavior during cluster upgrade we are feature gating persistance behind cluster version. This should ensure that all cluster members are upgraded to v3.6 before changing behavior. To allow backporting this fix to v3.5 we are also introducing flag --experimental-enable-lease-checkpoint-persist that will allow for smooth upgrade in v3.5 clusters with this feature enabled. --- server/config/config.go | 4 +- server/embed/config.go | 21 ++++- server/embed/config_test.go | 50 ++++++++++++ server/embed/etcd.go | 1 + server/etcdmain/config.go | 4 +- server/etcdserver/server.go | 3 +- server/lease/leasehttp/http_test.go | 6 +- server/lease/lessor.go | 34 +++++++- server/lease/lessor_bench_test.go | 2 +- server/lease/lessor_test.go | 121 ++++++++++++++++++++++++---- tests/integration/cluster.go | 4 + tests/integration/v3_lease_test.go | 14 +++- 12 files changed, 233 insertions(+), 31 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index b6e2109c228..429bd36e2ae 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -147,10 +147,12 @@ type ServerConfig struct { ForceNewCluster bool - // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. + // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. EnableLeaseCheckpoint bool // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints. LeaseCheckpointInterval time.Duration + // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + LeaseCheckpointPersist bool EnableGRPCGateway bool diff --git a/server/embed/config.go b/server/embed/config.go index 8d034125111..dd8b18c5a83 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -314,10 +314,15 @@ type Config struct { // Deprecated in v3.5. // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913) ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` - // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases. - ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` - ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` - ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` + // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change. + ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"` + // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. + // Requires experimental-enable-lease-checkpoint to be enabled. + // Deprecated in v3.6. + // TODO: Delete in v3.7 + ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"` + ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request // takes more time than this value. ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"` @@ -678,6 +683,14 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode) } + if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint { + cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist") + } + + if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint { + return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint") + } + return nil } diff --git a/server/embed/config_test.go b/server/embed/config_test.go index d0ff013f8cf..26863eb0bc0 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -291,6 +291,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) { } } +func TestLeaseCheckpointValidate(t *testing.T) { + tcs := []struct { + name string + configFunc func() Config + expectError bool + }{ + { + name: "Default config should pass", + configFunc: func() Config { + return *NewConfig() + }, + }, + { + name: "Enabling checkpoint leases should pass", + configFunc: func() Config { + cfg := *NewConfig() + cfg.ExperimentalEnableLeaseCheckpoint = true + return cfg + }, + }, + { + name: "Enabling checkpoint leases and persist should pass", + configFunc: func() Config { + cfg := *NewConfig() + cfg.ExperimentalEnableLeaseCheckpoint = true + cfg.ExperimentalEnableLeaseCheckpointPersist = true + return cfg + }, + }, + { + name: "Enabling checkpoint leases persist without checkpointing itself should fail", + configFunc: func() Config { + cfg := *NewConfig() + cfg.ExperimentalEnableLeaseCheckpointPersist = true + return cfg + }, + expectError: true, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + cfg := tc.configFunc() + err := cfg.Validate() + if (err != nil) != tc.expectError { + t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError) + } + }) + } +} + func TestLogRotation(t *testing.T) { tests := []struct { name string diff --git a/server/embed/etcd.go b/server/embed/etcd.go index aca637c8627..db095edef7f 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -216,6 +216,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing, UnsafeNoFsync: cfg.UnsafeNoFsync, EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint, + LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist, CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit, WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval, DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime, diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index f9c91d9f9c5..550a34bb61b 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -280,7 +280,9 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") - fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.") + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.") + // TODO: delete in v3.7 + fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.") fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.") fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.") fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.") diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 56e288cc5f9..a453ec983fe 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -592,9 +592,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{ + srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{ MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, + CheckpointPersist: cfg.LeaseCheckpointPersist, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), }) diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go index ada3d3a2e2a..375e43c28af 100644 --- a/server/lease/leasehttp/http_test.go +++ b/server/lease/leasehttp/http_test.go @@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) defer betesting.Close(t, be) - le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)}) + le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)}) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 3249ef5ed2f..ce7a7a89a3f 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" @@ -37,6 +38,8 @@ const NoLease = LeaseID(0) // MaxLeaseTTL is the maximum lease TTL value const MaxLeaseTTL = 9000000000 +var v3_6 = semver.Version{Major: 3, Minor: 6} + var ( forever = time.Time{} @@ -180,19 +183,29 @@ type lessor struct { checkpointInterval time.Duration // the interval to check if the expired lease is revoked expiredLeaseRetryInterval time.Duration + // whether lessor should always persist remaining TTL (always enabled in v3.6). + checkpointPersist bool + // cluster is used to adapt lessor logic based on cluster version + cluster cluster +} + +type cluster interface { + // Version is the cluster-wide minimum major.minor version. + Version() *semver.Version } type LessorConfig struct { MinLeaseTTL int64 CheckpointInterval time.Duration ExpiredLeasesRetryInterval time.Duration + CheckpointPersist bool } -func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { - return newLessor(lg, b, cfg) +func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { + return newLessor(lg, b, cluster, cfg) } -func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { +func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval if checkpointInterval == 0 { @@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { minLeaseTTL: cfg.MinLeaseTTL, checkpointInterval: checkpointInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval, + checkpointPersist: cfg.CheckpointPersist, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, + cluster: cluster, } l.initAndRecover() @@ -351,7 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { if l, ok := le.leaseMap[id]; ok { // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry l.remainingTTL = remainingTTL - l.persistTo(le.b) + if le.shouldPersistCheckpoints() { + l.persistTo(le.b) + } if le.isPrimary() { // schedule the next checkpoint as needed le.scheduleCheckpointIfNeeded(l) @@ -360,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error { return nil } +func (le *lessor) shouldPersistCheckpoints() bool { + cv := le.cluster.Version() + return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6)) +} + +func greaterOrEqual(first, second semver.Version) bool { + return !first.LessThan(second) +} + // Renew renews an existing lease. If the given lease does not exist or // has expired, an error will be returned. func (le *lessor) Renew(id LeaseID) (int64, error) { diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go index 06feec810c5..923f50fba48 100644 --- a/server/lease/lessor_bench_test.go +++ b/server/lease/lessor_bench_test.go @@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) { be, _ := betesting.NewDefaultTmpBackend(t) // MinLeaseTTL is negative, so we can grant expired lease in benchmark. // ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease. - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) + le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond}) le.SetRangeDeleter(func() TxnDelete { ftd := &FakeTxnDelete{be.BatchTx()} ftd.Lock() diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 5d6d28782f3..65118ab3d4d 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -26,7 +26,9 @@ import ( "testing" "time" + "github.com/coreos/go-semver/semver" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" @@ -46,7 +48,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -210,7 +212,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.Promote(0) @@ -243,7 +245,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) { for _, cp := range cp.GetCheckpoints() { le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL()) @@ -292,7 +294,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -311,7 +313,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -341,7 +343,7 @@ func TestLessorDetach(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -382,7 +384,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -391,7 +393,7 @@ func TestLessorRecover(t *testing.T) { } // Create a new lessor with the same backend - nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -412,7 +414,7 @@ func TestLessorExpire(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -465,7 +467,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL}) defer le.Stop() le.Promote(1 * time.Second) @@ -514,7 +516,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) @@ -530,7 +532,7 @@ func TestLessorCheckpointScheduling(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) defer le.Stop() le.minLeaseTTL = 1 checkpointedC := make(chan struct{}) @@ -564,7 +566,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL}) defer le.Stop() l, err := le.Grant(1, 10) if err != nil { @@ -578,6 +580,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { } } +func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) { + const ttl int64 = 10 + const checkpointTTL int64 = 5 + + tcs := []struct { + name string + cluster cluster + checkpointPersist bool + expectRemainingTTL int64 + }{ + { + name: "Etcd v3.6 and newer persist remainingTTL on checkpoint", + cluster: clusterV3_6(), + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set", + cluster: clusterLatest(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set", + cluster: clusterNil(), + checkpointPersist: true, + expectRemainingTTL: checkpointTTL, + }, + { + name: "Etcd v3.5 and older reset remainingTTL on checkpoint", + cluster: clusterLatest(), + expectRemainingTTL: ttl, + }, + { + name: "Etcd with version unknown fallbacks to v3.5 behavior", + cluster: clusterNil(), + expectRemainingTTL: ttl, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + cfg := LessorConfig{MinLeaseTTL: minLeaseTTL} + cfg.CheckpointPersist = tc.checkpointPersist + le := newLessor(lg, be, tc.cluster, cfg) + l, err := le.Grant(2, ttl) + if err != nil { + t.Fatal(err) + } + if l.RemainingTTL() != ttl { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl) + } + le.Checkpoint(2, checkpointTTL) + if l.RemainingTTL() != checkpointTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL) + } + le.Stop() + le2 := newLessor(lg, be, clusterV3_6(), cfg) + l = le2.Lookup(2) + if l.RemainingTTL() != tc.expectRemainingTTL { + t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL) + } + }) + } +} + type fakeDeleter struct { deleted []string tx backend.BatchTx @@ -605,3 +676,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) { bcfg.Path = filepath.Join(tmpPath, "be") return tmpPath, backend.New(bcfg) } + +func clusterV3_6() cluster { + return fakeCluster{semver.New("3.6.0")} +} + +func clusterLatest() cluster { + return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")} +} + +func clusterNil() cluster { + return fakeCluster{} +} + +type fakeCluster struct { + version *semver.Version +} + +func (c fakeCluster) Version() *semver.Version { + return c.version +} diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index c8bb969544d..b088df6775b 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -167,6 +167,7 @@ type ClusterConfig struct { EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration + LeaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration } @@ -328,6 +329,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member { useBridge: c.cfg.UseBridge, useTCP: c.cfg.UseTCP, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, + leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, }) @@ -631,6 +633,7 @@ type memberConfig struct { useTCP bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration + leaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration } @@ -729,6 +732,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.useTCP = mcfg.useTCP m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval + m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 9fd0964d532..04ecc097948 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -234,6 +234,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { checkpointingEnabled bool ttl time.Duration checkpointingInterval time.Duration + checkpointingPersist bool leaderChanges int clusterSize int expectTTLIsGT time.Duration @@ -256,14 +257,24 @@ func TestV3LeaseCheckpoint(t *testing.T) { expectTTLIsLT: 290 * time.Second, }, { - name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart", + name: "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart", ttl: 300 * time.Second, checkpointingEnabled: true, checkpointingInterval: 10 * time.Second, + checkpointingPersist: true, leaderChanges: 1, clusterSize: 1, expectTTLIsLT: 290 * time.Second, }, + { + name: "Checkpointing enabled 10s, lease TTL is reset after restart", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 1, + clusterSize: 1, + expectTTLIsGT: 298 * time.Second, + }, { // Checking if checkpointing continues after the first leader change. name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes", @@ -282,6 +293,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { Size: tc.clusterSize, EnableLeaseCheckpoint: tc.checkpointingEnabled, LeaseCheckpointInterval: tc.checkpointingInterval, + LeaseCheckpointPersist: tc.checkpointingPersist, } clus := NewClusterV3(t, config) defer clus.Terminate(t)