Add leader election to backup-restore and allow only the backup-restore leader to take and upload snapshots. #353

Merged
merged 12 commits on Dec 31, 2021
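At a high level, each backup-restore sidecar checks whether its own etcd member is currently the cluster leader, and only the leader's sidecar triggers and uploads snapshots. The following is a minimal, hypothetical sketch of that gating idea; the names and structure are illustrative assumptions and do not mirror the PR's actual implementation.

```go
// Hypothetical sketch of the leader-gating idea only; not the PR's actual code.
package leaderelection

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// isLeader reports whether the etcd member behind endpoint currently leads the cluster.
func isLeader(ctx context.Context, cli *clientv3.Client, endpoint string) (bool, error) {
	status, err := cli.Status(ctx, endpoint)
	if err != nil {
		return false, err
	}
	// A member is the leader when its own member ID equals the reported leader ID.
	return status.Header.MemberId == status.Leader, nil
}

// snapshotLoop re-checks leadership every reelectionPeriod and invokes
// takeSnapshot only while the local etcd member is the leader.
func snapshotLoop(ctx context.Context, cli *clientv3.Client, endpoint string, reelectionPeriod time.Duration, takeSnapshot func(context.Context) error) {
	ticker := time.NewTicker(reelectionPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			leader, err := isLeader(ctx, cli, endpoint)
			if err != nil || !leader {
				continue // follower or unknown state: do not snapshot
			}
			if err := takeSnapshot(ctx); err != nil {
				log.Printf("snapshot failed: %v", err)
			}
		}
	}
}
```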
11 changes: 11 additions & 0 deletions example/00-backup-restore-server-config.yaml
@@ -51,3 +51,14 @@ defragmentationSchedule: "0 0 */3 * *"
compressionConfig:
enabled: true
policy: "gzip"

leaderElectionConfig:
reelectionPeriod: "5s"
etcdConnectionTimeout: "5s"

healthConfig:
snapshotLeaseRenewalEnabled: false
memberLeaseRenewalEnabled: false
heartbeatDuration: "30s"
fullSnapshotLeaseName: "full-snapshot-revisions"
deltaSnapshotLeaseName: "delta-snapshot-revisions"
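For orientation, the new YAML blocks above could be modeled roughly by Go types like the ones below. The struct and field names are assumptions derived from the YAML keys, not necessarily the exact types in pkg/types.

```go
// Illustrative only: how the new config blocks might map onto Go types.
package types

import "time"

// Duration wraps time.Duration; a custom YAML unmarshaler (omitted here)
// would parse strings such as "5s" or "30s".
type Duration struct {
	Duration time.Duration
}

// LeaderElectionConfig mirrors the leaderElectionConfig block.
type LeaderElectionConfig struct {
	ReelectionPeriod      Duration `yaml:"reelectionPeriod"`
	EtcdConnectionTimeout Duration `yaml:"etcdConnectionTimeout"`
}

// HealthConfig mirrors the healthConfig block.
type HealthConfig struct {
	SnapshotLeaseRenewalEnabled bool     `yaml:"snapshotLeaseRenewalEnabled"`
	MemberLeaseRenewalEnabled   bool     `yaml:"memberLeaseRenewalEnabled"`
	HeartbeatDuration           Duration `yaml:"heartbeatDuration"`
	FullSnapshotLeaseName       string   `yaml:"fullSnapshotLeaseName"`
	DeltaSnapshotLeaseName      string   `yaml:"deltaSnapshotLeaseName"`
}
```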
8 changes: 7 additions & 1 deletion pkg/compactor/compactor.go
@@ -138,8 +138,14 @@ func (cp *Compactor) Compact(opts *brtypes.CompactOptions) (*brtypes.Snapshot, e

// Then defrag the ETCD
if opts.NeedDefragmentation {
client, err := clientFactory.NewCluster()
if err != nil {
return nil, fmt.Errorf("failed to build etcd cluster client")
}
defer client.Close()

defragCtx, defragCancel := context.WithTimeout(ctx, opts.DefragTimeout.Duration)
err := etcdutil.DefragmentData(defragCtx, clientMaintenance, ep, cp.logger)
err = etcdutil.DefragmentData(defragCtx, clientMaintenance, client, ep, cp.logger)
defragCancel()
if err != nil {
cp.logger.Errorf("failed to defragment: %v", err)
6 changes: 5 additions & 1 deletion pkg/compactor/compactor_suite_test.go
@@ -32,6 +32,10 @@ import (
"go.etcd.io/etcd/embed"
)

const (
embeddedEtcdPortNo = "9089"
)

var (
testSuitDir, testEtcdDir, testSnapshotDir string
testCtx = context.Background()
@@ -65,7 +69,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
logger.Infof("Snapshot Directory is: %s", testSnapshotDir)

// Start the main ETCD process that will run until all compaction test cases are run
etcd, err = utils.StartEmbeddedEtcd(testCtx, testEtcdDir, logger)
etcd, err = utils.StartEmbeddedEtcd(testCtx, testEtcdDir, logger, embeddedEtcdPortNo)
Contributor:

Why do you need to supply a port number? The function starts etcd on a random port, and you can fetch the port number from etcd.Clients[0].Addr().String() on the return value.

@ishan16696 (Member, Author), Nov 4, 2021:

The issue I faced is that StartEmbeddedEtcd starts etcd on a random port, but AFAIK that port is not exposed in a way that lets the tests fetch the endpoints correctly. The compactor package unit tests still pass because they do not treat a failed defragmentation as an error, but if you look closely you can find these logs:

time="2021-11-04T18:03:39+05:30" level=info msg="Starting the defragmentation on etcd leader" suite=compactor
time="2021-11-04T18:03:39+05:30" level=info msg="Defragmenting etcd member[http://localhost:0]" suite=compactor
.
.
.
time="2021-11-04T18:02:24+05:30" level=warning msg="Failed to get status of etcd member[http://localhost:0] with error: context deadline exceeded" suite=compactor

And I got this error while running the defragmentor package unit tests:

{"level":"warn","ts":"2021-11-04T17:22:12.796+0530","caller":"grpclog/grpclog.go:60","msg":"grpc: addrConn.createTransport failed to connect to {http://localhost:0  <nil> 0 <nil>}. Err :connection error: desc = \"transport: Error while dialing dial tcp [::1]:0: connect: can't assign requested address\". Reconnecting..."}

Amshu suggested running StartEmbeddedEtcd on a particular port. I did that, but then the compactor package tests threw an error because they use two embedded etcds during testing, so I thought it better to make the port number of StartEmbeddedEtcd configurable.
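To illustrate the resolution, here is a rough sketch of what a port-configurable embedded-etcd test helper can look like. The function name, parameters, and timeout below are assumptions for illustration and not the actual signature of test/utils.StartEmbeddedEtcd in this repository.

```go
// Rough sketch of a port-configurable embedded-etcd helper for tests.
package utils

import (
	"fmt"
	"net/url"
	"time"

	"go.etcd.io/etcd/embed"
)

// startEmbeddedEtcdOnPort starts an embedded etcd whose client URL listens on
// the supplied port, so tests get a predictable endpoint instead of ":0".
func startEmbeddedEtcdOnPort(dataDir, port string) (*embed.Etcd, error) {
	cfg := embed.NewConfig()
	cfg.Dir = dataDir

	clientURL, err := url.Parse(fmt.Sprintf("http://localhost:%s", port))
	if err != nil {
		return nil, err
	}
	cfg.LCUrls = []url.URL{*clientURL} // listen-client-urls
	cfg.ACUrls = []url.URL{*clientURL} // advertise-client-urls
	// Note: running two embedded etcds side by side would also require distinct
	// peer URLs (cfg.LPUrls / cfg.APUrls) and data directories.

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, err
	}
	select {
	case <-e.Server.ReadyNotify():
		return e, nil
	case <-time.After(60 * time.Second):
		e.Close()
		return nil, fmt.Errorf("embedded etcd took too long to start")
	}
}
```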

Expect(err).ShouldNot(HaveOccurred())
endpoints = []string{etcd.Clients[0].Addr().String()}
logger.Infof("endpoints: %s", endpoints)
59 changes: 48 additions & 11 deletions pkg/defragmentor/defrag.go
@@ -16,8 +16,10 @@ package defragmentor

import (
"context"
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"

cron "github.com/robfig/cron/v3"
@@ -47,24 +49,58 @@ func NewDefragmentorJob(ctx context.Context, etcdConnectionConfig *brtypes.EtcdC

func (d *defragmentorJob) Run() {
clientFactory := etcdutil.NewFactory(*d.etcdConnectionConfig)
client, err := clientFactory.NewMaintenance()

clientMaintenance, err := clientFactory.NewMaintenance()
if err != nil {
d.logger.Warnf("failed to create etcd maintenance client for defragmentation")
d.logger.Warnf("failed to create etcd maintenance client")
}
defer client.Close()
defer clientMaintenance.Close()

defragCtx, defragCancel := context.WithTimeout(d.ctx, d.etcdConnectionConfig.DefragTimeout.Duration)
err = etcdutil.DefragmentData(defragCtx, client, d.etcdConnectionConfig.Endpoints, d.logger)
defragCancel()
client, err := clientFactory.NewCluster()
if err != nil {
d.logger.Warnf("Failed to defrag data with error: %v", err)
} else {
if d.callback != nil {
if _, err = d.callback(d.ctx, false); err != nil {
d.logger.Warnf("defragmentation callback failed with error: %v", err)
d.logger.Warnf("failed to create etcd cluster client")
}
defer client.Close()

ticker := time.NewTicker(etcdutil.DefragRetryPeriod)
defer ticker.Stop()

waitLoop:
for {
select {
case <-d.ctx.Done():
return
case <-ticker.C:
etcdEndpoints, err := miscellaneous.GetAllEtcdEndpoints(d.ctx, client, d.etcdConnectionConfig, d.logger)
if err != nil {
d.logger.Errorf("failed to get endpoints of all members of etcd cluster: %v", err)
continue
}
d.logger.Infof("All etcd members endPoints: %v", etcdEndpoints)

isClusterHealthy, err := miscellaneous.IsEtcdClusterHealthy(d.ctx, clientMaintenance, d.etcdConnectionConfig, etcdEndpoints, d.logger)
if err != nil {
d.logger.Errorf("failed to defrag as all members of etcd cluster are not healthy: %v", err)
continue
}

if isClusterHealthy {
d.logger.Infof("Starting the defragmentation as all members of etcd cluster are in healthy state")
err = etcdutil.DefragmentData(d.ctx, clientMaintenance, client, etcdEndpoints, d.logger)
if err != nil {
d.logger.Warnf("failed to defrag data with error: %v", err)
} else {
if d.callback != nil {
if _, err = d.callback(d.ctx, false); err != nil {
d.logger.Warnf("defragmentation callback failed with error: %v", err)
}
}
break waitLoop
}
}
}
}

}

// DefragDataPeriodically defragments the data directory of each etcd member.
@@ -77,6 +113,7 @@ func DefragDataPeriodically(ctx context.Context, etcdConnectionConfig *brtypes.E
jobRunner.Start()

<-ctx.Done()
logger.Info("Closing defragmentor.")
jobRunnerCtx := jobRunner.Stop()
<-jobRunnerCtx.Done()
}
177 changes: 173 additions & 4 deletions pkg/defragmentor/defrag_test.go
@@ -21,20 +21,29 @@ import (
"time"

"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
mockfactory "github.com/gardener/etcd-backup-restore/pkg/mock/etcdutil/client"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/gardener/etcd-backup-restore/test/utils"
"github.com/golang/mock/gomock"
cron "github.com/robfig/cron/v3"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/etcdserverpb"

. "github.com/gardener/etcd-backup-restore/pkg/defragmentor"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
cron "github.com/robfig/cron/v3"
)

var _ = Describe("Defrag", func() {
var (
etcdConnectionConfig *brtypes.EtcdConnectionConfig
keyPrefix = "/defrag/key-"
valuePrefix = "val"

ctrl *gomock.Controller
factory *mockfactory.MockFactory
cm *mockfactory.MockMaintenanceCloser
cl *mockfactory.MockClusterCloser
)

BeforeEach(func() {
@@ -43,6 +52,165 @@
etcdConnectionConfig.ConnectionTimeout.Duration = 30 * time.Second
etcdConnectionConfig.SnapshotTimeout.Duration = 30 * time.Second
etcdConnectionConfig.DefragTimeout.Duration = 30 * time.Second

ctrl = gomock.NewController(GinkgoT())
factory = mockfactory.NewMockFactory(ctrl)
cm = mockfactory.NewMockMaintenanceCloser(ctrl)
cl = mockfactory.NewMockClusterCloser(ctrl)
})

Describe("With Etcd cluster", func() {
var (
dummyID = uint64(1111)
dummyClientEndpoints = []string{"http://127.0.0.1:2379", "http://127.0.0.1:9090"}
)
BeforeEach(func() {
factory.EXPECT().NewMaintenance().Return(cm, nil)
factory.EXPECT().NewCluster().Return(cl, nil)
})

Context("MemberList API call fails", func() {
It("should return error", func() {
clientMaintenance, err := factory.NewMaintenance()
Expect(err).ShouldNot(HaveOccurred())

client, err := factory.NewCluster()
Expect(err).ShouldNot(HaveOccurred())

cl.EXPECT().MemberList(gomock.Any()).Return(nil, fmt.Errorf("failed to connect with the dummy etcd")).AnyTimes()

leaderEtcdEndpoints, followerEtcdEndpoints, err := etcdutil.GetEtcdEndPointsSorted(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).Should(HaveOccurred())
Expect(leaderEtcdEndpoints).Should(BeNil())
Expect(followerEtcdEndpoints).Should(BeNil())

err = etcdutil.DefragmentData(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).Should(HaveOccurred())
})
})

Context("MemberList API call succeeds and Status API call fails", func() {
It("should return error", func() {
clientMaintenance, err := factory.NewMaintenance()
Expect(err).ShouldNot(HaveOccurred())

client, err := factory.NewCluster()
Expect(err).ShouldNot(HaveOccurred())

cl.EXPECT().MemberList(gomock.Any()).DoAndReturn(func(_ context.Context) (*clientv3.MemberListResponse, error) {
response := new(clientv3.MemberListResponse)
dummyMember1 := &etcdserverpb.Member{
ID: dummyID,
ClientURLs: []string{dummyClientEndpoints[0]},
}
dummyMember2 := &etcdserverpb.Member{
ID: dummyID + 1,
ClientURLs: []string{dummyClientEndpoints[1]},
}
response.Members = []*etcdserverpb.Member{dummyMember1, dummyMember2}
return response, nil
}).AnyTimes()

cm.EXPECT().Status(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("failed to connect to the dummy etcd")).AnyTimes()

leaderEtcdEndpoints, followerEtcdEndpoints, err := etcdutil.GetEtcdEndPointsSorted(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).Should(HaveOccurred())
Expect(leaderEtcdEndpoints).Should(BeNil())
Expect(followerEtcdEndpoints).Should(BeNil())

err = etcdutil.DefragmentData(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).Should(HaveOccurred())
})
})

Context("Only Defragment API call fails", func() {
It("should return error and fail to perform defragmentation", func() {
clientMaintenance, err := factory.NewMaintenance()
Expect(err).ShouldNot(HaveOccurred())

client, err := factory.NewCluster()
Expect(err).ShouldNot(HaveOccurred())

cl.EXPECT().MemberList(gomock.Any()).DoAndReturn(func(_ context.Context) (*clientv3.MemberListResponse, error) {
response := new(clientv3.MemberListResponse)
// dummy etcd cluster leader
dummyMember1 := &etcdserverpb.Member{
ID: dummyID,
ClientURLs: []string{dummyClientEndpoints[0]},
}
// dummy etcd cluster follower
dummyMember2 := &etcdserverpb.Member{
ID: dummyID + 1,
ClientURLs: []string{dummyClientEndpoints[1]},
}
response.Members = []*etcdserverpb.Member{dummyMember1, dummyMember2}
return response, nil
}).AnyTimes()

cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
response := new(clientv3.StatusResponse)
response.Leader = dummyID
return response, nil
}).Times(2)

cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
response := new(clientv3.StatusResponse)
response.DbSize = 1
return response, nil
}).Times(1)

cm.EXPECT().Defragment(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("failed to defrag the etcd")).AnyTimes()

leaderEtcdEndpoints, followerEtcdEndpoints, err := etcdutil.GetEtcdEndPointsSorted(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).ShouldNot(HaveOccurred())
Expect(leaderEtcdEndpoints).Should(Equal([]string{dummyClientEndpoints[0]}))
Expect(followerEtcdEndpoints).Should(Equal([]string{dummyClientEndpoints[1]}))

err = etcdutil.DefragmentData(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).Should(HaveOccurred())
})
})

Context("All etcd client API call succeeds", func() {
It("should able to perform the defragmentation on etcd follower as well as on etcd leader", func() {
clientMaintenance, err := factory.NewMaintenance()
Expect(err).ShouldNot(HaveOccurred())

client, err := factory.NewCluster()
Expect(err).ShouldNot(HaveOccurred())

cl.EXPECT().MemberList(gomock.Any()).DoAndReturn(func(_ context.Context) (*clientv3.MemberListResponse, error) {
response := new(clientv3.MemberListResponse)
// etcd cluster leader
dummyMember1 := &etcdserverpb.Member{
ID: dummyID,
ClientURLs: []string{dummyClientEndpoints[0]},
}
// etcd cluster follower
dummyMember2 := &etcdserverpb.Member{
ID: dummyID + 1,
ClientURLs: []string{dummyClientEndpoints[1]},
}
response.Members = []*etcdserverpb.Member{dummyMember1, dummyMember2}
return response, nil
}).AnyTimes()

cm.EXPECT().Status(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
response := new(clientv3.StatusResponse)
response.Leader = dummyID
response.DbSize = 10
return response, nil
}).AnyTimes()

cm.EXPECT().Defragment(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ string) (*clientv3.DefragmentResponse, error) {
response := new(clientv3.DefragmentResponse)
return response, nil
}).AnyTimes()

err = etcdutil.DefragmentData(testCtx, clientMaintenance, client, dummyClientEndpoints, logger)
Expect(err).ShouldNot(HaveOccurred())
})
})
})

Context("Defragmentation", func() {
@@ -108,13 +276,14 @@ var _ = Describe("Defrag", func() {

ctx, cancel := context.WithTimeout(testCtx, etcdDialTimeout)
oldStatus, err := clientMaintenance.Status(ctx, endpoints[0])
cancel()

Expect(err).ShouldNot(HaveOccurred())
oldDBSize := oldStatus.DbSize
oldRevision := oldStatus.Header.GetRevision()

defragmentorJob := NewDefragmentorJob(testCtx, etcdConnectionConfig, logger, nil)
defragmentorJob := NewDefragmentorJob(ctx, etcdConnectionConfig, logger, nil)
defragmentorJob.Run()
cancel()

ctx, cancel = context.WithTimeout(testCtx, etcdDialTimeout)
newStatus, err := clientMaintenance.Status(ctx, endpoints[0])
@@ -154,7 +323,7 @@ oldDBSize := oldStatus.DbSize
oldDBSize := oldStatus.DbSize
oldRevision := oldStatus.Header.GetRevision()

defragThreadCtx, cancelDefragThread := context.WithTimeout(testCtx, time.Second*time.Duration(135))
defragThreadCtx, cancelDefragThread := context.WithTimeout(testCtx, time.Second*time.Duration(235))
defer cancelDefragThread()
DefragDataPeriodically(defragThreadCtx, etcdConnectionConfig, defragSchedule, func(_ context.Context, _ bool) (*brtypes.Snapshot, error) {
defragCount++
10 changes: 6 additions & 4 deletions pkg/defragmentor/defragmentor_suite_test.go
@@ -29,9 +29,11 @@ import (
)

const (
outputDir = "../../test/output"
etcdDir = outputDir + "/default.etcd"
etcdDialTimeout = time.Second * 30
outputDir = "../../test/output"
etcdDir = outputDir + "/default.etcd"
etcdDialTimeout = time.Second * 30
embeddedEtcdPortNo = "9089"
mockTimeout = time.Second * 5
)

var (
@@ -52,7 +54,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
err = os.RemoveAll(outputDir)
Expect(err).ShouldNot(HaveOccurred())

etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger)
etcd, err = utils.StartEmbeddedEtcd(testCtx, etcdDir, logger, embeddedEtcdPortNo)
Expect(err).ShouldNot(HaveOccurred())
endpoints = []string{etcd.Clients[0].Addr().String()}
logger.Infof("endpoints: %s", endpoints)