Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Leader election to backup-restore and allow only the backup-restore Leader to take and upload the snapshots. #353

Merged
merged 12 commits into from
Dec 31, 2021
Merged
4 changes: 4 additions & 0 deletions example/00-backup-restore-server-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,7 @@ defragmentationSchedule: "0 0 */3 * *"
compressionConfig:
enabled: true
policy: "gzip"

leaderElectionConfig:
reelectionPeriod: "5s"
etcdConnectionTimeout: "5s"
50 changes: 50 additions & 0 deletions pkg/leaderelection/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"fmt"
"time"

"github.com/gardener/etcd-backup-restore/pkg/wrappers"
flag "github.com/spf13/pflag"
)

// NewLeaderElectionConfig builds a leader-election Config whose
// ReelectionPeriod and EtcdConnectionTimeout are pre-populated with
// DefaultReelectionPeriod and DefaultEtcdConnectionTimeout respectively.
func NewLeaderElectionConfig() *Config {
	cfg := new(Config)
	cfg.ReelectionPeriod = wrappers.Duration{Duration: DefaultReelectionPeriod}
	cfg.EtcdConnectionTimeout = wrappers.Duration{Duration: DefaultEtcdConnectionTimeout}
	return cfg
}

// AddFlags adds the flags to flagset.
func (c *Config) AddFlags(fs *flag.FlagSet) {
fs.DurationVar(&c.EtcdConnectionTimeout.Duration, "etcd-connection-timeout-leader-election", c.EtcdConnectionTimeout.Duration, "etcd client connection timeout during leader election.")
fs.DurationVar(&c.ReelectionPeriod.Duration, "reelection-period", c.ReelectionPeriod.Duration, "Period after which election will be re-triggered to check the leadership status.")
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
}

// Validate validates the Config.
func (c *Config) Validate() error {
if c.ReelectionPeriod.Duration <= time.Duration(1*time.Second) {
return fmt.Errorf("ReelectionPeriod should be greater than 1 second")
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
}

if c.EtcdConnectionTimeout.Duration <= time.Duration(1*time.Second) {
return fmt.Errorf("etcd connection timeout in leaderElection should be greater than 1 second")
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}
192 changes: 192 additions & 0 deletions pkg/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"fmt"
"sync"
"time"

"github.com/gardener/etcd-backup-restore/pkg/errors"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
)

// NewLeaderElector assembles a LeaderElector from the given etcd connection
// config, leader-election config and callbacks. The elector starts in
// DefaultCurrentState, logs through logger tagged with actor=leader-elector,
// and gets a fresh ElectionLock mutex. The returned error is always nil.
func NewLeaderElector(logger *logrus.Entry, etcdConnectionConfig *brtypes.EtcdConnectionConfig, leaderElectionConfig *Config, callbacks *LeaderCallbacks) (*LeaderElector, error) {
	elector := &LeaderElector{
		CurrentState:         DefaultCurrentState,
		Config:               leaderElectionConfig,
		EtcdConnectionConfig: etcdConnectionConfig,
		Callbacks:            callbacks,
		ElectionLock:         new(sync.Mutex),
	}
	elector.logger = logger.WithField("actor", "leader-elector")
	return elector, nil
}

// Run starts the LeaderElection loop to elect the backup-restore's Leader
// and keep checking the leadership status of backup-restore.
func (le *LeaderElector) Run(ctx context.Context) error {
le.logger.Infof("Starting leaderElection...")
var leCtx context.Context
var leCancel context.CancelFunc
for {
select {
case <-ctx.Done():
le.logger.Info("Shutting down LeaderElection...")
leCancel()
return nil
case <-time.After(le.Config.ReelectionPeriod.Duration):
isLeader, err := le.IsLeader(ctx)
if err != nil {
le.logger.Errorf("Failed to elect the backup-restore leader: %v", err)

// set the CurrentState of backup-restore.
// stops the Running Snapshotter.
// wait for Reelection to happen.
le.CurrentState = StateUnknown
le.logger.Infof("backup-restore is in: %v", le.CurrentState)
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
if le.Callbacks.OnStoppedLeading != nil && leCtx != nil {
leCancel()
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
le.Callbacks.OnStoppedLeading()
}
le.logger.Info("waiting for Re-election...")
continue
}

if isLeader && (le.CurrentState == StateFollower || le.CurrentState == StateUnknown || le.CurrentState == StateCandidate) {
abdasgupta marked this conversation as resolved.
Show resolved Hide resolved
// backup-restore becomes the Leader backup-restore.
// set the CurrentState of backup-restore.
// update the snapshotter object with latest snapshotter object.
// start the snapshotter.
le.CurrentState = StateLeader
le.logger.Infof("backup-restore became: %v", le.CurrentState)

if le.Callbacks.OnStartedLeading != nil {
leCtx, leCancel = context.WithCancel(ctx)
le.logger.Info("backup-restore started leading...")
le.Callbacks.OnStartedLeading(leCtx)
}
} else if isLeader && le.CurrentState == StateLeader {
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
le.logger.Debug("no change in leadershipStatus...")
} else if !isLeader && le.CurrentState == StateLeader {
// backup-restore lost the election and becomes Follower.
// set the CurrentState of backup-restore.
// stop the Running snapshotter.
le.CurrentState = StateFollower
le.logger.Info("backup-restore lost the election")
le.logger.Infof("backup-restore became: %v", le.CurrentState)

if le.Callbacks.OnStoppedLeading != nil && leCtx != nil {
leCancel()
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
le.Callbacks.OnStoppedLeading()
}
} else if !isLeader && le.CurrentState == StateUnknown {
le.CurrentState = StateFollower
le.logger.Infof("backup-restore changed the state from %v to %v", StateUnknown, le.CurrentState)
} else if !isLeader && le.CurrentState == StateFollower {
le.logger.Infof("backup-restore currentState: %v", le.CurrentState)
}
}
}
}

