Skip to content

Commit

Permalink
check pending reads when quorum becomes less (#363)
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed May 25, 2020
1 parent 236fb28 commit b5f5830
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 54 deletions.
35 changes: 35 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4724,3 +4724,38 @@ fn test_request_snapshot_on_role_change() {
nt.peers[&2].pending_request_snapshot
);
}

// `test_read_when_quorum_becomes_less` tests read requests could be handled earlier
// if quorum becomes less in configuration changes.
#[test]
fn test_read_when_quorum_becomes_less() {
let l = default_logger();
let mut network = Network::new(vec![None, None], &l);

let mut m = Message::default();
m.from = 1;
m.to = 1;
m.set_msg_type(MessageType::MsgHup);
network.send(vec![m]);
assert_eq!(network.peers[&1].raft_log.committed, 1);

// Read index on the peer.
let mut m = Message::default();
m.to = 1;
m.set_msg_type(MessageType::MsgReadIndex);
let mut e = Entry::default();
e.data = b"abcdefg".to_vec();
m.set_entries(vec![e].into());
network.dispatch(vec![m]).unwrap();

// Broadcast heartbeats.
let heartbeats = network.read_messages();
network.dispatch(heartbeats).unwrap();

// Drop hearbeat response from peer 2.
let heartbeat_responses = network.read_messages();
assert_eq!(heartbeat_responses.len(), 1);

network.peers.get_mut(&1).unwrap().remove_node(2).unwrap();
assert!(!network.peers[&1].read_states.is_empty());
}
93 changes: 45 additions & 48 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,28 +1324,14 @@ impl<T: Storage> Raft<T> {
}
}

match self.read_only.recv_ack(m) {
match self.read_only.recv_ack(m.from, &m.context) {
Some(acks) if prs.has_quorum(acks) => {}
_ => return,
}

let rss = self.read_only.advance(m, &self.logger);
for rs in rss {
let mut req = rs.req;
if req.from == INVALID_ID || req.from == self.id {
// from local member
let rs = ReadState {
index: rs.index,
request_ctx: req.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = req.from;
to_send.index = rs.index;
to_send.set_entries(req.take_entries());
ctx.more_to_send.push(to_send);
for rs in self.read_only.advance(&m.context, &self.logger) {
if let Some(m) = self.handle_ready_read_index(rs.req, rs.index) {
ctx.more_to_send.push(m);
}
}
}
Expand Down Expand Up @@ -1566,40 +1552,15 @@ impl<T: Storage> Raft<T> {
}
ReadOnlyOption::LeaseBased => {
let read_index = self.raft_log.committed;
if m.from == INVALID_ID || m.from == self.id {
// from local member
let rs = ReadState {
index: read_index,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = m.from;
to_send.index = read_index;
to_send.set_entries(m.take_entries());
self.send(to_send);
if let Some(m) = self.handle_ready_read_index(m, read_index) {
self.send(m);
}
}
}
} else {
// there is only one voting member (the leader) in the cluster
if m.from == INVALID_ID || m.from == self.id {
// from leader itself
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
// from learner member
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = m.from;
to_send.index = self.raft_log.committed;
to_send.set_entries(m.take_entries());
self.send(to_send);
let read_index = self.raft_log.committed;
if let Some(m) = self.handle_ready_read_index(m, read_index) {
self.send(m);
}
}
return Ok(());
Expand Down Expand Up @@ -2127,6 +2088,25 @@ impl<T: Storage> Raft<T> {
if self.maybe_commit() {
self.bcast_append();
}

// The quorum size is now smaller, consider to response some read requests.
// If there is only one peer, all pending read requests must be responsed.
if let Some(ctx) = self.read_only.last_pending_request_ctx() {
let prs = self.take_prs();
if self
.read_only
.recv_ack(self.id, &ctx)
.map_or(false, |acks| prs.has_quorum(acks))
{
for rs in self.read_only.advance(&ctx, &self.logger) {
if let Some(m) = self.handle_ready_read_index(rs.req, rs.index) {
self.send(m);
}
}
}
self.set_prs(prs);
}

// If the removed node is the lead_transferee, then abort the leadership transferring.
if self.state == StateRole::Leader && self.lead_transferee == Some(id) {
self.abort_leader_transfer();
Expand Down Expand Up @@ -2238,4 +2218,21 @@ impl<T: Storage> Raft<T> {
m.request_snapshot = self.pending_request_snapshot;
self.send(m);
}

fn handle_ready_read_index(&mut self, mut req: Message, index: u64) -> Option<Message> {
if req.from == INVALID_ID || req.from == self.id {
let rs = ReadState {
index,
request_ctx: req.take_entries()[0].take_data(),
};
self.read_states.push(rs);
return None;
}
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = req.from;
to_send.index = index;
to_send.set_entries(req.take_entries());
Some(to_send)
}
}
12 changes: 6 additions & 6 deletions src/read_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,23 +101,23 @@ impl ReadOnly {
/// Notifies the ReadOnly struct that the raft state machine received
/// an acknowledgment of the heartbeat that attached with the read only request
/// context.
pub fn recv_ack(&mut self, m: &Message) -> Option<&HashSet<u64>> {
self.pending_read_index.get_mut(&m.context).map(|rs| {
rs.acks.insert(m.from);
pub fn recv_ack(&mut self, id: u64, ctx: &[u8]) -> Option<&HashSet<u64>> {
self.pending_read_index.get_mut(ctx).map(|rs| {
rs.acks.insert(id);
&rs.acks
})
}

/// Advances the read only request queue kept by the ReadOnly struct.
/// It dequeues the requests until it finds the read only request that has
/// the same context as the given `m`.
pub fn advance(&mut self, m: &Message, logger: &Logger) -> Vec<ReadIndexStatus> {
/// the same context as the given `ctx`.
pub fn advance(&mut self, ctx: &[u8], logger: &Logger) -> Vec<ReadIndexStatus> {
let mut rss = vec![];
if let Some(i) = self.read_index_queue.iter().position(|x| {
if !self.pending_read_index.contains_key(x) {
fatal!(logger, "cannot find correspond read state from pending map");
}
*x == m.context
*x == ctx
}) {
for _ in 0..=i {
let rs = self.read_index_queue.pop_front().unwrap();
Expand Down

0 comments on commit b5f5830

Please sign in to comment.