Skip to content

Commit

Permalink
Added the LeaderElection for backup-restore.
Browse files Browse the repository at this point in the history
Added support to allow only the backup-restore Leader to take/upload the snapshots.
  • Loading branch information
ishan16696 committed Jul 2, 2021
1 parent 43af3ca commit 5b4d937
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 27 deletions.
3 changes: 3 additions & 0 deletions example/00-backup-restore-server-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ defragmentationSchedule: "0 0 */3 * *"
compressionConfig:
enabled: true
policy: "gzip"

electionConfig:
reElectionPeriod: "5s"
37 changes: 37 additions & 0 deletions pkg/leaderelection/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
	"fmt"

	"github.com/gardener/etcd-backup-restore/pkg/wrappers"
	flag "github.com/spf13/pflag"
)

// NewElectionConfig builds an ElectionConfig whose ReElectionPeriod is preset
// to DefaultReElectionPeriod.
func NewElectionConfig() *ElectionConfig {
	defaults := new(ElectionConfig)
	defaults.ReElectionPeriod = wrappers.Duration{Duration: DefaultReElectionPeriod}
	return defaults
}

// AddFlags registers the leader-election flags on fs.
// The current ReElectionPeriod is used as the default value of the flag, and
// the parsed value is written back into the same field.
func (e *ElectionConfig) AddFlags(fs *flag.FlagSet) {
	const (
		flagName  = "election-period"
		flagUsage = "Period after election will be re-triggered to check the leadership status."
	)

	period := &e.ReElectionPeriod.Duration
	fs.DurationVar(period, flagName, *period, flagUsage)
}

// Validate validates the ElectionConfig.
//
// It returns an error when ReElectionPeriod is zero or negative: the election
// loop waits for that period between two leadership checks, and a timer with a
// non-positive duration fires immediately, which would turn the loop into a
// busy loop of etcd status requests.
func (e *ElectionConfig) Validate() error {
	if e.ReElectionPeriod.Duration <= 0 {
		return fmt.Errorf("reElectionPeriod must be greater than zero, got %v", e.ReElectionPeriod.Duration)
	}
	return nil
}
138 changes: 138 additions & 0 deletions pkg/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"fmt"
"time"

"github.com/gardener/etcd-backup-restore/pkg/errors"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/sirupsen/logrus"
)

// emptyStruct is the zero-size value sent on signalling channels such as
// ProbeEtcdch, where only the event matters and no payload is carried.
var emptyStruct struct{}

// NewLeaderElector returns a LeaderElector that starts in DefaultCurrentState
// with a previous election result of ElectionLost.
//
// logger, etcdConnectionConfig and leaderElectionConfig are required: they are
// dereferenced by the election loop, so a nil value is reported as an error
// here instead of causing a panic later inside the election goroutine.
// callbacks may be nil, in which case no callback is invoked on leadership
// changes.
func NewLeaderElector(logger *logrus.Entry, etcdConnectionConfig *etcdutil.EtcdConnectionConfig, leaderElectionConfig *ElectionConfig, callbacks *LeaderCallbacks) (*LeaderElector, error) {
	switch {
	case logger == nil:
		return nil, fmt.Errorf("leader elector requires a non-nil logger")
	case etcdConnectionConfig == nil:
		return nil, fmt.Errorf("leader elector requires a non-nil etcd connection config")
	case leaderElectionConfig == nil:
		return nil, fmt.Errorf("leader elector requires a non-nil election config")
	}

	// Substitute an empty callback set so that users of Callbacks never have to
	// guard against a nil pointer.
	if callbacks == nil {
		callbacks = &LeaderCallbacks{}
	}

	return &LeaderElector{
		logger:               logger.WithField("actor", "leader-elector"),
		EtcdConnectionConfig: etcdConnectionConfig,
		CurrentState:         DefaultCurrentState,
		StopCh:               make(chan struct{}),
		ProbeEtcdch:          make(chan struct{}),
		PrevElectionResult:   ElectionLost,
		Config:               leaderElectionConfig,
		Callbacks:            callbacks,
	}, nil
}

