Fix the potential data loss for clusters with only one member #14394

Closed
69 changes: 59 additions & 10 deletions server/etcdserver/raft.go
@@ -68,8 +68,10 @@ func init() {
type toApply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// snapNotifyc synchronizes etcd server applies with the raft node
snapNotifyc chan struct{}
// walNotifyc synchronizes etcd server applies with the WAL persistence
walNotifyc chan struct{}
}

type raftNode struct {
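
Not part of the diff: both notify channels above are buffered with capacity 1 and act as a one-shot handshake between the raft goroutine and the apply path. A minimal, standalone Go demo of that pattern (editorial, names are illustrative):

// Editorial sketch, not from the PR: a cap-1 channel lets the sender signal
// without blocking, and the receiver can pick the signal up before or after
// the send happens.
package main

import "fmt"

func main() {
	notifyc := make(chan struct{}, 1)

	// Raft loop side: signal completion; never blocks because the buffer holds one item.
	notifyc <- struct{}{}

	// Apply side: consume the signal, possibly long after it was sent.
	<-notifyc
	fmt.Println("handshake completed")
}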
@@ -200,11 +202,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
}

notifyc := make(chan struct{}, 1)
snapNotifyc := make(chan struct{}, 1)
var walNotifyc chan struct{}
if shouldWaitWALSync(rd) {
walNotifyc = make(chan struct{}, 1)
}
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
snapNotifyc: snapNotifyc,
walNotifyc: walNotifyc,
}

updateCommittedIndex(&ap, rh)
@@ -237,6 +244,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}

if walNotifyc != nil {
// etcdserver should wait for this notification before responding to the client.
walNotifyc <- struct{}{}
}

if !raft.IsEmptyHardState(rd.HardState) {
proposalsCommitted.Set(float64(rd.HardState.Commit))
}
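
The consumer of walNotifyc lives on the etcdserver apply/response path and is not part of this file's diff. A rough, hedged sketch of what that wait could look like (the type and function names below are assumptions, not the PR's actual code):

// Editorial sketch: block until the raft goroutine has persisted the WAL
// (signaled via walNotifyc above) before acknowledging the client.
package main

import "fmt"

type toApply struct {
	snapNotifyc chan struct{} // snapshot persistence handshake
	walNotifyc  chan struct{} // non-nil only when unstable and committed entries overlap
}

// waitForWALSync returns true once it is safe to respond to the client.
func waitForWALSync(ap toApply, stopped <-chan struct{}) bool {
	if ap.walNotifyc == nil {
		// Multi-member case (or no overlap): nothing extra to wait for.
		return true
	}
	select {
	case <-ap.walNotifyc: // sent right after storage.Save in the raft loop
		return true
	case <-stopped:
		return false
	}
}

func main() {
	walc := make(chan struct{}, 1)
	walc <- struct{}{} // pretend the raft loop already saved the WAL
	fmt.Println("safe to respond:", waitForWALSync(toApply{walNotifyc: walc}, make(chan struct{})))
}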
@@ -252,7 +265,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}

// etcdserver now claim the snapshot has been persisted onto the disk
notifyc <- struct{}{}
snapNotifyc <- struct{}{}

// gofail: var raftBeforeApplySnap struct{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
@@ -272,7 +285,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
msgs := r.processMessages(rd.Messages)

// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
notifyc <- struct{}{}
snapNotifyc <- struct{}{}

// Candidate or follower needs to wait for all pending configuration
// changes to be applied before sending messages.
@@ -293,7 +306,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
select {
case notifyc <- struct{}{}:
case snapNotifyc <- struct{}{}:
case <-r.stopped:
return
}
@@ -303,7 +316,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.transport.Send(msgs)
} else {
// leader already processed 'MsgSnap' and signaled
notifyc <- struct{}{}
snapNotifyc <- struct{}{}
}

r.Advance()
@@ -314,6 +327,42 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}

// For a cluster with only one member, raft may send both unstable entries
// and committed entries to etcdserver in the same Ready, and the two sets
// may contain overlapping log entries.
//
// etcd responds to the client once it finishes (actually only partially)
// the apply workflow. But when the client receives the response, it doesn't
// mean etcd has already successfully persisted the data to BoltDB and the
// WAL, because:
// 1. etcd commits the BoltDB transaction periodically instead of on each request;
// 2. etcd saves WAL entries in parallel with applying the committed entries.
// Accordingly, data may be lost if etcd crashes immediately after responding
// to the client but before BoltDB and the WAL have persisted the data to disk.
// Note that this issue can only happen in clusters with only one member.
//
// For clusters with multiple members, this isn't an issue, because etcd does
// not commit & apply the data before it has been replicated to a majority of
// members. When the client receives the response, the data must have been
// applied, which in turn means it must have been committed.
// Note: for clusters with multiple members, raft will never send identical
// unstable entries and committed entries to etcdserver.
//
// Refer to https://github.com/etcd-io/etcd/issues/14370.
func shouldWaitWALSync(rd raft.Ready) bool {
if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
return false
}

// Check if there is overlap between unstable and committed entries
// assuming that their index and term are only incrementing.
lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
firstUnstableEntry := rd.Entries[0]
return lastCommittedEntry.Term > firstUnstableEntry.Term ||
(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
}

func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
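To make the overlap check in shouldWaitWALSync concrete, here is a standalone restatement using a minimal entry type (editorial illustration only, not the actual raftpb.Entry):

// Editorial illustration of the shouldWaitWALSync overlap check.
package main

import "fmt"

type entry struct{ Term, Index uint64 }

// overlaps reports whether the committed range reaches into the unstable
// range, i.e. the last committed entry is at or beyond the first unstable one.
func overlaps(unstable, committed []entry) bool {
	if len(committed) == 0 || len(unstable) == 0 {
		return false
	}
	last := committed[len(committed)-1]
	first := unstable[0]
	return last.Term > first.Term ||
		(last.Term == first.Term && last.Index >= first.Index)
}

func main() {
	// Single-member cluster: the same proposal shows up both as an unstable
	// entry (not yet fsynced to the WAL) and as a committed entry.
	fmt.Println(overlaps([]entry{{Term: 4, Index: 10}}, []entry{{Term: 4, Index: 10}})) // true
	// Multi-member cluster: committed entries always trail the unstable ones.
	fmt.Println(overlaps([]entry{{Term: 5, Index: 11}}, []entry{{Term: 4, Index: 10}})) // false
}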
131 changes: 125 additions & 6 deletions server/etcdserver/raft_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3"
@@ -190,6 +191,52 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {

// TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change.
func TestConfigChangeBlocksApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testConfigChangeBlockApply)
}

func TestWALSyncNotBlockApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testWALSyncNotBlockApply)
}

func TestWALSyncBlockApply(t *testing.T) {
testEtcdserverAndRaftInteraction(t, testWALSyncBlockApply)
}

func testConfigChangeBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
return <-srv.r.applyc
}

func testWALSyncNotBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
if ap.walNotifyc != nil {
t.Error("expected nil ap.walNotifyc")
}
return ap
}

func testWALSyncBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
Entries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
if ap.walNotifyc == nil {
t.Error("expected non-nil ap.walNotifyc")
}
return ap
}

func testEtcdserverAndRaftInteraction(t *testing.T, testFunc func(*testing.T, *EtcdServer, *readyNode) toApply) {
n := newNopReadyNode()

r := newRaftNode(raftNodeConfig{
@@ -208,11 +255,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
})
defer srv.r.Stop()

n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateFollower},
CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
}
ap := <-srv.r.applyc
ap := testFunc(t, srv, n)

continueC := make(chan struct{})
go func() {
@@ -228,7 +271,7 @@ }
}

// finish toApply, unblock raft routine
<-ap.notifyc
<-ap.snapNotifyc

select {
case <-continueC:
@@ -317,3 +360,79 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) {
}
}
}

func TestShouldWaitWALSync(t *testing.T) {
testcases := []struct {
name string
unstableEntries []raftpb.Entry
commitedEntries []raftpb.Entry
expectedResult bool
}{
{
name: "both entries are nil",
unstableEntries: nil,
commitedEntries: nil,
expectedResult: false,
},
{
name: "both entries are empty slices",
unstableEntries: []raftpb.Entry{},
commitedEntries: []raftpb.Entry{},
expectedResult: false,
},
{
name: "one nil and the other empty",
unstableEntries: nil,
commitedEntries: []raftpb.Entry{},
expectedResult: false,
},
{
name: "one nil and the other has data",
unstableEntries: nil,
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "one empty and the other has data",
unstableEntries: []raftpb.Entry{},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "has different term and index",
unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: false,
},
{
name: "has identical data",
unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
expectedResult: true,
},
{
name: "has overlapped entry",
unstableEntries: []raftpb.Entry{
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
},
commitedEntries: []raftpb.Entry{
{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
},
expectedResult: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
shouldWALSync := shouldWaitWALSync(raft.Ready{
Entries: tc.unstableEntries,
CommittedEntries: tc.commitedEntries,
})
assert.Equal(t, tc.expectedResult, shouldWALSync)
})
}
}