// IsLeader checks whether the current backup-restore is leader or not.
//
// It creates an etcd client via etcdutil.GetTLSClientForEtcd, queries the
// status of the first endpoint in EtcdConnectionConfig.Endpoints (bounded by
// Config.EtcdConnectionTimeout) and returns true when the responding member's
// ID equals the leader ID reported in that status.
//
// An error is returned when the client cannot be created, when no endpoint is
// configured, when the status call fails, or when the reported leader ID is
// NoLeaderID. In every error case the boolean result is false.
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
func (le *LeaderElector) IsLeader(ctx context.Context) (bool, error) {
le.logger.Info("checking the leadershipStatus...")
var endPoint string
client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
if err != nil {
return false, &errors.EtcdError{
Message: fmt.Sprintf("Failed to create etcd client: %v", err),
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
}
}
// The client is created per call and closed when this function returns.
defer client.Close()

// Only the first configured endpoint is queried.
if len(le.EtcdConnectionConfig.Endpoints) > 0 {
endPoint = le.EtcdConnectionConfig.Endpoints[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the first endpoint always used here, is it guaranteed to be our sidecar's endpoint?

Copy link
Member Author

@ishan16696 ishan16696 Nov 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the endpoint of etcd which comes fom the etcdConnectionConfig, I think it should be passed through environment variable but I'm also not so sure how and which PR will take care of this. so this thing need to be discussed and taken care.

cc @abdasgupta @shreyas-s-rao @timuthy @aaronfern

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I know is that Endpoints may have different values like private IP, public IP, proxy IP. So, it's an array.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, endpoints of etcd comes as an array but actually Question was "How to make sure that endpoints present at 0th index in a array is always endpoint of their corresponding etcd?" @amshuman-kr also raised this question.
answer to this questions could be that etcd endpoints should passed as environment variable to their corresponding sidecar.

Copy link
Collaborator

@amshuman-kr amshuman-kr Nov 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing different config to different sidecars will be hard given that we have only one StatefulSet which creates all the pods of the ETCD cluster.

The member sidecars need to still distinguish between themselves and this can be achieved by passing the pod name (which would also be the ETCD member name according to the proposal) as an environment variable using fieldRef mechanism.

The code here should ideally either iterate through the member names and select the correct endpoint of the member by comparing with the pod/member name passed via env variable or it should rely on some convention in configuration where the endpoint can be computed based on the passed pod/member name.

} else {
return false, fmt.Errorf("Etcd endpoints are not passed correctly")
}

// Bound the status request by the configured etcd connection timeout.
ctx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration)
defer cancel()

response, err := client.Status(ctx, endPoint)
if err != nil {
le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", le.EtcdConnectionConfig.Endpoints[0], err)
return false, err
}

// The queried member is the leader when its own ID matches the reported leader ID.
if response.Header.MemberId == response.Leader {
return true, nil
} else if response.Leader == NoLeaderID {
// A leader ID of NoLeaderID is surfaced as an error rather than as "not leader".
return false, &errors.EtcdError{
Message: fmt.Sprintf("Currently there is no Etcd Leader present may be due to etcd quorum loss or election is being held."),
}
}
return false, nil
}