// StartElection runs the leader-election loop of the backup-restore sidecar.
//
// After every ReElectionPeriod it calls isLeader and reacts to the resulting
// CurrentState:
//   - StateUnknown (the leadership check failed): PrevElectionResult becomes
//     ElectionLost and OnStoppedLeading is invoked.
//   - StateLeader after a lost election: PrevElectionResult becomes
//     ElectionWon, OnStartedLeading is invoked and a signal is sent on
//     ProbeEtcdch.
//   - StateLeader after a won election: nothing changes.
//   - StateFollower: PrevElectionResult becomes ElectionLost and
//     OnStoppedLeading is invoked.
//
// The loop returns when ctx is cancelled or when StopCh is signalled or closed,
// including while it is waiting for a receiver on ProbeEtcdch.
func (le *LeaderElector) StartElection(ctx context.Context) {
	le.logger.Infof("Starting LeaderElection...")

	// One timer is reused for the whole loop instead of allocating a new one
	// per iteration; it is re-armed only after an iteration has been handled,
	// so the period is measured from the end of the previous check.
	timer := time.NewTimer(le.Config.ReElectionPeriod.Duration)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			le.logger.Info("Shutting down LeaderElection...")
			return
		case <-le.StopCh:
			le.logger.Info("Shutting down LeaderElection...")
			return
		case <-timer.C:
		}

		if err := le.isLeader(ctx); err != nil {
			le.CurrentState = StateUnknown
			le.logger.Errorf("Failed to elect the backup-restore leader: %v", err)
		}

		// Callbacks is optional; resolve the hooks once per iteration so that a
		// nil Callbacks pointer cannot cause a panic.
		var (
			onStartedLeading func(context.Context)
			onStoppedLeading func()
		)
		if le.Callbacks != nil {
			onStartedLeading = le.Callbacks.OnStartedLeading
			onStoppedLeading = le.Callbacks.OnStoppedLeading
		}

		switch {
		case le.CurrentState == StateUnknown:
			le.PrevElectionResult = ElectionLost
			le.logger.Info("backup-restore is in: ", StateUnknown)
			// stops the Running Snapshotter.
			if onStoppedLeading != nil {
				onStoppedLeading()
				le.logger.Info("backup-restore lost the election")
				le.logger.Info("waiting for Re-election...")
			}

		case le.PrevElectionResult == ElectionLost && le.CurrentState == StateLeader:
			// It becomes the Leader backup-restore sidecar.
			// Get the latest snapshotter object and start the snapshotter.
			le.PrevElectionResult = ElectionWon
			le.logger.Info("backup-restore become: ", StateLeader)
			if onStartedLeading != nil {
				onStartedLeading(ctx)
			}
			le.logger.Info("sending signal to ProbeEtcd to start the Snapshotter")
			// ProbeEtcdch may have no receiver ready; keep the send
			// cancellable so that shutdown is never blocked on it.
			select {
			case le.ProbeEtcdch <- emptyStruct:
			case <-ctx.Done():
				le.logger.Info("Shutting down LeaderElection...")
				return
			case <-le.StopCh:
				le.logger.Info("Shutting down LeaderElection...")
				return
			}

		case le.PrevElectionResult == ElectionWon && le.CurrentState == StateLeader:
			le.logger.Info("no change in leadershipStatus...")

		case le.CurrentState == StateFollower:
			le.logger.Info("************ I'm not the Leader *************** ")

			le.PrevElectionResult = ElectionLost
			// stop the Running Snapshotter
			if onStoppedLeading != nil {
				onStoppedLeading()
				le.logger.Info("backup-restore lost the election")
				le.logger.Info("backup-restore become: ", StateFollower)
			}
		}

		// The timer channel was drained by the select above, so Reset is safe.
		timer.Reset(le.Config.ReElectionPeriod.Duration)
	}
}

// isLeader checks whether the etcd member behind the first configured endpoint
// is the current etcd leader and records the outcome in le.CurrentState.
//
// CurrentState is set to StateLeader when the member ID of the status response
// header equals the reported leader ID, and to StateFollower otherwise.
//
// An error is returned when no endpoint is configured, when the etcd client
// cannot be created, when the status call fails, or when etcd reports
// QuorumLostID as its leader. The error is not logged here; the caller is
// expected to log it, which avoids reporting the same failure twice.
func (le *LeaderElector) isLeader(ctx context.Context) error {
	le.logger.Info("checking for leadershipStatus...")

	// Validate the configuration before creating a client, so that a missing
	// endpoint is reported as such and no client is built just to be closed.
	if len(le.EtcdConnectionConfig.Endpoints) == 0 {
		return fmt.Errorf("Etcd endpoints are not passed correctly")
	}
	endPoint := le.EtcdConnectionConfig.Endpoints[0]

	client, err := etcdutil.GetTLSClientForEtcd(le.EtcdConnectionConfig)
	if err != nil {
		return &errors.EtcdError{
			Message: fmt.Sprintf("Failed to create etcd client: %v", err),
		}
	}
	defer client.Close()

	le.logger.Debugf("Etcd Endpoint: %v", le.EtcdConnectionConfig.Endpoints)

	// Assume Follower until the status response proves otherwise.
	le.CurrentState = StateFollower

	// Bound the status call by the configured connection timeout and release
	// the timeout context as soon as the call returns.
	statusCtx, cancel := context.WithTimeout(ctx, le.EtcdConnectionConfig.ConnectionTimeout.Duration)
	response, err := client.Status(statusCtx, endPoint)
	cancel()
	if err != nil {
		return fmt.Errorf("failed to get status of etcd endpoint %q: %w", endPoint, err)
	}

	le.logger.Infof("MemberID: %v", response.Header.MemberId)
	le.logger.Infof("LeaderID: %v", response.Leader)

	if response.Header.MemberId == response.Leader {
		le.CurrentState = StateLeader
		return nil
	}
	if response.Leader == QuorumLostID {
		return &errors.EtcdError{
			Message: "Currently there is no Etcd-Leader present may be due to etcd quorum loss.",
		}
	}
	return nil
}
77 changes: 77 additions & 0 deletions pkg/leaderelection/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"context"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/wrappers"
"github.com/sirupsen/logrus"
)

