diff --git a/example/00-backup-restore-server-config.yaml b/example/00-backup-restore-server-config.yaml
index 65222b582..e810661e1 100644
--- a/example/00-backup-restore-server-config.yaml
+++ b/example/00-backup-restore-server-config.yaml
@@ -49,3 +49,6 @@ defragmentationSchedule: "0 0 */3 * *"
 compressionConfig:
   enabled: true
   policy: "gzip"
+
+electionConfig:
+  reElectionPeriod: "5s"
diff --git a/pkg/leaderelection/init.go b/pkg/leaderelection/init.go
new file mode 100644
index 000000000..f6969c7fe
--- /dev/null
+++ b/pkg/leaderelection/init.go
@@ -0,0 +1,37 @@
+// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leaderelection
+
+import (
+    "github.com/gardener/etcd-backup-restore/pkg/wrappers"
+    flag "github.com/spf13/pflag"
+)
+
+// NewElectionConfig returns a new ElectionConfig initialized with defaults.
+func NewElectionConfig() *ElectionConfig {
+    return &ElectionConfig{
+        ReElectionPeriod: wrappers.Duration{Duration: DefaultReElectionPeriod},
+    }
+}
+
+// AddFlags adds the ElectionConfig flags to the given flagset.
+func (e *ElectionConfig) AddFlags(fs *flag.FlagSet) {
+    fs.DurationVar(&e.ReElectionPeriod.Duration, "election-period", e.ReElectionPeriod.Duration, "period after which the election is re-triggered to check the leadership status")
+}
+
+// Validate validates the ElectionConfig.
+func (e *ElectionConfig) Validate() error {
+    return nil
+}
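The new electionConfig YAML section and the --election-period flag both populate the same ElectionConfig. As a minimal standalone sketch (not part of this change; the flag-set name "backup-restore" is illustrative), the flag wiring added in pkg/leaderelection/init.go could be exercised like this:

package main

import (
    "fmt"

    "github.com/gardener/etcd-backup-restore/pkg/leaderelection"
    flag "github.com/spf13/pflag"
)

func main() {
    // NewElectionConfig defaults ReElectionPeriod to DefaultReElectionPeriod (5s).
    cfg := leaderelection.NewElectionConfig()
    fs := flag.NewFlagSet("backup-restore", flag.ExitOnError)
    cfg.AddFlags(fs)

    // Simulate overriding the default on the command line.
    if err := fs.Parse([]string{"--election-period=10s"}); err != nil {
        panic(err)
    }
    if err := cfg.Validate(); err != nil {
        panic(err)
    }
    fmt.Println("re-election period:", cfg.ReElectionPeriod.Duration) // prints 10s
}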
diff --git a/pkg/leaderelection/leaderelection.go b/pkg/leaderelection/leaderelection.go
new file mode 100644
index 000000000..912dfb6a7
--- /dev/null
+++ b/pkg/leaderelection/leaderelection.go
@@ -0,0 +1,138 @@
+// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leaderelection
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "github.com/gardener/etcd-backup-restore/pkg/errors"
+    "github.com/gardener/etcd-backup-restore/pkg/etcdutil"
+    "github.com/sirupsen/logrus"
+)
+
+var emptyStruct struct{}
+
+// NewLeaderElector returns a new LeaderElector.
+func NewLeaderElector(logger *logrus.Entry, etcdConnectionConfig *etcdutil.EtcdConnectionConfig, leaderElectionConfig *ElectionConfig, callbacks *LeaderCallbacks) (*LeaderElector, error) {
+    return &LeaderElector{
+        logger:               logger.WithField("actor", "leader-elector"),
+        EtcdConnectionConfig: etcdConnectionConfig,
+        CurrentState:         DefaultCurrentState,
+        StopCh:               make(chan struct{}),
+        ProbeEtcdch:          make(chan struct{}),
+        PrevElectionResult:   ElectionLost,
+        Config:               leaderElectionConfig,
+        Callbacks:            callbacks,
+    }, nil
+}
+
+// StartElection periodically re-runs the election to elect the backup-restore leader.
+func (le *LeaderElector) StartElection(ctx context.Context) {
+    le.logger.Info("Starting LeaderElection...")
+
+    for {
+        select {
+        case <-ctx.Done():
+            le.logger.Info("Shutting down LeaderElection...")
+            return
+        case <-le.StopCh:
+            le.logger.Info("Shutting down LeaderElection...")
+            return
+        case <-time.After(le.Config.ReElectionPeriod.Duration):
+            err := le.isLeader(ctx)
+            if err != nil {
+                le.CurrentState = StateUnknown
+                le.logger.Errorf("Failed to elect the backup-restore leader: %v", err)
+            }
+
+            if le.CurrentState == StateUnknown {
+                le.PrevElectionResult = ElectionLost
+                le.logger.Info("backup-restore is in: ", StateUnknown)
+                // Stop the running snapshotter.
+                if le.Callbacks.OnStoppedLeading != nil {
+                    le.Callbacks.OnStoppedLeading()
+                    le.logger.Info("backup-restore lost the election")
+                    le.logger.Info("waiting for re-election...")
+                }
+            } else if le.PrevElectionResult == ElectionLost && le.CurrentState == StateLeader {
+                // This backup-restore sidecar has just become the leader:
+                // refresh the snapshotter state and start the snapshotter.
+                le.PrevElectionResult = ElectionWon
+                le.logger.Info("backup-restore became: ", StateLeader)
+                if le.Callbacks.OnStartedLeading != nil {
+                    le.Callbacks.OnStartedLeading(ctx)
+                }
+                le.logger.Info("sending signal to the etcd probe loop to start the snapshotter")
+                le.ProbeEtcdch <- emptyStruct
+            } else if le.PrevElectionResult == ElectionWon && le.CurrentState == StateLeader {
+                le.logger.Info("no change in the leadership status...")
+            } else if le.CurrentState == StateFollower {
+                le.logger.Info("backup-restore is not the leader")
+
+                le.PrevElectionResult = ElectionLost
+                // Stop the running snapshotter.
+                if le.Callbacks.OnStoppedLeading != nil {
+                    le.Callbacks.OnStoppedLeading()
+                    le.logger.Info("backup-restore lost the election")
+                    le.logger.Info("backup-restore became: ", StateFollower)
+                }
+            }
+        }
+    }
+}
+
+// isLeader checks whether the etcd member this backup-restore sidecar is attached to
+// is the etcd leader, and sets CurrentState accordingly.
+func (le *LeaderElector) isLeader(ctx context.Context) error {
+    le.logger.Info("checking the leadership status...")
+    var endPoint string
+    client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
+    if err != nil {
+        return &errors.EtcdError{
+            Message: fmt.Sprintf("Failed to create etcd client: %v", err),
+        }
+    }
+    defer client.Close()
+
+    le.logger.Debugf("Etcd endpoints: %v", le.EtcdConnectionConfig.Endpoints)
+
+    le.CurrentState = StateFollower
+
+    if len(le.EtcdConnectionConfig.Endpoints) > 0 {
+        endPoint = le.EtcdConnectionConfig.Endpoints[0]
+    } else {
+        return fmt.Errorf("etcd endpoints are not passed correctly")
+    }
+    ctx, cancel := context.WithTimeout(ctx, le.EtcdConnectionConfig.ConnectionTimeout.Duration)
+    response, err := client.Status(ctx, endPoint)
+    cancel()
+    if err != nil {
+        le.logger.Errorf("Failed to get the status from etcd: %v", err)
+        return err
+    }
+
+    le.logger.Infof("MemberID: %v", response.Header.MemberId)
+    le.logger.Infof("LeaderID: %v", response.Leader)
+
+    if response.Header.MemberId == response.Leader {
+        le.CurrentState = StateLeader
+    } else if response.Leader == QuorumLostID {
+        return &errors.EtcdError{
+            Message: "currently there is no etcd leader, possibly due to etcd quorum loss",
+        }
+    }
+    return nil
+}
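For orientation, a hedged usage sketch (not part of this change): wiring the elector with logging-only callbacks. etcdutil.NewEtcdConnectionConfig is assumed to supply etcd connection defaults.

package main

import (
    "context"

    "github.com/gardener/etcd-backup-restore/pkg/etcdutil"
    "github.com/gardener/etcd-backup-restore/pkg/leaderelection"
    "github.com/sirupsen/logrus"
)

func main() {
    logger := logrus.NewEntry(logrus.New())
    callbacks := &leaderelection.LeaderCallbacks{
        OnStartedLeading: func(ctx context.Context) { logger.Info("started leading") },
        OnStoppedLeading: func() { logger.Info("stopped leading") },
    }

    le, err := leaderelection.NewLeaderElector(logger, etcdutil.NewEtcdConnectionConfig(), leaderelection.NewElectionConfig(), callbacks)
    if err != nil {
        logger.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Consume the start signal so StartElection does not block on ProbeEtcdch
    // when this process wins the election (the channel is unbuffered).
    go func() {
        for range le.ProbeEtcdch {
            logger.Info("leadership won; snapshotter may start")
        }
    }()
    le.StartElection(ctx) // blocks until ctx is cancelled or StopCh is closed
}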
diff --git a/pkg/leaderelection/types.go b/pkg/leaderelection/types.go
new file mode 100644
index 000000000..9508dc798
--- /dev/null
+++ b/pkg/leaderelection/types.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leaderelection
+
+import (
+    "context"
+    "time"
+
+    "github.com/gardener/etcd-backup-restore/pkg/etcdutil"
+    "github.com/gardener/etcd-backup-restore/pkg/wrappers"
+    "github.com/sirupsen/logrus"
+)
+
+const (
+    // StateFollower defines the currentState of backup-restore as "Follower".
+    StateFollower = "Follower"
+    // StateCandidate defines the currentState of backup-restore as "Candidate".
+    StateCandidate = "Candidate"
+    // StateLeader defines the currentState of backup-restore as "Leader".
+    StateLeader = "Leader"
+    // StateUnknown defines the currentState of backup-restore as "UnknownState".
+    StateUnknown = "UnknownState"
+
+    // DefaultCurrentState defines the default currentState of backup-restore as "Follower".
+    DefaultCurrentState = StateFollower
+
+    // ElectionLost defines the state of LeaderElection as ElectionLost.
+    ElectionLost = "Lost"
+    // ElectionWon defines the state of LeaderElection as ElectionWon.
+    ElectionWon = "Won"
+
+    // QuorumLostID defines the LeaderID returned by etcd when there is no leader, i.e. 0.
+    QuorumLostID = 0
+
+    // DefaultReElectionPeriod defines the default ReElectionPeriod.
+    DefaultReElectionPeriod = 5 * time.Second
+)
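The reElectionPeriod value from the example YAML lands in ElectionConfig.ReElectionPeriod. A small sketch (not part of this change) of that mapping, assuming wrappers.Duration unmarshals duration strings such as "5s" and that the config file is decoded through a JSON-compatible path (e.g. sigs.k8s.io/yaml):

package main

import (
    "encoding/json"
    "fmt"

    "github.com/gardener/etcd-backup-restore/pkg/leaderelection"
)

func main() {
    // JSON equivalent of the electionConfig section added to the example YAML.
    raw := []byte(`{"reElectionPeriod": "5s"}`)
    cfg := leaderelection.NewElectionConfig()
    if err := json.Unmarshal(raw, cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.ReElectionPeriod.Duration) // prints 5s
}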
+// LeaderCallbacks are callbacks that are triggered to start or stop the snapshotter when the leadership status changes.
+type LeaderCallbacks struct {
+    // OnStartedLeading is called when a LeaderElector client starts leading.
+    OnStartedLeading func(context.Context)
+    // OnStoppedLeading is called when a LeaderElector client stops leading.
+    OnStoppedLeading func()
+}
+
+// ElectionConfig holds the LeaderElection configuration.
+type ElectionConfig struct {
+    // ReElectionPeriod defines the period after which the leadership status is checked.
+    ReElectionPeriod wrappers.Duration `json:"reElectionPeriod,omitempty"`
+}
+
+// LeaderElector holds all the configuration necessary to elect the backup-restore leader.
+type LeaderElector struct {
+    // CurrentState defines the currentState of backup-restore for LeaderElection.
+    CurrentState         string
+    Config               *ElectionConfig
+    EtcdConnectionConfig *etcdutil.EtcdConnectionConfig
+    logger               *logrus.Entry
+    StopCh               chan struct{}
+    ProbeEtcdch          chan struct{}
+    PrevElectionResult   string
+    Callbacks            *LeaderCallbacks
+    //leaderElectionCancel context.CancelFunc
+}
diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go
index ab20a8c62..06f5f54ef 100644
--- a/pkg/server/backuprestoreserver.go
+++ b/pkg/server/backuprestoreserver.go
@@ -22,6 +22,7 @@ import (
     "time"
 
     "github.com/gardener/etcd-backup-restore/pkg/errors"
+    "github.com/gardener/etcd-backup-restore/pkg/leaderelection"
     "github.com/gardener/etcd-backup-restore/pkg/metrics"
     brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
     "github.com/prometheus/client_golang/prometheus"
@@ -61,13 +62,13 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen
 func (b *BackupRestoreServer) Run(ctx context.Context) error {
     clusterURLsMap, err := types.NewURLsMap(b.config.RestorationConfig.InitialCluster)
     if err != nil {
-        // Ideally this case should not occur, since this check is done at the config validaitions.
+        // Ideally this case should not occur, since this check is done at the config validations.
         b.logger.Fatalf("failed creating url map for restore cluster: %v", err)
     }
 
     peerURLs, err := types.NewURLs(b.config.RestorationConfig.InitialAdvertisePeerURLs)
     if err != nil {
-        // Ideally this case should not occur, since this check is done at the config validaitions.
+        // Ideally this case should not occur, since this check is done at the config validations.
         b.logger.Fatalf("failed creating url map for restore cluster: %v", err)
     }
 
@@ -133,6 +134,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r
 // for the case where snapshotter is configured correctly
 func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error {
     ackCh := make(chan struct{})
+    ssrStopCh := make(chan struct{})
 
     etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger)
@@ -151,19 +153,47 @@ func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, rest
     handler := b.startHTTPServer(etcdInitializer, ssr)
     defer handler.Stop()
 
-    ssrStopCh := make(chan struct{})
+    leaderCallbacks := &leaderelection.LeaderCallbacks{
+        OnStartedLeading: func(ctx context.Context) {
+            // Get the updated snapshotter state.
+            err := ssr.GetLatestSnapshotterState()
+            if err != nil {
+                b.logger.Errorf("Unable to get the latest snapshotter state: %v", err)
+            }
+            handler.Snapshotter = ssr
+        },
+        OnStoppedLeading: func() {
+            // Stop the running snapshotter.
+            ssr.SsrStateMutex.Lock()
+            if ssr.SsrState == brtypes.SnapshotterActive {
+                ssr.SsrStateMutex.Unlock()
+                ssrStopCh <- emptyStruct
+            } else {
+                ssr.SsrStateMutex.Unlock()
+            }
+        },
+    }
+
+    b.logger.Info("Creating the LeaderElector...")
+    le, err := leaderelection.NewLeaderElector(b.logger, b.config.EtcdConnectionConfig, b.config.LeaderElectionConfig, leaderCallbacks)
+    if err != nil {
+        return err
+    }
+
+    go le.StartElection(ctx)
+
     go handleSsrStopRequest(ctx, handler, ssr, ackCh, ssrStopCh)
     go handleAckState(handler, ackCh)
 
     go defragmentor.DefragDataPeriodically(ctx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, ssr.TriggerFullSnapshot, b.logger)
 
-    b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh)
+    b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh, le)
     return nil
 }
 
 // runEtcdProbeLoopWithSnapshotter runs the etcd probe loop
 // for the case where snapshotter is configured correctly
-func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}) {
+func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}, le *leaderelection.LeaderElector) {
     var (
         err                       error
         initialDeltaSnapshotTaken bool
@@ -177,6 +207,9 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
             return
         default:
             err = b.probeEtcd(ctx)
+            if err == nil {
+                <-le.ProbeEtcdch
+            }
         }
         if err != nil {
             b.logger.Errorf("Failed to probe etcd: %v", err)
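The probe loop now blocks on le.ProbeEtcdch after a successful etcd probe, so only the elected leader proceeds to start the snapshotter. An illustrative, self-contained sketch of this handshake pattern (stand-in names, not the actual server code):

package main

import (
    "fmt"
    "time"
)

func main() {
    probeEtcdCh := make(chan struct{})

    // Stand-in for runEtcdProbeLoopWithSnapshotter: after a successful probe,
    // wait for the leadership signal before starting the snapshotter.
    go func() {
        fmt.Println("etcd probe succeeded; waiting for leadership...")
        <-probeEtcdCh
        fmt.Println("leadership confirmed; starting snapshotter")
    }()

    // Stand-in for StartElection: signal once the election is won.
    time.Sleep(100 * time.Millisecond)
    probeEtcdCh <- struct{}{}
    time.Sleep(100 * time.Millisecond)
}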
b.logger.Fatalf("failed creating url map for restore cluster: %v", err) } @@ -133,6 +134,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r // for the case where snapshotter is configured correctly func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error { ackCh := make(chan struct{}) + ssrStopCh := make(chan struct{}) etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger) @@ -151,19 +153,47 @@ func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, rest handler := b.startHTTPServer(etcdInitializer, ssr) defer handler.Stop() - ssrStopCh := make(chan struct{}) + leaderCallbacks := &leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + // Get the updated snapshotter state + err := ssr.GetLatestSnapshotterState() + if err != nil { + fmt.Printf("Unable to get Latest SnapshotterState: %v", err) + } + handler.Snapshotter = ssr + }, + OnStoppedLeading: func() { + // stops the running snapshotter + ssr.SsrStateMutex.Lock() + if ssr.SsrState == brtypes.SnapshotterActive { + ssr.SsrStateMutex.Unlock() + ssrStopCh <- emptyStruct + } else { + ssr.SsrStateMutex.Unlock() + } + }, + } + + b.logger.Infof("Creating leaderElector...") + le, err := leaderelection.NewLeaderElector(b.logger, b.config.EtcdConnectionConfig, b.config.LeaderElectionConfig, leaderCallbacks) + if err != nil { + return err + } + + go le.StartElection(ctx) + go handleSsrStopRequest(ctx, handler, ssr, ackCh, ssrStopCh) go handleAckState(handler, ackCh) go defragmentor.DefragDataPeriodically(ctx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, ssr.TriggerFullSnapshot, b.logger) - b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh) + b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh, le) return nil } // runEtcdProbeLoopWithSnapshotter runs the etcd probe loop // for the case where snapshotter is configured correctly -func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}) { +func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}, le *leaderelection.LeaderElector) { var ( err error initialDeltaSnapshotTaken bool @@ -177,6 +207,9 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex return default: err = b.probeEtcd(ctx) + if err == nil { + <-le.ProbeEtcdch + } } if err != nil { b.logger.Errorf("Failed to probe etcd: %v", err) diff --git a/pkg/server/init.go b/pkg/server/init.go index 75e26d843..c0bab531a 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -20,6 +20,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/leaderelection" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" @@ -38,6 +39,7 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig { CompressionConfig: compressor.NewCompressorConfig(), RestorationConfig: brtypes.NewRestorationConfig(), DefragmentationSchedule: defaultDefragmentationSchedule, + LeaderElectionConfig: leaderelection.NewElectionConfig(), } } @@ -49,6 +51,7 @@ func (c 
diff --git a/pkg/server/types.go b/pkg/server/types.go
index 3d29b9c61..b08c91282 100644
--- a/pkg/server/types.go
+++ b/pkg/server/types.go
@@ -17,6 +17,7 @@ package server
 import (
     "github.com/gardener/etcd-backup-restore/pkg/compressor"
     "github.com/gardener/etcd-backup-restore/pkg/etcdutil"
+    "github.com/gardener/etcd-backup-restore/pkg/leaderelection"
     brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
 )
 
@@ -34,6 +35,7 @@ type BackupRestoreComponentConfig struct {
     CompressionConfig       *compressor.CompressionConfig  `json:"compressionConfig,omitempty"`
     RestorationConfig       *brtypes.RestorationConfig     `json:"restorationConfig,omitempty"`
     DefragmentationSchedule string                         `json:"defragmentationSchedule"`
+    LeaderElectionConfig    *leaderelection.ElectionConfig `json:"electionConfig,omitempty"`
 }
 
 // latestSnapshotMetadata holds snapshot details of latest full and delta snapshots
diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go
index 1cf8c5be3..46cf9375a 100644
--- a/pkg/snapshot/snapshotter/snapshotter.go
+++ b/pkg/snapshot/snapshotter/snapshotter.go
@@ -99,10 +99,35 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
         return nil, fmt.Errorf("invalid schedule provied %s : %v", config.FullSnapshotSchedule, err)
     }
 
-    var prevSnapshot *brtypes.Snapshot
-    fullSnap, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
+    ssr := &Snapshotter{
+        logger:               logger.WithField("actor", "snapshotter"),
+        store:                store,
+        config:               config,
+        etcdConnectionConfig: etcdConnectionConfig,
+        compressionConfig:    compressionConfig,
+        schedule:             sdl,
+        SsrState:             brtypes.SnapshotterInactive,
+        SsrStateMutex:        &sync.Mutex{},
+        fullSnapshotReqCh:    make(chan struct{}),
+        deltaSnapshotReqCh:   make(chan struct{}),
+        fullSnapshotAckCh:    make(chan result),
+        deltaSnapshotAckCh:   make(chan result),
+        cancelWatch:          func() {},
+    }
+
+    err = ssr.GetLatestSnapshotterState()
     if err != nil {
         return nil, err
+    }
+    return ssr, nil
+}
+
+// GetLatestSnapshotterState loads the latest snapshot state from the snapstore into the snapshotter.
+func (ssr *Snapshotter) GetLatestSnapshotterState() error {
+    var prevSnapshot *brtypes.Snapshot
+    fullSnap, deltaSnapList, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(ssr.store)
+    if err != nil {
+        return err
     } else if fullSnap != nil && len(deltaSnapList) == 0 {
         prevSnapshot = fullSnap
         // setting timestamps of both full and delta to prev full snapshot's timestamp
@@ -118,26 +143,10 @@ func NewSnapshotter(logger *logrus.Entry, config *brtypes.SnapshotterConfig, sto
     }
 
     metrics.LatestSnapshotRevision.With(prometheus.Labels{metrics.LabelKind: prevSnapshot.Kind}).Set(float64(prevSnapshot.LastRevision))
-
-    return &Snapshotter{
-        logger:               logger.WithField("actor", "snapshotter"),
-        store:                store,
-        config:               config,
-        etcdConnectionConfig: etcdConnectionConfig,
-        compressionConfig:    compressionConfig,
-
-        schedule:             sdl,
-        prevSnapshot:         prevSnapshot,
-        PrevFullSnapshot:     fullSnap,
-        PrevDeltaSnapshots:   deltaSnapList,
-        SsrState:             brtypes.SnapshotterInactive,
-        SsrStateMutex:        &sync.Mutex{},
-        fullSnapshotReqCh:    make(chan struct{}),
-        deltaSnapshotReqCh:   make(chan struct{}),
-        fullSnapshotAckCh:    make(chan result),
-        deltaSnapshotAckCh:   make(chan result),
-        cancelWatch:          func() {},
-    }, nil
+    ssr.prevSnapshot = prevSnapshot
+    ssr.PrevFullSnapshot = fullSnap
+    ssr.PrevDeltaSnapshots = deltaSnapList
+    return nil
 }
 
 // Run process loop for scheduled backup
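Extracting GetLatestSnapshotterState out of NewSnapshotter lets a sidecar that gains leadership re-read prevSnapshot, PrevFullSnapshot, and PrevDeltaSnapshots from the snapstore without rebuilding the Snapshotter. A hedged sketch (not part of this change) mirroring the OnStartedLeading callback above; refreshOnLeadership is a hypothetical helper:

package main

import (
    "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter"
    "github.com/sirupsen/logrus"
)

// refreshOnLeadership assumes ssr was built via snapshotter.NewSnapshotter.
func refreshOnLeadership(ssr *snapshotter.Snapshotter, logger *logrus.Entry) {
    // Re-read the latest full/delta snapshots so the new leader continues the
    // existing snapshot sequence instead of restarting it from scratch.
    if err := ssr.GetLatestSnapshotterState(); err != nil {
        logger.Errorf("unable to refresh snapshotter state: %v", err)
    }
}

func main() {}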