Skip to content

Commit

Permalink
commit transaction for each configuration change
Browse files Browse the repository at this point in the history
Current when etcd remove a member, it just removes it from bbolt db,
but doesn't update the cache. On the other hands, etcd periodically
commit each bbolt transaction. When etcd processes the next conf
change request, it might not have commit the previous member remove
request into bbolt. Accordingly it breaks the linerizability.

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
  • Loading branch information
ahrtr committed Dec 14, 2023
1 parent a70fa9b commit a1e06dd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
5 changes: 5 additions & 0 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,11 @@ func (s *EtcdServer) apply(
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
if err != nil {
s.lg.Error("failed to apply conf change", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err))
} else {
s.KV().Commit()
}
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
Expand Down
17 changes: 9 additions & 8 deletions tests/e2e/ctl_v3_member_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,28 +40,27 @@ func TestMemberReplace(t *testing.T) {
require.NoError(t, err)
defer epc.Close()

memberId := rand.Int() % len(epc.Procs)
member := epc.Procs[memberId]
memberIdx := rand.Int() % len(epc.Procs)
member := epc.Procs[memberIdx]
memberName := member.Config().Name
var endpoints []string
for i := 1; i < len(epc.Procs); i++ {
endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...)
endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...)
}
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints)
require.NoError(t, err)

c := epc.Etcdctl()
memberID, found, err := getMemberIdByName(ctx, c, memberName)
memberID, found, err := getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, true, "Member not found")

// Need to wait health interval for cluster to accept member changes
time.Sleep(etcdserver.HealthInterval)

t.Logf("Removing member %s", memberName)
_, err = c.MemberRemove(ctx, memberID)
_, err = cc.MemberRemove(ctx, memberID)
require.NoError(t, err)
_, found, err = getMemberIdByName(ctx, c, memberName)
_, found, err = getMemberIdByName(ctx, cc, memberName)
require.NoError(t, err)
require.Equal(t, found, false, "Expected member to be removed")
for member.IsRunning() {
Expand All @@ -82,12 +81,14 @@ func TestMemberReplace(t *testing.T) {
err = patchArgs(member.Config().Args, "initial-cluster-state", "existing")
require.NoError(t, err)

// Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687.
time.Sleep(100 * time.Millisecond)
t.Logf("Starting member %s", memberName)
err = member.Start(ctx)
require.NoError(t, err)
testutils.ExecuteUntil(ctx, t, func() {
for {
_, found, err := getMemberIdByName(ctx, c, memberName)
_, found, err := getMemberIdByName(ctx, cc, memberName)
if err != nil || !found {
time.Sleep(10 * time.Millisecond)
continue
Expand Down

0 comments on commit a1e06dd

Please sign in to comment.