From 2ebbed55f4c6e58fdb04f44b61c696117fd936c3 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 12 Apr 2019 18:01:06 +0800 Subject: [PATCH] fix a bug about initialized storage's term (#214) --- examples/five_mem_node/main.rs | 26 ++- harness/src/interface.rs | 26 +-- harness/src/network.rs | 56 +++--- src/config.rs | 1 + src/raft.rs | 6 +- src/storage.rs | 14 +- tests/integration_cases/test_raft.rs | 215 +++++++++++---------- tests/integration_cases/test_raft_paper.rs | 40 ++-- tests/integration_cases/test_raft_snap.rs | 19 +- tests/integration_cases/test_raw_node.rs | 9 +- 10 files changed, 229 insertions(+), 183 deletions(-) diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index e597eb842..78f28372d 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -103,6 +103,7 @@ fn main() { add_all_followers(proposals.as_ref()); // Put 100 key-value pairs. + println!("We get a 5 nodes Raft cluster now, now propose 100 proposals"); (0..100u16) .filter(|i| { let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned()); @@ -113,6 +114,9 @@ fn main() { }) .count(); + println!("Propose 100 proposals success!"); + + // FIXME: the program will be blocked here forever. Need to exit gracefully. for th in handles { th.join().unwrap(); } @@ -197,16 +201,27 @@ fn on_ready( if !raft_group.has_ready() { return; } + let store = raft_group.raft.raft_log.store.clone(); + // Get the `Ready` with `RawNode::ready` interface. let mut ready = raft_group.ready(); // Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize // raft logs to the latest position. - if let Err(e) = raft_group.raft.raft_log.store.wl().append(ready.entries()) { + if let Err(e) = store.wl().append(ready.entries()) { error!("persist raft log fail: {:?}, need to retry or panic", e); return; } + // Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot. + if *ready.snapshot() != Snapshot::new_() { + let s = ready.snapshot().clone(); + if let Err(e) = store.wl().apply_snapshot(s) { + error!("apply snapshot fail: {:?}, need to retry or panic", e); + return; + } + } + // Send out the messages come from the node. for msg in ready.messages.drain(..) { let to = msg.get_to(); @@ -217,7 +232,7 @@ fn on_ready( // Apply all committed proposals. if let Some(committed_entries) = ready.committed_entries.take() { - for entry in committed_entries { + for entry in &committed_entries { if entry.get_data().is_empty() { // From new elected leaders. continue; @@ -234,6 +249,8 @@ fn on_ready( ConfChangeType::BeginMembershipChange | ConfChangeType::FinalizeMembershipChange => unimplemented!(), } + let cs = ConfState::from(raft_group.raft.prs().configuration().clone()); + store.wl().set_conf_state(cs, None); } else { // For normal proposals, extract the key-value pair and then // insert them into the kv engine. @@ -250,6 +267,11 @@ fn on_ready( proposal.propose_success.send(true).unwrap(); } } + if let Some(last_committed) = committed_entries.last() { + let mut s = store.wl(); + s.mut_hard_state().set_commit(last_committed.get_index()); + s.mut_hard_state().set_term(last_committed.get_term()); + } } // Call `RawNode::advance` interface to update position flags in the raft. raft_group.advance(ready); diff --git a/harness/src/interface.rs b/harness/src/interface.rs index 2d03b9ea0..f99d8586b 100644 --- a/harness/src/interface.rs +++ b/harness/src/interface.rs @@ -25,7 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use raft::{eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result}; +use raft::{eraftpb::Message, storage::MemStorage, Raft, Result}; use std::ops::{Deref, DerefMut}; /// A simulated Raft façade for testing. @@ -63,30 +63,6 @@ impl Interface { None => vec![], } } - - /// Initialize a raft with the given ID and peer set. - pub fn initial(&mut self, id: u64, ids: &[u64]) { - if self.raft.is_some() { - self.id = id; - let prs = self.take_prs(); - self.set_prs(ProgressSet::with_capacity( - ids.len(), - prs.learner_ids().len(), - )); - for id in ids { - let progress = Progress::new(0, 256); - if prs.learner_ids().contains(id) { - if let Err(e) = self.mut_prs().insert_learner(*id, progress) { - panic!("{}", e); - } - } else if let Err(e) = self.mut_prs().insert_voter(*id, progress) { - panic!("{}", e); - } - } - let term = self.term; - self.reset(term); - } - } } impl From>> for Interface { diff --git a/harness/src/network.rs b/harness/src/network.rs index 8a87ee3dd..434ed247e 100644 --- a/harness/src/network.rs +++ b/harness/src/network.rs @@ -61,45 +61,51 @@ pub struct Network { } impl Network { - /// Initializes a network from peers. + /// Get a base config. Calling `Network::new` will initialize peers with this config. + pub fn default_config() -> Config { + Config { + election_tick: 10, + heartbeat_tick: 1, + max_size_per_msg: NO_LIMIT, + max_inflight_msgs: 256, + ..Default::default() + } + } + + /// Initializes a network from `peers`. /// /// Nodes will recieve their ID based on their index in the vector, starting with 1. /// - /// A `None` node will be replaced with a new Raft node. + /// A `None` node will be replaced with a new Raft node, and its configuration will + /// be `peers`. pub fn new(peers: Vec>) -> Network { - Network::new_with_config(peers, false) + let config = Network::default_config(); + Network::new_with_config(peers, &config) } - /// Explicitly set the pre_vote option on newly created rafts. - /// - /// **TODO:** Make this accept any config. - pub fn new_with_config(mut peers: Vec>, pre_vote: bool) -> Network { - let size = peers.len(); - let peer_addrs: Vec = (1..=size as u64).collect(); + /// Initialize a network from `peers` with explicitly specified `config`. + pub fn new_with_config(mut peers: Vec>, config: &Config) -> Network { let mut nstorage = HashMap::new(); let mut npeers = HashMap::new(); - for (p, id) in peers.drain(..).zip(peer_addrs.clone()) { + + let peer_addrs: Vec = (1..=peers.len() as u64).collect(); + for (p, id) in peers.drain(..).zip(&peer_addrs) { match p { None => { let conf_state = ConfState::from((peer_addrs.clone(), vec![])); let store = MemStorage::new_with_conf_state(conf_state); - nstorage.insert(id, store.clone()); - let config = Config { - id, - election_tick: 10, - heartbeat_tick: 1, - max_size_per_msg: NO_LIMIT, - max_inflight_msgs: 256, - pre_vote, - tag: format!("{}", id), - ..Default::default() - }; + nstorage.insert(*id, store.clone()); + let mut config = config.clone(); + config.id = *id; + config.tag = format!("{}", id); let r = Raft::new(&config, store).unwrap().into(); - npeers.insert(id, r); + npeers.insert(*id, r); } - Some(mut p) => { - p.initial(id, &peer_addrs); - npeers.insert(id, p); + Some(r) => { + if r.raft.as_ref().map_or(false, |r| r.id != *id) { + panic!("peer {} in peers has a wrong position", r.id); + } + npeers.insert(*id, r); } } } diff --git a/src/config.rs b/src/config.rs index 2065aec0c..835fb27fc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -32,6 +32,7 @@ use super::{ }; /// Config contains the parameters to start a raft. +#[derive(Clone)] pub struct Config { /// The identity of the local raft. It cannot be 0, and must be unique in the group. pub id: u64, diff --git a/src/raft.rs b/src/raft.rs index 5447c602c..d9986d43b 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -999,7 +999,6 @@ impl Raft { /// message from a peer. pub fn step(&mut self, m: Message) -> Result<()> { // Handle the message term, which may result in our stepping down to a follower. - if m.get_term() == 0 { // local message } else if m.get_term() > self.term { @@ -1165,6 +1164,8 @@ impl Raft { } } MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { + debug_assert!(m.get_log_term() != 0, "{:?} log term can't be 0", m); + // We can vote if this is a repeat of a vote we've already cast... let can_vote = (self.vote == m.get_from()) || // ...we haven't voted and we don't think there's a leader yet in this term... @@ -1924,6 +1925,8 @@ impl Raft { self.send(to_send); return; } + debug_assert!(m.get_log_term() != 0, "{:?} log term can't be 0", m); + let mut to_send = Message::new_(); to_send.set_to(m.get_from()); to_send.set_msg_type(MessageType::MsgAppendResponse); @@ -1968,6 +1971,7 @@ impl Raft { } fn handle_snapshot(&mut self, mut m: Message) { + debug_assert!(m.get_term() != 0, "{:?} term can't be 0", m); let (sindex, sterm) = ( m.get_snapshot().get_metadata().get_index(), m.get_snapshot().get_metadata().get_term(), diff --git a/src/storage.rs b/src/storage.rs index 0718d177a..88446f6bf 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -88,6 +88,9 @@ impl RaftState { pub trait Storage { /// `initial_state` is called when Raft is initialized. This interface will return a `RaftState` /// which contains `HardState` and `ConfState`. + /// + /// `RaftState` could be initialized or not. If it's initialized it means the `Storage` is + /// created with a configuration, and its last index and term should be greater than 0. fn initial_state(&self) -> Result; /// Returns a slice of log entries in the range `[low, high)`. @@ -156,6 +159,11 @@ impl MemStorageCore { &self.raft_state.hard_state } + /// Get the mut hard state. + pub fn mut_hard_state(&mut self) -> &mut HardState { + &mut self.raft_state.hard_state + } + /// Commit to an index. /// /// # Panics @@ -383,13 +391,17 @@ impl MemStorage { // Set index to 1 to make `first_index` greater than 1 so that there will be a gap between // uninitialized followers and the leader. And then followers can catch up the initial // configuration by snapshots. + // + // And, set term to 1 because in term 0 there is no leader exactly. + // // An another alternative is appending some conf-change entries here to construct the // initial configuration so that followers can catch up it by raft logs. However the entry // count depends on how many peers in the initial configuration, which makes some indices // not predictable. So we choose snapshot instead of raft logs here. - // core.snapshot_metadata.set_index(1); + core.snapshot_metadata.set_term(1); core.raft_state.hard_state.set_commit(1); + core.raft_state.hard_state.set_term(1); core.raft_state.conf_state = ConfState::from(conf_state); } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index ad756f389..e30d99975 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -57,15 +57,16 @@ fn read_messages(raft: &mut Raft) -> Vec { raft.msgs.drain(..).collect() } -fn ents_with_config(terms: &[u64], pre_vote: bool) -> Interface { - let store = MemStorage::new(); +fn ents_with_config(terms: &[u64], pre_vote: bool, id: u64, peers: Vec) -> Interface { + let store = MemStorage::new_with_conf_state((peers.clone(), vec![])); for (i, term) in terms.iter().enumerate() { let mut e = Entry::new_(); - e.set_index(i as u64 + 1); + // An additional `plus one` for initialized storage. + e.set_index(i as u64 + 1 + 1); e.set_term(*term); store.wl().append(&[e]).expect(""); } - let mut raft = new_test_raft_with_prevote(1, vec![], 5, 1, store, pre_vote); + let mut raft = new_test_raft_with_prevote(id, peers, 5, 1, store, pre_vote); raft.reset(terms[terms.len() - 1]); raft } @@ -99,13 +100,11 @@ fn assert_raft_log( // voted_with_config creates a raft state machine with vote and term set // to the given value but no log entries (indicating that it voted in // the given term but has not receive any logs). -fn voted_with_config(vote: u64, term: u64, pre_vote: bool) -> Interface { - let mut hard_state = HardState::new_(); - hard_state.set_vote(vote); - hard_state.set_term(term); - let store = MemStorage::new(); - store.wl().set_hardstate(hard_state); - let mut raft = new_test_raft_with_prevote(1, vec![], 5, 1, store, pre_vote); +fn voted_with_config(vote: u64, term: u64, pre_vote: bool, id: u64, peers: Vec) -> Interface { + let store = MemStorage::new_with_conf_state((peers.clone(), vec![])); + store.wl().mut_hard_state().set_vote(vote); + store.wl().mut_hard_state().set_term(term); + let mut raft = new_test_raft_with_prevote(id, peers, 5, 1, store, pre_vote); raft.reset(term); raft } @@ -131,20 +130,6 @@ fn do_send_append(raft: &mut Raft, to: u64) { raft.set_prs(prs); } -fn new_raft_log(ents: &[Entry], offset: u64, committed: u64) -> RaftLog { - let store = MemStorage::new(); - store.wl().append(ents).expect(""); - RaftLog { - store, - unstable: Unstable { - offset, - ..Default::default() - }, - committed, - ..Default::default() - } -} - #[test] fn test_progress_become_probe() { setup_for_test(); @@ -380,31 +365,33 @@ fn test_leader_election_pre_vote() { } fn test_leader_election_with_config(pre_vote: bool) { + let mut config = Network::default_config(); + config.pre_vote = pre_vote; let mut tests = vec![ ( - Network::new_with_config(vec![None, None, None], pre_vote), + Network::new_with_config(vec![None, None, None], &config), StateRole::Leader, - 1, + 2, ), ( - Network::new_with_config(vec![None, None, NOP_STEPPER], pre_vote), + Network::new_with_config(vec![None, None, NOP_STEPPER], &config), StateRole::Leader, - 1, + 2, ), ( - Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER], pre_vote), + Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER], &config), StateRole::Candidate, - 1, + 2, ), ( - Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None], pre_vote), + Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None], &config), StateRole::Candidate, - 1, + 2, ), ( - Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None, None], pre_vote), + Network::new_with_config(vec![None, NOP_STEPPER, NOP_STEPPER, None, None], &config), StateRole::Leader, - 1, + 2, ), // three logs further along than 0, but in the same term so rejection // are returned instead of the votes being ignored. @@ -412,15 +399,15 @@ fn test_leader_election_with_config(pre_vote: bool) { Network::new_with_config( vec![ None, - Some(ents_with_config(&[1], pre_vote)), - Some(ents_with_config(&[1], pre_vote)), - Some(ents_with_config(&[1, 1], pre_vote)), + Some(ents_with_config(&[2], pre_vote, 2, vec![1, 2, 3, 4, 5])), + Some(ents_with_config(&[2], pre_vote, 3, vec![1, 2, 3, 4, 5])), + Some(ents_with_config(&[2, 2], pre_vote, 4, vec![1, 2, 3, 4, 5])), None, ], - pre_vote, + &config, ), StateRole::Follower, - 1, + 2, ), ]; @@ -435,7 +422,7 @@ fn test_leader_election_with_config(pre_vote: bool) { // In pre-vote mode, an election that fails to complete // leaves the node in pre-candidate state without advancing // the term. - (StateRole::PreCandidate, 0) + (StateRole::PreCandidate, 1) } else { (state, term) }; @@ -465,7 +452,9 @@ fn test_leader_cycle_pre_vote() { // pre-vote) work when not starting from a clean state (as they do in // test_leader_election) fn test_leader_cycle_with_config(pre_vote: bool) { - let mut network = Network::new_with_config(vec![None, None, None], pre_vote); + let mut config = Network::default_config(); + config.pre_vote = pre_vote; + let mut network = Network::new_with_config(vec![None, None, None], &config); for campaigner_id in 1..4 { network.send(vec![new_message( campaigner_id, @@ -522,15 +511,18 @@ fn test_leader_election_overwrite_newer_logs_with_config(pre_vote: bool) { // entry overwrites the loser's. (test_leader_sync_follower_log tests // the case where older log entries are overwritten, so this test // focuses on the case where the newer entries are lost). + let peers = vec![1, 2, 3, 4, 5]; + let mut config = Network::default_config(); + config.pre_vote = pre_vote; let mut network = Network::new_with_config( vec![ - Some(ents_with_config(&[1], pre_vote)), // Node 1: Won first election - Some(ents_with_config(&[1], pre_vote)), // Node 2: Get logs from node 1 - Some(ents_with_config(&[2], pre_vote)), // Node 3: Won second election - Some(voted_with_config(3, 2, pre_vote)), // Node 4: Voted but didn't get logs - Some(voted_with_config(3, 2, pre_vote)), // Node 5: Voted but didn't get logs + Some(ents_with_config(&[1], pre_vote, 1, peers.clone())), // Node 1: Won first election + Some(ents_with_config(&[1], pre_vote, 2, peers.clone())), // Node 2: Get logs from node 1 + Some(ents_with_config(&[2], pre_vote, 3, peers.clone())), // Node 3: Won second election + Some(voted_with_config(3, 2, pre_vote, 4, peers.clone())), // Node 4: Voted but didn't get logs + Some(voted_with_config(3, 2, pre_vote, 5, peers.clone())), // Node 5: Voted but didn't get logs ], - pre_vote, + &config, ); // Node 1 campaigns. The election fails because a quorum of nodes @@ -861,9 +853,9 @@ fn test_dueling_candidates() { ]; let tests = vec![ - (StateRole::Follower, 2), - (StateRole::Follower, 2), - (StateRole::Follower, 2), + (StateRole::Follower, 3), + (StateRole::Follower, 3), + (StateRole::Follower, 3), ]; for (i, &(state, term)) in tests.iter().enumerate() { @@ -890,7 +882,9 @@ fn test_dueling_pre_candidates() { let b = new_test_raft_with_prevote(2, vec![1, 2, 3], 10, 1, new_storage(), true); let c = new_test_raft_with_prevote(3, vec![1, 2, 3], 10, 1, new_storage(), true); - let mut nt = Network::new_with_config(vec![Some(a), Some(b), Some(c)], true); + let mut config = Network::default_config(); + config.pre_vote = true; + let mut nt = Network::new_with_config(vec![Some(a), Some(b), Some(c)], &config); nt.cut(1, 3); nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -912,9 +906,9 @@ fn test_dueling_pre_candidates() { let expects = vec![(2, 1, 2), (2, 1, 2), (1, 1, 1)]; let tests = vec![ - (1, StateRole::Leader, 1), - (2, StateRole::Follower, 1), - (3, StateRole::Follower, 1), + (1, StateRole::Leader, 2), + (2, StateRole::Follower, 2), + (3, StateRole::Follower, 2), ]; for (i, &(id, state, term)) in tests.iter().enumerate() { if nt.peers[&id].state != state { @@ -954,7 +948,7 @@ fn test_candidate_concede() { tt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]); assert_eq!(tt.peers[&1].state, StateRole::Follower); - assert_eq!(tt.peers[&1].term, 1); + assert_eq!(tt.peers[&1].term, 2); for (_, p) in &tt.peers { assert_eq!(p.raft_log.committed, 3); // All raft logs are committed. @@ -975,7 +969,9 @@ fn test_single_node_candidate() { #[test] fn test_sinle_node_pre_candidate() { setup_for_test(); - let mut tt = Network::new_with_config(vec![None], true); + let mut config = Network::default_config(); + config.pre_vote = true; + let mut tt = Network::new_with_config(vec![None], &config); tt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); assert_eq!(tt.peers[&1].state, StateRole::Leader); @@ -1043,8 +1039,8 @@ fn test_proposal() { assert_raft_log(&prefix, &raft.raft_log, want_log); } } - if nw.peers[&1].term != 1 { - panic!("#{}: term = {}, want: {}", j, nw.peers[&1].term, 1); + if nw.peers[&1].term != 2 { + panic!("#{}: term = {}, want: {}", j, nw.peers[&1].term, 2); } } } @@ -1072,8 +1068,8 @@ fn test_proposal_by_proxy() { assert_raft_log(&prefix, &raft.raft_log, (3, 1, 3)); } } - if tt.peers[&1].term != 1 { - panic!("#{}: term = {}, want {}", j, tt.peers[&1].term, 1); + if tt.peers[&1].term != 2 { + panic!("#{}: term = {}, want {}", j, tt.peers[&1].term, 2); } } } @@ -1183,7 +1179,7 @@ fn test_handle_msg_append() { (nm(2, 3, 4, 3, None), 3, 1, true), // previous log non-exist // Ensure 2 (nm(2, 1, 2, 2, None), 3, 2, false), - (nm(2, 0, 1, 2, Some(vec![(2, 2)])), 2, 2, false), + (nm(2, 1, 1, 2, Some(vec![(2, 2)])), 2, 2, false), (nm(2, 2, 3, 4, Some(vec![(4, 2), (5, 2)])), 5, 4, false), (nm(2, 2, 3, 5, Some(vec![(4, 2)])), 4, 4, false), (nm(2, 1, 2, 5, Some(vec![(3, 2)])), 3, 3, false), @@ -1418,35 +1414,32 @@ fn test_recv_msg_request_vote() { fn test_recv_msg_request_vote_for_type(msg_type: MessageType) { let mut tests = vec![ - (StateRole::Follower, 0, 0, INVALID_ID, true), - (StateRole::Follower, 0, 1, INVALID_ID, true), - (StateRole::Follower, 0, 2, INVALID_ID, true), - (StateRole::Follower, 0, 3, INVALID_ID, false), - (StateRole::Follower, 1, 0, INVALID_ID, true), (StateRole::Follower, 1, 1, INVALID_ID, true), (StateRole::Follower, 1, 2, INVALID_ID, true), (StateRole::Follower, 1, 3, INVALID_ID, false), - (StateRole::Follower, 2, 0, INVALID_ID, true), (StateRole::Follower, 2, 1, INVALID_ID, true), - (StateRole::Follower, 2, 2, INVALID_ID, false), + (StateRole::Follower, 2, 2, INVALID_ID, true), (StateRole::Follower, 2, 3, INVALID_ID, false), - (StateRole::Follower, 3, 0, INVALID_ID, true), (StateRole::Follower, 3, 1, INVALID_ID, true), (StateRole::Follower, 3, 2, INVALID_ID, false), (StateRole::Follower, 3, 3, INVALID_ID, false), - (StateRole::Follower, 3, 2, 2, false), - (StateRole::Follower, 3, 2, 1, true), - (StateRole::Leader, 3, 3, 1, true), - (StateRole::PreCandidate, 3, 3, 1, true), - (StateRole::Candidate, 3, 3, 1, true), + (StateRole::Follower, 4, 1, INVALID_ID, true), + (StateRole::Follower, 4, 2, INVALID_ID, false), + (StateRole::Follower, 4, 3, INVALID_ID, false), + (StateRole::Follower, 4, 2, 2, false), + (StateRole::Follower, 4, 2, 1, true), + (StateRole::Leader, 4, 3, 1, true), + (StateRole::PreCandidate, 4, 3, 1, true), + (StateRole::Candidate, 4, 3, 1, true), ]; for (j, (state, index, log_term, vote_for, w_reject)) in tests.drain(..).enumerate() { - let raft_log = new_raft_log(&[empty_entry(2, 1), empty_entry(2, 2)], 3, 0); - let mut sm = new_test_raft(1, vec![1], 10, 1, new_storage()); + let store = MemStorage::new_with_conf_state((vec![1], vec![])); + let ents = &[empty_entry(2, 2), empty_entry(2, 3)]; + store.wl().append(ents).unwrap(); + let mut sm = new_test_raft(1, vec![1], 10, 1, store); sm.state = state; sm.vote = vote_for; - sm.raft_log = raft_log; let mut m = new_message(2, 0, msg_type, 0); m.set_index(index); @@ -1502,67 +1495,67 @@ fn test_state_transition() { StateRole::Follower, StateRole::PreCandidate, true, - 0, + 1, INVALID_ID, ), ( StateRole::Follower, StateRole::Candidate, true, - 1, + 2, INVALID_ID, ), - (StateRole::Follower, StateRole::Leader, false, 0, INVALID_ID), + (StateRole::Follower, StateRole::Leader, false, 1, INVALID_ID), ( StateRole::PreCandidate, StateRole::Follower, true, - 0, + 1, INVALID_ID, ), ( StateRole::PreCandidate, StateRole::PreCandidate, true, - 0, + 1, INVALID_ID, ), ( StateRole::PreCandidate, StateRole::Candidate, true, - 1, + 2, INVALID_ID, ), - (StateRole::PreCandidate, StateRole::Leader, true, 0, 1), + (StateRole::PreCandidate, StateRole::Leader, true, 1, 1), ( StateRole::Candidate, StateRole::Follower, true, - 0, + 1, INVALID_ID, ), ( StateRole::Candidate, StateRole::PreCandidate, true, - 0, + 1, INVALID_ID, ), ( StateRole::Candidate, StateRole::Candidate, true, - 1, + 2, INVALID_ID, ), - (StateRole::Candidate, StateRole::Leader, true, 0, 1), + (StateRole::Candidate, StateRole::Leader, true, 1, 1), (StateRole::Leader, StateRole::Follower, true, 1, INVALID_ID), ( StateRole::Leader, StateRole::PreCandidate, false, - 0, + 1, INVALID_ID, ), ( @@ -1572,7 +1565,7 @@ fn test_state_transition() { 1, INVALID_ID, ), - (StateRole::Leader, StateRole::Leader, true, 0, 1), + (StateRole::Leader, StateRole::Leader, true, 1, 1), ]; for (i, (from, to, wallow, wterm, wlead)) in tests.drain(..).enumerate() { let sm: &mut Raft = &mut new_test_raft(1, vec![1], 10, 1, new_storage()); @@ -2386,24 +2379,30 @@ fn test_read_only_for_new_leader() { #[test] fn test_leader_append_response() { setup_for_test(); - // initial progress: match = 0; next = 3 + // Initial progress: match = 0, next = 4 on followers. let mut tests = vec![ - (3, true, 0, 3, 0, 0, 0), // stale resp; no replies - (2, true, 0, 2, 1, 1, 0), // denied resp; leader does not commit; descrease next and send - // probing msg - (2, false, 2, 4, 2, 2, 2), // accept resp; leader commits; broadcast with commit index - (0, false, 0, 3, 0, 0, 0), + // Stale resp; no replies. + (4, true, 0, 4, 0, 0, 0), + // Denied resp; decrease next and send probing message. + (3, true, 0, 3, 1, 2, 1), + // Accepted resp; leader commits to 3; broadcast with committed index. + (3, false, 3, 5, 2, 3, 3), + (0, false, 0, 4, 0, 0, 0), ]; for (i, (index, reject, wmatch, wnext, wmsg_num, windex, wcommitted)) in tests.drain(..).enumerate() { - // sm term is 1 after it becomes the leader. - // thus the last log term must be 1 to be committed. - let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage()); - sm.raft_log = new_raft_log(&[empty_entry(0, 1), empty_entry(1, 2)], 3, 0); + // Initial raft logs: last index = 3, commited = 1. + let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); + let ents = &[empty_entry(1, 2), empty_entry(2, 3)]; + store.wl().append(ents).unwrap(); + let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store); + + // sm term is 2 after it becomes the leader. sm.become_candidate(); sm.become_leader(); + sm.read_messages(); let mut m = new_message(2, 0, MessageType::MsgAppendResponse, 0); m.set_index(index); @@ -2541,9 +2540,11 @@ fn test_recv_msg_beat() { ]; for (i, (state, w_msg)) in tests.drain(..).enumerate() { - let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage()); - sm.raft_log = new_raft_log(&[empty_entry(0, 1), empty_entry(1, 2)], 0, 0); - sm.term = 1; + let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])); + let ents = &[empty_entry(1, 2), empty_entry(1, 3)]; + store.wl().append(ents).unwrap(); + + let mut sm = new_test_raft(1, vec![1, 2, 3], 10, 1, store); sm.state = state; sm.step(new_message(1, 1, MessageType::MsgBeat, 0)) .expect(""); @@ -2900,7 +2901,7 @@ fn test_step_ignore_config() { let index = r.raft_log.last_index(); let pending_conf_index = r.pending_conf_index; r.step(m.clone()).expect(""); - let mut we = empty_entry(1, 4); + let mut we = empty_entry(2, 4); we.set_entry_type(EntryType::EntryNormal); let wents = vec![we]; let entries = r.raft_log.entries(index + 1, None).expect(""); @@ -3490,7 +3491,9 @@ fn test_node_with_smaller_term_can_complete_election() { n3.become_follower(1, INVALID_ID); // cause a network partition to isolate node 3 - let mut nt = Network::new_with_config(vec![Some(n1), Some(n2), Some(n3)], true); + let mut config = Network::default_config(); + config.pre_vote = true; + let mut nt = Network::new_with_config(vec![Some(n1), Some(n2), Some(n3)], &config); nt.cut(1, 3); nt.cut(2, 3); diff --git a/tests/integration_cases/test_raft_paper.rs b/tests/integration_cases/test_raft_paper.rs index fd31c29da..9bf466096 100644 --- a/tests/integration_cases/test_raft_paper.rs +++ b/tests/integration_cases/test_raft_paper.rs @@ -97,10 +97,10 @@ fn test_update_term_from_message(state: StateRole) { } let mut m = new_message(0, 0, MessageType::MsgAppend, 0); - m.set_term(2); + m.set_term(3); r.step(m).expect(""); - assert_eq!(r.term, 2); + assert_eq!(r.term, 3); assert_eq!(r.state, StateRole::Follower); } @@ -138,7 +138,7 @@ fn test_leader_bcast_beat() { let new_message_ext = |f, to| { let mut m = new_message(f, to, MessageType::MsgHeartbeat, 0); - m.set_term(1); + m.set_term(2); m.set_commit(0); m }; @@ -174,7 +174,7 @@ fn test_nonleader_start_election(state: StateRole) { let et = 10; let mut r = new_test_raft(1, vec![1, 2, 3], et, 1, new_storage()); match state { - StateRole::Follower => r.become_follower(1, 2), + StateRole::Follower => r.become_follower(2, 2), StateRole::Candidate => r.become_candidate(), _ => panic!("Only non-leader role is accepted."), } @@ -183,15 +183,15 @@ fn test_nonleader_start_election(state: StateRole) { r.tick(); } - assert_eq!(r.term, 2); + assert_eq!(r.term, 3); assert_eq!(r.state, StateRole::Candidate); assert!(r.votes[&r.id]); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); let new_message_ext = |f, to| { let mut m = new_message(f, to, MessageType::MsgRequestVote, 0); - m.set_term(2); - m.set_log_term(0); + m.set_term(3); + m.set_log_term(1); m.set_index(1); m }; @@ -253,8 +253,8 @@ fn test_leader_election_in_one_round_rpc() { if r.state != state { panic!("#{}: state = {:?}, want {:?}", i, r.state, state); } - if r.term != 1 { - panic!("#{}: term = {}, want {}", i, r.term, 1); + if r.term != 2 { + panic!("#{}: term = {}, want {}", i, r.term, 2); } } } @@ -280,6 +280,7 @@ fn test_follower_vote() { let mut m = new_message(nvote, 1, MessageType::MsgRequestVote, 0); m.set_term(1); + m.set_log_term(1); m.set_index(1); r.step(m).expect(""); @@ -307,7 +308,7 @@ fn test_candidate_fallback() { m.set_term(term); m }; - let mut tests = vec![new_message_ext(2, 1, 1), new_message_ext(2, 1, 2)]; + let mut tests = vec![new_message_ext(2, 1, 2), new_message_ext(2, 1, 3)]; for (i, m) in tests.drain(..).enumerate() { let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage()); r.step(new_message(1, 1, MessageType::MsgHup, 0)).expect(""); @@ -449,12 +450,12 @@ fn test_leader_start_replication() { assert_eq!(r.raft_log.committed, li); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); - let wents = vec![new_entry(1, li + 1, SOME_DATA)]; + let wents = vec![new_entry(2, li + 1, SOME_DATA)]; let new_message_ext = |f, to, ents| { let mut m = new_message(f, to, MessageType::MsgAppend, 0); - m.set_term(1); + m.set_term(2); m.set_index(li); - m.set_log_term(1); + m.set_log_term(2); m.set_commit(li); m.set_entries(ents); m @@ -491,7 +492,7 @@ fn test_leader_commit_entry() { } assert_eq!(r.raft_log.committed, li + 1); - let wents = vec![new_entry(1, li + 1, SOME_DATA)]; + let wents = vec![new_entry(2, li + 1, SOME_DATA)]; assert_eq!(r.raft_log.next_entries(), Some(wents)); let mut msgs = r.read_messages(); msgs.sort_by_key(|m| format!("{:?}", m)); @@ -625,6 +626,7 @@ fn test_follower_commit_entry() { let mut m = new_message(2, 1, MessageType::MsgAppend, 0); m.set_term(1); + m.set_log_term(1); m.set_index(1); m.set_commit(commit); m.set_entries(ents.clone()); @@ -655,7 +657,7 @@ fn test_follower_check_msg_append() { let ents = vec![empty_entry(1, 2), empty_entry(2, 3)]; let mut tests = vec![ // match with committed entries - (0, 1, 1, false, 0), + (1, 2, 2, false, 0), (ents[0].get_term(), ents[0].get_index(), 2, false, 0), // match with uncommitted entries (ents[1].get_term(), ents[1].get_index(), 3, false, 0), @@ -683,7 +685,7 @@ fn test_follower_check_msg_append() { let cfg = new_test_config(1, 10, 1); new_test_raft_with_config(&cfg, store) }; - r.load_state(&hard_state(0, 1, 0)); + r.load_state(&hard_state(1, 1, 0)); r.become_follower(2, 2); let mut m = new_message(2, 1, MessageType::MsgAppend, 0); @@ -732,14 +734,14 @@ fn test_follower_append_entries() { ), ( 1, - 0, + 1, vec![empty_entry(1, 2)], vec![empty_entry(1, 2), empty_entry(2, 3)], vec![], ), ( 1, - 0, + 1, vec![empty_entry(3, 2)], vec![empty_entry(3, 2)], vec![empty_entry(3, 2)], @@ -913,7 +915,7 @@ fn test_vote_request() { let mut r = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage()); let mut m = new_message(2, 1, MessageType::MsgAppend, 0); m.set_term(wterm - 1); - m.set_log_term(0); + m.set_log_term(1); // log-term must be greater than 0. m.set_index(1); m.set_entries(ents.clone()); r.step(m).expect(""); diff --git a/tests/integration_cases/test_raft_snap.rs b/tests/integration_cases/test_raft_snap.rs index ab5c5329f..aea94ef7a 100644 --- a/tests/integration_cases/test_raft_snap.rs +++ b/tests/integration_cases/test_raft_snap.rs @@ -26,7 +26,7 @@ // limitations under the License. use crate::test_util::*; -use harness::setup_for_test; +use harness::{setup_for_test, Network}; use raft::eraftpb::*; fn testing_snap() -> Snapshot { @@ -134,3 +134,20 @@ fn test_snapshot_abort() { assert_eq!(sm.prs().get(2).unwrap().pending_snapshot, 0); assert_eq!(sm.prs().get(2).unwrap().next_idx, 12); } + +// Initialized storage should be at term 1 instead of 0. Otherwise the case will fail. +#[test] +fn test_snapshot_with_min_term() { + setup_for_test(); + let do_test = |pre_vote: bool| { + let n1 = new_test_raft_with_prevote(1, vec![1, 2], 10, 1, new_storage(), pre_vote); + let n2 = new_test_raft_with_prevote(2, vec![], 10, 1, new_storage(), pre_vote); + let mut nt = Network::new(vec![Some(n1), Some(n2)]); + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + // 1 will be elected as leader, and then send a snapshot and an empty entry to 2. + assert_eq!(nt.peers[&2].raft_log.first_index(), 2); + assert_eq!(nt.peers[&2].raft_log.last_index(), 2); + }; + do_test(true); + do_test(false); +} diff --git a/tests/integration_cases/test_raw_node.rs b/tests/integration_cases/test_raw_node.rs index 18521fa63..d519d6092 100644 --- a/tests/integration_cases/test_raw_node.rs +++ b/tests/integration_cases/test_raw_node.rs @@ -85,6 +85,9 @@ fn test_raw_node_step() { // Vote messages with term 0 will cause panics. MessageType::MsgRequestVote, MessageType::MsgRequestPreVote, + // MsgAppend and MsgSnapshot with log term 0 will cause test code panics. + MessageType::MsgAppend, + MessageType::MsgSnapshot, ] .contains(&msg_t) { @@ -369,9 +372,9 @@ fn test_raw_node_start() { must_cmp_ready( &rd, &None, - &Some(hard_state(1, 3, 1)), - &[new_entry(1, 3, Some("foo"))], - vec![new_entry(1, 3, Some("foo"))], + &Some(hard_state(2, 3, 1)), + &[new_entry(2, 3, Some("foo"))], + vec![new_entry(2, 3, Some("foo"))], false, ); store.wl().append(rd.entries()).expect("");