diff --git a/example/00-backup-restore-server-config.yaml b/example/00-backup-restore-server-config.yaml index 65222b582..6d56b09e9 100644 --- a/example/00-backup-restore-server-config.yaml +++ b/example/00-backup-restore-server-config.yaml @@ -49,3 +49,7 @@ defragmentationSchedule: "0 0 */3 * *" compressionConfig: enabled: true policy: "gzip" + +leaderElectionConfig: + reelectionPeriod: "5s" + etcdConnectionTimeout: "5s" diff --git a/pkg/defragmentor/defrag.go b/pkg/defragmentor/defrag.go index d0491bd4a..a234cd403 100644 --- a/pkg/defragmentor/defrag.go +++ b/pkg/defragmentor/defrag.go @@ -23,7 +23,7 @@ import ( "github.com/sirupsen/logrus" ) -// CallbackFunc is type decalration for callback function for defragmentor +// CallbackFunc is type declaration for callback function for defragmentor type CallbackFunc func(ctx context.Context) (*brtypes.Snapshot, error) // defragmentorJob implement the cron.Job for etcd defragmentation. diff --git a/pkg/leaderelection/init.go b/pkg/leaderelection/init.go new file mode 100644 index 000000000..a341a01e6 --- /dev/null +++ b/pkg/leaderelection/init.go @@ -0,0 +1,50 @@ +// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leaderelection + +import ( + "fmt" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/wrappers" + flag "github.com/spf13/pflag" +) + +// NewLeaderElectionConfig returns the Config. +func NewLeaderElectionConfig() *Config { + return &Config{ + ReelectionPeriod: wrappers.Duration{Duration: DefaultReelectionPeriod}, + EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdConnectionTimeout}, + } +} + +// AddFlags adds the flags to flagset. +func (c *Config) AddFlags(fs *flag.FlagSet) { + fs.DurationVar(&c.EtcdConnectionTimeout.Duration, "etcd-connection-timeout-leader-election", c.EtcdConnectionTimeout.Duration, "etcd client connection timeout during leader election.") + fs.DurationVar(&c.ReelectionPeriod.Duration, "reelection-period", c.ReelectionPeriod.Duration, "Period after which election will be re-triggered to check the leadership status.") +} + +// Validate validates the Config. +func (c *Config) Validate() error { + if c.ReelectionPeriod.Duration <= time.Duration(1*time.Second) { + return fmt.Errorf("ReelectionPeriod should be greater than 1 second") + } + + if c.EtcdConnectionTimeout.Duration <= time.Duration(1*time.Second) { + return fmt.Errorf("etcd connection timeout in leaderElection should be greater than 1 second") + } + + return nil +} diff --git a/pkg/leaderelection/leaderelection.go b/pkg/leaderelection/leaderelection.go new file mode 100644 index 000000000..4470c4cd7 --- /dev/null +++ b/pkg/leaderelection/leaderelection.go @@ -0,0 +1,191 @@ +// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leaderelection + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/errors" + "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/sirupsen/logrus" +) + +// NewLeaderElector returns LeaderElector configurations. +func NewLeaderElector(logger *logrus.Entry, etcdConnectionConfig *etcdutil.EtcdConnectionConfig, leaderElectionConfig *Config, callbacks *LeaderCallbacks) (*LeaderElector, error) { + return &LeaderElector{ + logger: logger.WithField("actor", "leader-elector"), + EtcdConnectionConfig: etcdConnectionConfig, + CurrentState: DefaultCurrentState, + Config: leaderElectionConfig, + Callbacks: callbacks, + ElectionLock: &sync.Mutex{}, + }, nil +} + +// Run starts the LeaderElection loop to elect the backup-restore's Leader +// and keep checking the leadership status of backup-restore. +func (le *LeaderElector) Run(ctx context.Context) error { + le.logger.Infof("Starting leaderElection...") + var leCtx context.Context + var leCancel context.CancelFunc + for { + select { + case <-ctx.Done(): + le.logger.Info("Shutting down LeaderElection...") + leCancel() + return nil + case <-time.After(le.Config.ReelectionPeriod.Duration): + isLeader, err := le.IsLeader(ctx) + if err != nil { + le.logger.Errorf("Failed to elect the backup-restore leader: %v", err) + + // set the CurrentState of backup-restore. + // stops the Running Snapshotter. + // wait for Reelection to happen. + le.CurrentState = StateUnknown + le.logger.Infof("backup-restore is in: %v", le.CurrentState) + if le.Callbacks.OnStoppedLeading != nil && leCtx != nil { + leCancel() + le.Callbacks.OnStoppedLeading() + } + le.logger.Info("waiting for Re-election...") + continue + } + + if isLeader && (le.CurrentState == StateFollower || le.CurrentState == StateUnknown || le.CurrentState == StateCandidate) { + // backup-restore becomes the Leader backup-restore. + // set the CurrentState of backup-restore. + // update the snapshotter object with latest snapshotter object. + // start the snapshotter. + le.CurrentState = StateLeader + le.logger.Infof("backup-restore became: %v", le.CurrentState) + + if le.Callbacks.OnStartedLeading != nil { + leCtx, leCancel = context.WithCancel(ctx) + le.logger.Info("backup-restore started leading...") + le.Callbacks.OnStartedLeading(leCtx) + } + } else if isLeader && le.CurrentState == StateLeader { + le.logger.Debug("no change in leadershipStatus...") + } else if !isLeader && le.CurrentState == StateLeader { + // backup-restore lost the election and becomes Follower. + // set the CurrentState of backup-restore. + // stop the Running snapshotter. + le.CurrentState = StateFollower + le.logger.Info("backup-restore lost the election") + le.logger.Infof("backup-restore became: %v", le.CurrentState) + + if le.Callbacks.OnStoppedLeading != nil && leCtx != nil { + leCancel() + le.Callbacks.OnStoppedLeading() + } + } else if !isLeader && le.CurrentState == StateUnknown { + le.CurrentState = StateFollower + le.logger.Infof("backup-restore changed the state from %v to %v", StateUnknown, le.CurrentState) + } else if !isLeader && le.CurrentState == StateFollower { + le.logger.Infof("backup-restore currentState: %v", le.CurrentState) + } + } + } +} + +// IsLeader checks whether the current backup-restore is leader or not. +func (le *LeaderElector) IsLeader(ctx context.Context) (bool, error) { + le.logger.Info("checking the leadershipStatus...") + var endPoint string + client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig) + if err != nil { + return false, &errors.EtcdError{ + Message: fmt.Sprintf("Failed to create etcd client: %v", err), + } + } + defer client.Close() + + if len(le.EtcdConnectionConfig.Endpoints) > 0 { + endPoint = le.EtcdConnectionConfig.Endpoints[0] + } else { + return false, fmt.Errorf("Etcd endpoints are not passed correctly") + } + + ctx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration) + defer cancel() + + response, err := client.Status(ctx, endPoint) + if err != nil { + le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", le.EtcdConnectionConfig.Endpoints[0], err) + return false, err + } + + if response.Header.MemberId == response.Leader { + return true, nil + } else if response.Leader == NoLeaderID { + return false, &errors.EtcdError{ + Message: fmt.Sprintf("Currently there is no Etcd Leader present may be due to etcd quorum loss or election is being held."), + } + } + return false, nil +} + +// GetLeader will return the LeaderID as well as PeerURLs of etcd leader. +func (le *LeaderElector) GetLeader(ctx context.Context) (uint64, []string, error) { + le.logger.Info("getting the etcd leaderID...") + var endPoint string + client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig) + if err != nil { + return NoLeaderID, nil, &errors.EtcdError{ + Message: fmt.Sprintf("Failed to create etcd client: %v", err), + } + } + defer client.Close() + + if len(le.EtcdConnectionConfig.Endpoints) > 0 { + endPoint = le.EtcdConnectionConfig.Endpoints[0] + } else { + return NoLeaderID, nil, &errors.EtcdError{ + Message: fmt.Sprintf("Etcd endpoints are not passed correctly"), + } + } + + ctx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration) + defer cancel() + + response, err := client.Status(ctx, endPoint) + if err != nil { + le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", le.EtcdConnectionConfig.Endpoints[0], err) + return NoLeaderID, nil, err + } + + if response.Leader == NoLeaderID { + return NoLeaderID, nil, &errors.EtcdError{ + Message: fmt.Sprintf("Currently there is no Etcd Leader present may be due to etcd quorum loss."), + } + } + + membersInfo, err := client.MemberList(ctx) + if err != nil { + le.logger.Errorf("Failed to get memberList of etcd with error: %v", err) + return response.Leader, nil, err + } + + for _, member := range membersInfo.Members { + if response.Leader == member.GetID() { + return response.Leader, member.GetPeerURLs(), nil + } + } + return response.Leader, nil, nil +} diff --git a/pkg/leaderelection/types.go b/pkg/leaderelection/types.go new file mode 100644 index 000000000..f8be4c943 --- /dev/null +++ b/pkg/leaderelection/types.go @@ -0,0 +1,74 @@ +// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leaderelection + +import ( + "context" + "sync" + "time" + + "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/wrappers" + "github.com/sirupsen/logrus" +) + +const ( + // StateFollower defines currentState of backup-restore as "Follower". + StateFollower = "Follower" + // StateCandidate defines currentState of backup-restore as "Candidate". + StateCandidate = "Candidate" + // StateLeader defines currentState of backup-restore as "Leader". + StateLeader = "Leader" + // StateUnknown defines currentState of backup-restore as "UnknownState". + StateUnknown = "UnknownState" + + // DefaultCurrentState defines default currentState of backup-restore as "Follower". + DefaultCurrentState = StateFollower + + // NoLeaderID defines the state when etcd returns LeaderID as 0. + NoLeaderID uint64 = 0 + + // DefaultReelectionPeriod defines default ReelectionPeriod. + DefaultReelectionPeriod = 5 * time.Second + // DefaultEtcdConnectionTimeout defines default ConnectionTimeout for etcd client. + DefaultEtcdConnectionTimeout = 5 * time.Second +) + +// LeaderCallbacks are callbacks that are triggered to start/stop the snapshottter when leader's currentState changes. +type LeaderCallbacks struct { + // OnStartedLeading is called when a LeaderElector client starts leading. + OnStartedLeading func(context.Context) + // OnStoppedLeading is called when a LeaderElector client stops leading. + OnStoppedLeading func() +} + +// Config holds the LeaderElection config. +type Config struct { + // ReelectionPeriod defines the Period after which leadership status is checked. + ReelectionPeriod wrappers.Duration `json:"reelectionPeriod,omitempty"` + // EtcdConnectionTimeout defines the timeout duration for etcd client connection during leader election. + EtcdConnectionTimeout wrappers.Duration `json:"etcdConnectionTimeout,omitempty"` +} + +// LeaderElector holds the all configuration necessary to elect backup-restore Leader. +type LeaderElector struct { + // CurrentState defines currentState of backup-restore for LeaderElection. + CurrentState string + Config *Config + EtcdConnectionConfig *etcdutil.EtcdConnectionConfig + logger *logrus.Entry + Callbacks *LeaderCallbacks + ElectionLock *sync.Mutex +} diff --git a/pkg/server/backuprestoreserver.go b/pkg/server/backuprestoreserver.go index ab20a8c62..1ecc70830 100644 --- a/pkg/server/backuprestoreserver.go +++ b/pkg/server/backuprestoreserver.go @@ -18,10 +18,12 @@ import ( "context" "fmt" "net/http" + "sync" "sync/atomic" "time" "github.com/gardener/etcd-backup-restore/pkg/errors" + "github.com/gardener/etcd-backup-restore/pkg/leaderelection" "github.com/gardener/etcd-backup-restore/pkg/metrics" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" "github.com/prometheus/client_golang/prometheus" @@ -61,13 +63,13 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen func (b *BackupRestoreServer) Run(ctx context.Context) error { clusterURLsMap, err := types.NewURLsMap(b.config.RestorationConfig.InitialCluster) if err != nil { - // Ideally this case should not occur, since this check is done at the config validaitions. + // Ideally this case should not occur, since this check is done at the config validations. b.logger.Fatalf("failed creating url map for restore cluster: %v", err) } peerURLs, err := types.NewURLs(b.config.RestorationConfig.InitialAdvertisePeerURLs) if err != nil { - // Ideally this case should not occur, since this check is done at the config validaitions. + // Ideally this case should not occur, since this check is done at the config validations. b.logger.Fatalf("failed creating url map for restore cluster: %v", err) } @@ -102,6 +104,7 @@ func (b *BackupRestoreServer) startHTTPServer(initializer initializer.Initialize EnableTLS: (b.config.ServerConfig.TLSCertFile != "" && b.config.ServerConfig.TLSKeyFile != ""), ServerTLSCertFile: b.config.ServerConfig.TLSCertFile, ServerTLSKeyFile: b.config.ServerConfig.TLSKeyFile, + HTTPHandlerMutex: &sync.Mutex{}, } handler.SetStatus(http.StatusServiceUnavailable) b.logger.Info("Registering the http request handlers...") @@ -133,6 +136,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r // for the case where snapshotter is configured correctly func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error { ackCh := make(chan struct{}) + ssrStopCh := make(chan struct{}) etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger) @@ -151,18 +155,46 @@ func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, rest handler := b.startHTTPServer(etcdInitializer, ssr) defer handler.Stop() - ssrStopCh := make(chan struct{}) - go handleSsrStopRequest(ctx, handler, ssr, ackCh, ssrStopCh) + leaderCallbacks := &leaderelection.LeaderCallbacks{ + OnStartedLeading: func(leCtx context.Context) { + ssrStopCh = make(chan struct{}) + // Get the new snapshotter object + ssr, err = snapshotter.NewSnapshotter(b.logger, b.config.SnapshotterConfig, ss, b.config.EtcdConnectionConfig, b.config.CompressionConfig) + if err != nil { + b.logger.Errorf("Failed to create new Snapshotter object: %v", err) + return + } + go b.runEtcdProbeLoopWithSnapshotter(leCtx, handler, ssr, ssrStopCh, ackCh) + + go handleSsrStopRequest(leCtx, handler, ssr, ackCh, ssrStopCh) + }, + OnStoppedLeading: func() { + // stops the running snapshotter + ssr.SsrStateMutex.Lock() + defer ssr.SsrStateMutex.Unlock() + if ssr.SsrState == brtypes.SnapshotterActive { + ssrStopCh <- emptyStruct + b.logger.Info("backup-restore stops leading...") + } + handler.SetSnapshotterToNil() + }, + } + + b.logger.Infof("Creating leaderElector...") + le, err := leaderelection.NewLeaderElector(b.logger, b.config.EtcdConnectionConfig, b.config.LeaderElectionConfig, leaderCallbacks) + if err != nil { + return err + } + go handleAckState(handler, ackCh) go defragmentor.DefragDataPeriodically(ctx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, ssr.TriggerFullSnapshot, b.logger) - b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh) - return nil + return le.Run(ctx) } // runEtcdProbeLoopWithSnapshotter runs the etcd probe loop -// for the case where snapshotter is configured correctly +// for the case when current backup-restore becomes leader backup-restore. func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}) { var ( err error @@ -211,6 +243,7 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex b.logger.Info("Snapshotter stopped.") ackCh <- emptyStruct handler.SetStatus(http.StatusServiceUnavailable) + b.logger.Info("###") b.logger.Info("Shutting down...") return } diff --git a/pkg/server/httpAPI.go b/pkg/server/httpAPI.go index 12e20445f..b24d82ea2 100644 --- a/pkg/server/httpAPI.go +++ b/pkg/server/httpAPI.go @@ -75,6 +75,7 @@ type HTTPHandler struct { EnableTLS bool ServerTLSCertFile string ServerTLSKeyFile string + HTTPHandlerMutex *sync.Mutex } // GetStatus returns the current status in the HTTPHandler @@ -90,6 +91,20 @@ func (h *HTTPHandler) SetStatus(status int) { h.status = status } +// SetSnapshotterToNil sets the current HTTPHandler.Snapshotter to Nil in the HTTPHandler. +func (h *HTTPHandler) SetSnapshotterToNil() { + h.HTTPHandlerMutex.Lock() + defer h.HTTPHandlerMutex.Unlock() + h.Snapshotter = nil +} + +// SetSnapshotter sets the current HTTPHandler.Snapshotter in the HTTPHandler. +func (h *HTTPHandler) SetSnapshotter(ssr *snapshotter.Snapshotter) { + h.HTTPHandlerMutex.Lock() + defer h.HTTPHandlerMutex.Unlock() + h.Snapshotter = ssr +} + // RegisterHandler registers the handler for different requests func (h *HTTPHandler) RegisterHandler() { mux := http.NewServeMux() diff --git a/pkg/server/init.go b/pkg/server/init.go index 75e26d843..eca69a469 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -20,6 +20,7 @@ import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/leaderelection" "github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" @@ -38,6 +39,7 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig { CompressionConfig: compressor.NewCompressorConfig(), RestorationConfig: brtypes.NewRestorationConfig(), DefragmentationSchedule: defaultDefragmentationSchedule, + LeaderElectionConfig: leaderelection.NewLeaderElectionConfig(), } } @@ -49,6 +51,7 @@ func (c *BackupRestoreComponentConfig) AddFlags(fs *flag.FlagSet) { c.SnapstoreConfig.AddFlags(fs) c.RestorationConfig.AddFlags(fs) c.CompressionConfig.AddFlags(fs) + c.LeaderElectionConfig.AddFlags(fs) // Miscellaneous fs.StringVar(&c.DefragmentationSchedule, "defragmentation-schedule", c.DefragmentationSchedule, "schedule to defragment etcd data directory") @@ -77,6 +80,9 @@ func (c *BackupRestoreComponentConfig) Validate() error { if _, err := cron.ParseStandard(c.DefragmentationSchedule); err != nil { return err } + if err := c.LeaderElectionConfig.Validate(); err != nil { + return err + } return nil } diff --git a/pkg/server/types.go b/pkg/server/types.go index 3d29b9c61..a7df82217 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -17,6 +17,7 @@ package server import ( "github.com/gardener/etcd-backup-restore/pkg/compressor" "github.com/gardener/etcd-backup-restore/pkg/etcdutil" + "github.com/gardener/etcd-backup-restore/pkg/leaderelection" brtypes "github.com/gardener/etcd-backup-restore/pkg/types" ) @@ -34,6 +35,7 @@ type BackupRestoreComponentConfig struct { CompressionConfig *compressor.CompressionConfig `json:"compressionConfig,omitempty"` RestorationConfig *brtypes.RestorationConfig `json:"restorationConfig,omitempty"` DefragmentationSchedule string `json:"defragmentationSchedule"` + LeaderElectionConfig *leaderelection.Config `json:"leaderElectionConfig,omitempty"` } // latestSnapshotMetadata holds snapshot details of latest full and delta snapshots diff --git a/pkg/snapshot/snapshotter/snapshotter.go b/pkg/snapshot/snapshotter/snapshotter.go index 1cf8c5be3..6a755ce2b 100644 --- a/pkg/snapshot/snapshotter/snapshotter.go +++ b/pkg/snapshot/snapshotter/snapshotter.go @@ -489,6 +489,7 @@ func (ssr *Snapshotter) CollectEventsSincePrevSnapshot(stopCh <-chan struct{}) ( } case <-stopCh: ssr.cleanupInMemoryEvents() + ssr.logger.Info("###") return true, nil } }