Skip to content

Commit

Permalink
Added the LeaderElection for backup-restore.
Browse files Browse the repository at this point in the history
Added support to allow only backup-restore Leader to take/uploads the snapshots.
  • Loading branch information
ishan16696 committed Jul 14, 2021
1 parent 43af3ca commit 5cae968
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 8 deletions.
4 changes: 4 additions & 0 deletions example/00-backup-restore-server-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ defragmentationSchedule: "0 0 */3 * *"
compressionConfig:
enabled: true
policy: "gzip"

leaderElectionConfig:
reelectionPeriod: "5s"
etcdConnectionTimeout: "5s"
2 changes: 1 addition & 1 deletion pkg/defragmentor/defrag.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/sirupsen/logrus"
)

// CallbackFunc is type decalration for callback function for defragmentor
// CallbackFunc is type declaration for callback function for defragmentor
type CallbackFunc func(ctx context.Context) (*brtypes.Snapshot, error)

// defragmentorJob implement the cron.Job for etcd defragmentation.
Expand Down
50 changes: 50 additions & 0 deletions pkg/leaderelection/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"fmt"
"time"

"github.com/gardener/etcd-backup-restore/pkg/wrappers"
flag "github.com/spf13/pflag"
)

// NewLeaderElectionConfig returns the Config.
func NewLeaderElectionConfig() *Config {
return &Config{
ReelectionPeriod: wrappers.Duration{Duration: DefaultReelectionPeriod},
EtcdConnectionTimeout: wrappers.Duration{Duration: DefaultEtcdConnectionTimeout},
}
}

// AddFlags adds the flags to flagset.
func (c *Config) AddFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.EtcdConnectionTimeout.Duration, "etcd-connection-timeout-leader-election", c.EtcdConnectionTimeout.Duration, "etcd client connection timeout during leader election.")
fs.DurationVar(&c.ReelectionPeriod.Duration, "reelection-period", c.ReelectionPeriod.Duration, "Period after which election will be re-triggered to check the leadership status.")
}

// Validate validates the Config.
func (c *Config) Validate() error {
if c.ReelectionPeriod.Duration <= time.Duration(1*time.Second) {
return fmt.Errorf("ReelectionPeriod should be greater than 1 second")
}

if c.EtcdConnectionTimeout.Duration <= time.Duration(1*time.Second) {
return fmt.Errorf("etcd connection timeout in leaderElection should be greater than 1 second")
}

return nil
}
191 changes: 191 additions & 0 deletions pkg/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"fmt"
"sync"
"time"

"github.com/gardener/etcd-backup-restore/pkg/errors"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/sirupsen/logrus"
)

// NewLeaderElector returns LeaderElector configurations.
func NewLeaderElector(logger *logrus.Entry, etcdConnectionConfig *etcdutil.EtcdConnectionConfig, leaderElectionConfig *Config, callbacks *LeaderCallbacks) (*LeaderElector, error) {
return &LeaderElector{
logger: logger.WithField("actor", "leader-elector"),
EtcdConnectionConfig: etcdConnectionConfig,
CurrentState: DefaultCurrentState,
Config: leaderElectionConfig,
Callbacks: callbacks,
ElectionLock: &sync.Mutex{},
}, nil
}

// Run starts the LeaderElection loop to elect the backup-restore's Leader
// and keep checking the leadership status of backup-restore.
func (le *LeaderElector) Run(ctx context.Context) error {
le.logger.Infof("Starting leaderElection...")
var leCtx context.Context
var leCancel context.CancelFunc
for {
select {
case <-ctx.Done():
le.logger.Info("Shutting down LeaderElection...")
leCancel()
return nil
case <-time.After(le.Config.ReelectionPeriod.Duration):
isLeader, err := le.IsLeader(ctx)
if err != nil {
le.logger.Errorf("Failed to elect the backup-restore leader: %v", err)

// set the CurrentState of backup-restore.
// stops the Running Snapshotter.
// wait for Reelection to happen.
le.CurrentState = StateUnknown
le.logger.Infof("backup-restore is in: %v", le.CurrentState)
if le.Callbacks.OnStoppedLeading != nil && leCtx != nil {
leCancel()
le.Callbacks.OnStoppedLeading()
}
le.logger.Info("waiting for Re-election...")
continue
}

if isLeader && (le.CurrentState == StateFollower || le.CurrentState == StateUnknown || le.CurrentState == StateCandidate) {
// backup-restore becomes the Leader backup-restore.
// set the CurrentState of backup-restore.
// update the snapshotter object with latest snapshotter object.
// start the snapshotter.
le.CurrentState = StateLeader
le.logger.Infof("backup-restore became: %v", le.CurrentState)

if le.Callbacks.OnStartedLeading != nil {
leCtx, leCancel = context.WithCancel(ctx)
le.logger.Info("backup-restore started leading...")
le.Callbacks.OnStartedLeading(leCtx)
}
} else if isLeader && le.CurrentState == StateLeader {
le.logger.Debug("no change in leadershipStatus...")
} else if !isLeader && le.CurrentState == StateLeader {
// backup-restore lost the election and becomes Follower.
// set the CurrentState of backup-restore.
// stop the Running snapshotter.
le.CurrentState = StateFollower
le.logger.Info("backup-restore lost the election")
le.logger.Infof("backup-restore became: %v", le.CurrentState)

if le.Callbacks.OnStoppedLeading != nil && leCtx != nil {
leCancel()
le.Callbacks.OnStoppedLeading()
}
} else if !isLeader && le.CurrentState == StateUnknown {
le.CurrentState = StateFollower
le.logger.Infof("backup-restore changed the state from %v to %v", StateUnknown, le.CurrentState)
} else if !isLeader && le.CurrentState == StateFollower {
le.logger.Infof("backup-restore currentState: %v", le.CurrentState)
}
}
}
}