// GetLeader looks up the current etcd leader through the first configured
// endpoint and returns its member ID together with its peer URLs.
//
// Returns:
//   - (NoLeaderID, nil, err) when the client cannot be created, no endpoint is
//     configured, the status call fails, or etcd reports no leader;
//   - (leaderID, nil, err) when the member list cannot be fetched;
//   - (leaderID, peerURLs, nil) when the leader is found in the member list;
//   - (leaderID, nil, nil) when the leader ID is not present in the member list.
func (le *LeaderElector) GetLeader(ctx context.Context) (uint64, []string, error) {
	le.logger.Info("getting the etcd leaderID...")

	etcdClient, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
	if err != nil {
		return NoLeaderID, nil, &errors.EtcdError{
			Message: fmt.Sprintf("Failed to create etcd client: %v", err),
		}
	}
	defer etcdClient.Close()

	// Only the first configured endpoint is queried.
	endpoints := le.EtcdConnectionConfig.Endpoints
	if len(endpoints) == 0 {
		return NoLeaderID, nil, &errors.EtcdError{
			Message: "Etcd endpoints are not passed correctly",
		}
	}
	target := endpoints[0]

	// Both etcd calls below share one deadline.
	timeoutCtx, cancel := context.WithTimeout(ctx, le.Config.EtcdConnectionTimeout.Duration)
	defer cancel()

	status, err := etcdClient.Status(timeoutCtx, target)
	if err != nil {
		le.logger.Errorf("Failed to get status of etcd endPoint: %v with error: %v", target, err)
		return NoLeaderID, nil, err
	}

	leaderID := status.Leader
	if leaderID == NoLeaderID {
		return NoLeaderID, nil, &errors.EtcdError{
			Message: "Currently there is no Etcd Leader present may be due to etcd quorum loss.",
		}
	}

	memberList, err := etcdClient.MemberList(timeoutCtx)
	if err != nil {
		le.logger.Errorf("Failed to get memberList of etcd with error: %v", err)
		return leaderID, nil, err
	}

	for _, m := range memberList.Members {
		if m.GetID() == leaderID {
			return leaderID, m.GetPeerURLs(), nil
		}
	}
	return leaderID, nil, nil
}
74 changes: 74 additions & 0 deletions pkg/leaderelection/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"sync"
"time"

brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/gardener/etcd-backup-restore/pkg/wrappers"
"github.com/sirupsen/logrus"
)

// Constants describing the possible leader-election states of backup-restore
// (the values stored in LeaderElector.CurrentState) and the package defaults.
const (
// StateFollower defines currentState of backup-restore as "Follower".
ishan16696 marked this conversation as resolved.
Show resolved Hide resolved
StateFollower = "Follower"
// StateCandidate defines currentState of backup-restore as "Candidate".
StateCandidate = "Candidate"
// StateLeader defines currentState of backup-restore as "Leader".
StateLeader = "Leader"
// StateUnknown defines currentState of backup-restore as "UnknownState".
// LeaderElector.Run switches to this state when the leadership check fails.
StateUnknown = "UnknownState"

// DefaultCurrentState defines default currentState of backup-restore as "Follower".
// NewLeaderElector uses it as the initial state.
DefaultCurrentState = StateFollower

// NoLeaderID defines the state when etcd returns LeaderID as 0.
NoLeaderID uint64 = 0

// DefaultReelectionPeriod defines default ReelectionPeriod.
DefaultReelectionPeriod = 5 * time.Second
// DefaultEtcdConnectionTimeout defines default ConnectionTimeout for etcd client.
DefaultEtcdConnectionTimeout = 5 * time.Second
)

// LeaderCallbacks are callbacks that are triggered to start/stop the snapshotter when leader's currentState changes.
// LeaderElector.Run invokes them directly from its polling loop; a nil callback is skipped.
type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading.
// The context it receives is derived from the context passed to Run and is
// cancelled by Run before OnStoppedLeading is invoked.
OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading.
// Run also calls it when the leadership check fails after leading has started.
OnStoppedLeading func()
}

// Config holds the LeaderElection config.
// Validate requires both durations to be strictly greater than one second.
type Config struct {
// ReelectionPeriod defines the Period after which leadership status is checked.
// Defaults to DefaultReelectionPeriod in NewLeaderElectionConfig.
ReelectionPeriod wrappers.Duration `json:"reelectionPeriod,omitempty"`
// EtcdConnectionTimeout defines the timeout duration for etcd client connection during leader election.
// Defaults to DefaultEtcdConnectionTimeout in NewLeaderElectionConfig.
EtcdConnectionTimeout wrappers.Duration `json:"etcdConnectionTimeout,omitempty"`
}

// LeaderElector holds all the configuration necessary to elect the backup-restore Leader.
type LeaderElector struct {
// CurrentState defines currentState of backup-restore for LeaderElection.
// It holds one of the State* constants and is updated by Run.
CurrentState string
// Config supplies the re-election period and the etcd connection timeout.
Config *Config
// EtcdConnectionConfig is used to create the etcd client; its first endpoint is the one queried.
EtcdConnectionConfig *brtypes.EtcdConnectionConfig
// logger is the log entry used by the elector (tagged actor=leader-elector by NewLeaderElector).
logger *logrus.Entry
// Callbacks are invoked by Run when leadership is gained or lost.
Callbacks *LeaderCallbacks
// ElectionLock is initialised by NewLeaderElector.
// NOTE(review): it is never locked in the code visible here — confirm its intended use with callers.
ElectionLock *sync.Mutex
}
Loading