diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ac5f0c7ab29..3a6f846a2f8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1809,6 +1809,9 @@ func (s *EtcdServer) apply( var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3) + if err != nil { + s.lg.Error("failed to apply conf change", zap.Bool("shouldApplyV3", bool(shouldApplyV3)), zap.Error(err)) + } s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 4b8a12f010e..d5ecd5038fc 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -298,6 +298,7 @@ func newServer(t *testing.T, recorder *nodeRecorder) *EtcdServer { r: *newRaftNode(raftNodeConfig{lg: lg, Node: recorder}), cluster: membership.NewCluster(lg), consistIndex: cindex.NewConsistentIndex(be), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } srv.cluster.SetBackend(schema.NewMembershipBackend(lg, be)) srv.cluster.SetStore(v2store.New()) @@ -475,6 +476,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { w: wait.New(), consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } // create EntryConfChange entry @@ -567,6 +569,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { w: wait.New(), consistIndex: ci, beHooks: serverstorage.NewBackendHooks(lg, ci), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } var ents []raftpb.Entry for i := 1; i <= 4; i++ { @@ -876,6 +879,7 @@ func TestAddMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} @@ -981,6 +985,7 @@ func TestRemoveMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() _, err := s.RemoveMember(context.Background(), 1234) @@ -1030,6 +1035,7 @@ func TestUpdateMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 44e2af1cd7f..431848ae915 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -72,9 +72,28 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) { tx := s.be.BatchTx() tx.LockInsideApply() - defer tx.Unlock() tx.UnsafeDelete(Members, mkey) tx.UnsafePut(MembersRemoved, mkey, []byte("removed")) + tx.Unlock() + + // We need to forcibly commit the transaction, otherwise etcd might + // run into a situation that it haven't finished committing the data + // into backend storage (note: etcd periodically commits the bbolt + // transactions instead of on each request) when it receives next + // confChange request. Accordingly, etcd may still reads the stale + // data from bbolt when processing next confChange request. So it + // breaks linearizability. + // + // Note we don't need to forcibly commit the transaction for other + // kinds of request (e.g. normal key/value operations, member add + // or update requests), because there is a buffer on top of the bbolt. + // Each time when etcd reads data from backend storage, it will read + // data from both bbolt and the buffer. But there is no such a buffer + // for member delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + s.be.ForceCommit() } func (s *membershipBackend) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) { diff --git a/tests/e2e/ctl_v3_member_no_proxy_test.go b/tests/e2e/ctl_v3_member_no_proxy_test.go index 619ae5d136c..e6d263b0c22 100644 --- a/tests/e2e/ctl_v3_member_no_proxy_test.go +++ b/tests/e2e/ctl_v3_member_no_proxy_test.go @@ -40,18 +40,17 @@ func TestMemberReplace(t *testing.T) { require.NoError(t, err) defer epc.Close() - memberId := rand.Int() % len(epc.Procs) - member := epc.Procs[memberId] + memberIdx := rand.Int() % len(epc.Procs) + member := epc.Procs[memberIdx] memberName := member.Config().Name var endpoints []string for i := 1; i < len(epc.Procs); i++ { - endpoints = append(endpoints, epc.Procs[(memberId+i)%len(epc.Procs)].EndpointsGRPC()...) + endpoints = append(endpoints, epc.Procs[(memberIdx+i)%len(epc.Procs)].EndpointsGRPC()...) } cc, err := e2e.NewEtcdctl(epc.Cfg.Client, endpoints) require.NoError(t, err) - c := epc.Etcdctl() - memberID, found, err := getMemberIdByName(ctx, c, memberName) + memberID, found, err := getMemberIdByName(ctx, cc, memberName) require.NoError(t, err) require.Equal(t, found, true, "Member not found") @@ -59,9 +58,9 @@ func TestMemberReplace(t *testing.T) { time.Sleep(etcdserver.HealthInterval) t.Logf("Removing member %s", memberName) - _, err = c.MemberRemove(ctx, memberID) + _, err = cc.MemberRemove(ctx, memberID) require.NoError(t, err) - _, found, err = getMemberIdByName(ctx, c, memberName) + _, found, err = getMemberIdByName(ctx, cc, memberName) require.NoError(t, err) require.Equal(t, found, false, "Expected member to be removed") for member.IsRunning() { @@ -82,12 +81,14 @@ func TestMemberReplace(t *testing.T) { err = patchArgs(member.Config().Args, "initial-cluster-state", "existing") require.NoError(t, err) + // Sleep 100ms to bypass the known issue https://github.com/etcd-io/etcd/issues/16687. + time.Sleep(100 * time.Millisecond) t.Logf("Starting member %s", memberName) err = member.Start(ctx) require.NoError(t, err) testutils.ExecuteUntil(ctx, t, func() { for { - _, found, err := getMemberIdByName(ctx, c, memberName) + _, found, err := getMemberIdByName(ctx, cc, memberName) if err != nil || !found { time.Sleep(10 * time.Millisecond) continue