// IsLeader checks whether the current backup-restore is leader or not.
func (le *LeaderElector) IsLeader(ctx context.Context) (bool, error) {
le.logger.Info("checking the leadershipStatus...")
var endPoint string
client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
if err != nil {
return false, &errors.EtcdError{
Message: fmt.Sprintf("Failed to create etcd client: %v", err),
}
}
defer client.Close()

if len(le.EtcdConnectionConfig.Endpoints) > 0 {
endPoint = le.EtcdConnectionConfig.Endpoints[0]
} else {
return false, fmt.Errorf("Etcd endpoints are not passed correctly")
}

ctx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration)
defer cancel()

response, err := client.Status(ctx, endPoint)
if err != nil {
le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", le.EtcdConnectionConfig.Endpoints[0], err)
return false, err
}

if response.Header.MemberId == response.Leader {
return true, nil
} else if response.Leader == NoLeaderID {
return false, &errors.EtcdError{
Message: fmt.Sprintf("Currently there is no Etcd Leader present may be due to etcd quorum loss or election is being held."),
}
}
return false, nil
}

// GetLeader will return the LeaderID as well as PeerURLs of etcd leader.
func (le *LeaderElector) GetLeader(ctx context.Context) (uint64, []string, error) {
le.logger.Info("getting the etcd leaderID...")
var endPoint string
client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
if err != nil {
return NoLeaderID, nil, &errors.EtcdError{
Message: fmt.Sprintf("Failed to create etcd client: %v", err),
}
}
defer client.Close()

if len(le.EtcdConnectionConfig.Endpoints) > 0 {
endPoint = le.EtcdConnectionConfig.Endpoints[0]
} else {
return NoLeaderID, nil, &errors.EtcdError{
Message: fmt.Sprintf("Etcd endpoints are not passed correctly"),
}
}

ctx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration)
defer cancel()

response, err := client.Status(ctx, endPoint)
if err != nil {
le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", le.EtcdConnectionConfig.Endpoints[0], err)
return NoLeaderID, nil, err
}

if response.Leader == NoLeaderID {
return NoLeaderID, nil, &errors.EtcdError{
Message: fmt.Sprintf("Currently there is no Etcd Leader present may be due to etcd quorum loss."),
}
}

membersInfo, err := client.MemberList(ctx)
if err != nil {
le.logger.Errorf("Failed to get memberList of etcd with error: %v", err)
return response.Leader, nil, err
}

for _, member := range membersInfo.Members {
if response.Leader == member.GetID() {
return response.Leader, member.GetPeerURLs(), nil
}
}
return response.Leader, nil, nil
}
74 changes: 74 additions & 0 deletions pkg/leaderelection/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"sync"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/wrappers"
"github.com/sirupsen/logrus"
)

const (
// StateFollower defines currentState of backup-restore as "Follower".
StateFollower = "Follower"
// StateCandidate defines currentState of backup-restore as "Candidate".
StateCandidate = "Candidate"
// StateLeader defines currentState of backup-restore as "Leader".
StateLeader = "Leader"
// StateUnknown defines currentState of backup-restore as "UnknownState".
StateUnknown = "UnknownState"

// DefaultCurrentState defines default currentState of backup-restore as "Follower".
DefaultCurrentState = StateFollower

// NoLeaderID defines the state when etcd returns LeaderID as 0.
NoLeaderID uint64 = 0

// DefaultReelectionPeriod defines default ReelectionPeriod.
DefaultReelectionPeriod = 5 * time.Second
// DefaultEtcdConnectionTimeout defines default ConnectionTimeout for etcd client.
DefaultEtcdConnectionTimeout = 5 * time.Second
)

// LeaderCallbacks are callbacks that are triggered to start/stop the snapshottter when leader's currentState changes.
type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading.
OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading.
OnStoppedLeading func()
}

// Config holds the LeaderElection config.
type Config struct {
// ReelectionPeriod defines the Period after which leadership status is checked.
ReelectionPeriod wrappers.Duration `json:"reelectionPeriod,omitempty"`
// EtcdConnectionTimeout defines the timeout duration for etcd client connection during leader election.
EtcdConnectionTimeout wrappers.Duration `json:"etcdConnectionTimeout,omitempty"`
}

// LeaderElector holds the all configuration necessary to elect backup-restore Leader.
type LeaderElector struct {
// CurrentState defines currentState of backup-restore for LeaderElection.
CurrentState string
Config *Config
EtcdConnectionConfig *etcdutil.EtcdConnectionConfig
logger *logrus.Entry
Callbacks *LeaderCallbacks
ElectionLock *sync.Mutex
}
Loading

0 comments on commit 5cae968

Please sign in to comment.