// Values used by the LeaderElector: the possible CurrentState values, the
// possible PrevElectionResult values, and election defaults.
const (
// StateFollower defines currentState of backup-restore as "Follower".
// isLeader sets it whenever the local etcd member is not the etcd leader.
StateFollower = "Follower"
// StateCandidate defines currentState of backup-restore as "Candidate".
StateCandidate = "Candidate"
// StateLeader defines currentState of backup-restore as "Leader".
// isLeader sets it when the local etcd member ID equals the etcd leader ID.
StateLeader = "Leader"
// StateUnknown defines currentState of backup-restore as "UnknownState".
// StartElection sets it when the leadership check returns an error.
StateUnknown = "UnknownState"

// DefaultCurrentState defines default currentState of backup-restore as "Follower".
DefaultCurrentState = StateFollower

// ElectionLost defines the state of LeaderElection as ElectionLost.
ElectionLost = "Lost"
// ElectionWon defines the state of LeaderElection as ElectionWon.
ElectionWon = "Won"

// QuorumLostID defines the state when etcd returns LeaderID as 0.
// isLeader treats a leader ID equal to this value as an error.
QuorumLostID = 0

// DefaultReElectionPeriod defines the default ReElectionPeriod.
DefaultReElectionPeriod = 5 * time.Second
)

// LeaderCallbacks are callbacks that are triggered when the leader currentState changes, to start/stop the snapshotter.
// StartElection skips a callback that is nil.
type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading.
// It receives the context that was passed to StartElection.
OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading.
// StartElection invokes it on every check that ends in StateFollower or
// StateUnknown, not only on the transition away from StateLeader.
OnStoppedLeading func()
}

// ElectionConfig holds the LeaderElection configurations.
type ElectionConfig struct {
// ReElectionPeriod defines the Period after which leadership status is checked.
// It defaults to DefaultReElectionPeriod and is bound to the "election-period" flag.
ReElectionPeriod wrappers.Duration `json:"reElectionPeriod,omitempty"`
}

