diff --git a/src/raft.rs b/src/raft.rs index 627cb6f01..50fdf08d9 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1692,11 +1692,23 @@ impl Raft { } } } else { - let rs = ReadState { - index: self.raft_log.committed, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); + // there is only one voting member (the leader) in the cluster + if m.get_from() == INVALID_ID || m.get_from() == self.id { + // from leader itself + let rs = ReadState { + index: self.raft_log.committed, + request_ctx: m.take_entries()[0].take_data(), + }; + self.read_states.push(rs); + } else { + // from learner member + let mut to_send = Message::default(); + to_send.set_to(m.get_from()); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.set_index(self.raft_log.committed); + to_send.set_entries(m.take_entries()); + self.send(to_send); + } } return Ok(()); } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index e30d99975..fdabaed43 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2189,6 +2189,79 @@ fn test_read_only_option_safe() { } } +#[test] +fn test_read_only_with_learner() { + setup_for_test(); + let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage()); + let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage()); + + let mut nt = Network::new(vec![Some(a), Some(b)]); + + // we can not let system choose the value of randomizedElectionTimeout + // otherwise it will introduce some uncertainty into this test case + // we need to ensure randomizedElectionTimeout > electionTimeout here + let b_election_timeout = nt.peers[&2].get_election_timeout(); + nt.peers + .get_mut(&2) + .unwrap() + .set_randomized_election_timeout(b_election_timeout + 1); + + for _ in 0..b_election_timeout { + nt.peers.get_mut(&2).unwrap().tick(); + } + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + assert_eq!(nt.peers[&1].state, StateRole::Leader); + assert_eq!(nt.peers[&2].state, StateRole::Follower); + + let mut tests = vec![ + (1, 10, 12, "ctx1"), + (2, 10, 22, "ctx2"), + (1, 10, 32, "ctx3"), + (2, 10, 42, "ctx4"), + ]; + + for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for _ in 0..proposals { + nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + } + + let e = new_entry(0, 0, Some(wctx)); + nt.send(vec![new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![e], + )]); + + let read_states: Vec = nt + .peers + .get_mut(&id) + .unwrap() + .read_states + .drain(..) + .collect(); + assert_eq!( + read_states.is_empty(), + false, + "#{}: read_states is empty, want non-empty", + i + ); + let rs = &read_states[0]; + assert_eq!( + rs.index, wri, + "#{}: read_index = {}, want {}", + i, rs.index, wri + ); + let vec_wctx = wctx.as_bytes().to_vec(); + assert_eq!( + rs.request_ctx, vec_wctx, + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, vec_wctx + ); + } +} + #[test] fn test_read_only_option_lease() { setup_for_test();