// LeaderElector holds all the configuration and state necessary to elect the backup-restore Leader.
type LeaderElector struct {
// CurrentState defines currentState of backup-restore for LeaderElection.
// It holds one of the State* constants and is written by isLeader and StartElection.
CurrentState string
// Config holds the election settings; StartElection reads ReElectionPeriod from it.
Config *ElectionConfig
// EtcdConnectionConfig is used by isLeader to create the etcd client; its
// Endpoints and ConnectionTimeout are read for the status call.
EtcdConnectionConfig *etcdutil.EtcdConnectionConfig
// logger is the log entry used by the elector; NewLeaderElector tags it
// with the field "actor"="leader-elector".
logger *logrus.Entry
// StopCh stops the election loop: StartElection returns once it can receive from it.
StopCh chan struct{}
// ProbeEtcdch is signalled by StartElection, after OnStartedLeading has run,
// each time the elector newly becomes the leader.
ProbeEtcdch chan struct{}
// PrevElectionResult is the outcome of the previous check (ElectionLost or
// ElectionWon); StartElection uses it to detect a newly won election.
PrevElectionResult string
// Callbacks holds the hooks invoked by StartElection on leadership changes.
Callbacks *LeaderCallbacks
//leaderElectionCancel context.CancelFunc
}
43 changes: 38 additions & 5 deletions pkg/server/backuprestoreserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/gardener/etcd-backup-restore/pkg/errors"
"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
"github.com/gardener/etcd-backup-restore/pkg/metrics"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -61,13 +62,13 @@ func NewBackupRestoreServer(logger *logrus.Logger, config *BackupRestoreComponen
func (b *BackupRestoreServer) Run(ctx context.Context) error {
clusterURLsMap, err := types.NewURLsMap(b.config.RestorationConfig.InitialCluster)
if err != nil {
// Ideally this case should not occur, since this check is done at the config validaitions.
// Ideally this case should not occur, since this check is done at the config validations.
b.logger.Fatalf("failed creating url map for restore cluster: %v", err)
}

peerURLs, err := types.NewURLs(b.config.RestorationConfig.InitialAdvertisePeerURLs)
if err != nil {
// Ideally this case should not occur, since this check is done at the config validaitions.
// Ideally this case should not occur, since this check is done at the config validations.
b.logger.Fatalf("failed creating url map for restore cluster: %v", err)
}

Expand Down Expand Up @@ -133,6 +134,7 @@ func (b *BackupRestoreServer) runServerWithoutSnapshotter(ctx context.Context, r
// for the case where snapshotter is configured correctly
func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, restoreOpts *brtypes.RestoreOptions) error {
ackCh := make(chan struct{})
ssrStopCh := make(chan struct{})

etcdInitializer := initializer.NewInitializer(restoreOpts, b.config.SnapstoreConfig, b.logger.Logger)

Expand All @@ -151,19 +153,47 @@ func (b *BackupRestoreServer) runServerWithSnapshotter(ctx context.Context, rest
handler := b.startHTTPServer(etcdInitializer, ssr)
defer handler.Stop()

ssrStopCh := make(chan struct{})
leaderCallbacks := &leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// Get the updated snapshotter state
err := ssr.GetLatestSnapshotterState()
if err != nil {
fmt.Printf("Unable to get Latest SnapshotterState: %v", err)
}
handler.Snapshotter = ssr
},
OnStoppedLeading: func() {
// stops the running snapshotter
ssr.SsrStateMutex.Lock()
if ssr.SsrState == brtypes.SnapshotterActive {
ssr.SsrStateMutex.Unlock()
ssrStopCh <- emptyStruct
} else {
ssr.SsrStateMutex.Unlock()
}
},
}

b.logger.Infof("Creating leaderElector...")
le, err := leaderelection.NewLeaderElector(b.logger, b.config.EtcdConnectionConfig, b.config.LeaderElectionConfig, leaderCallbacks)
if err != nil {
return err
}

go le.StartElection(ctx)

go handleSsrStopRequest(ctx, handler, ssr, ackCh, ssrStopCh)
go handleAckState(handler, ackCh)

go defragmentor.DefragDataPeriodically(ctx, b.config.EtcdConnectionConfig, b.defragmentationSchedule, ssr.TriggerFullSnapshot, b.logger)

b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh)
b.runEtcdProbeLoopWithSnapshotter(ctx, handler, ssr, ssrStopCh, ackCh, le)
return nil
}

// runEtcdProbeLoopWithSnapshotter runs the etcd probe loop
// for the case where snapshotter is configured correctly
func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}) {
func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Context, handler *HTTPHandler, ssr *snapshotter.Snapshotter, ssrStopCh chan struct{}, ackCh chan struct{}, le *leaderelection.LeaderElector) {
var (
err error
initialDeltaSnapshotTaken bool
Expand All @@ -177,6 +207,9 @@ func (b *BackupRestoreServer) runEtcdProbeLoopWithSnapshotter(ctx context.Contex
return
default:
err = b.probeEtcd(ctx)
if err == nil {
<-le.ProbeEtcdch
}
}
if err != nil {
b.logger.Errorf("Failed to probe etcd: %v", err)
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/gardener/etcd-backup-restore/pkg/compressor"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/leaderelection"
"github.com/gardener/etcd-backup-restore/pkg/snapshot/snapshotter"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

Expand All @@ -38,6 +39,7 @@ func NewBackupRestoreComponentConfig() *BackupRestoreComponentConfig {
CompressionConfig: compressor.NewCompressorConfig(),
RestorationConfig: brtypes.NewRestorationConfig(),
DefragmentationSchedule: defaultDefragmentationSchedule,
LeaderElectionConfig: leaderelection.NewElectionConfig(),
}
}

Expand All @@ -49,6 +51,7 @@ func (c *BackupRestoreComponentConfig) AddFlags(fs *flag.FlagSet) {
c.SnapstoreConfig.AddFlags(fs)
c.RestorationConfig.AddFlags(fs)
c.CompressionConfig.AddFlags(fs)
c.LeaderElectionConfig.AddFlags(fs)

// Miscellaneous
fs.StringVar(&c.DefragmentationSchedule, "defragmentation-schedule", c.DefragmentationSchedule, "schedule to defragment etcd data directory")
Expand Down Expand Up @@ -77,6 +80,9 @@ func (c *BackupRestoreComponentConfig) Validate() error {
if _, err := cron.ParseStandard(c.DefragmentationSchedule); err != nil {
return err
}
if err := c.LeaderElectionConfig.Validate(); err != nil {
return err
}
return nil
}

Expand Down
Loading

0 comments on commit 5b4d937

Please sign in to comment.