Skip to content

Commit

Permalink
fix a bug about initialized storage's term (tikv#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Apr 12, 2019
1 parent fc6c059 commit 2ebbed5
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 183 deletions.
26 changes: 24 additions & 2 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn main() {
add_all_followers(proposals.as_ref());

// Put 100 key-value pairs.
println!("We get a 5 nodes Raft cluster now, now propose 100 proposals");
(0..100u16)
.filter(|i| {
let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned());
Expand All @@ -113,6 +114,9 @@ fn main() {
})
.count();

println!("Propose 100 proposals success!");

// FIXME: the program will be blocked here forever. Need to exit gracefully.
for th in handles {
th.join().unwrap();
}
Expand Down Expand Up @@ -197,16 +201,27 @@ fn on_ready(
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();

// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

// Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize
// raft logs to the latest position.
if let Err(e) = raft_group.raft.raft_log.store.wl().append(ready.entries()) {
if let Err(e) = store.wl().append(ready.entries()) {
error!("persist raft log fail: {:?}, need to retry or panic", e);
return;
}

// Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot.
if *ready.snapshot() != Snapshot::new_() {
let s = ready.snapshot().clone();
if let Err(e) = store.wl().apply_snapshot(s) {
error!("apply snapshot fail: {:?}, need to retry or panic", e);
return;
}
}

// Send out the messages come from the node.
for msg in ready.messages.drain(..) {
let to = msg.get_to();
Expand All @@ -217,7 +232,7 @@ fn on_ready(

// Apply all committed proposals.
if let Some(committed_entries) = ready.committed_entries.take() {
for entry in committed_entries {
for entry in &committed_entries {
if entry.get_data().is_empty() {
// From new elected leaders.
continue;
Expand All @@ -234,6 +249,8 @@ fn on_ready(
ConfChangeType::BeginMembershipChange
| ConfChangeType::FinalizeMembershipChange => unimplemented!(),
}
let cs = ConfState::from(raft_group.raft.prs().configuration().clone());
store.wl().set_conf_state(cs, None);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
Expand All @@ -250,6 +267,11 @@ fn on_ready(
proposal.propose_success.send(true).unwrap();
}
}
if let Some(last_committed) = committed_entries.last() {
let mut s = store.wl();
s.mut_hard_state().set_commit(last_committed.get_index());
s.mut_hard_state().set_term(last_committed.get_term());
}
}
// Call `RawNode::advance` interface to update position flags in the raft.
raft_group.advance(ready);
Expand Down
26 changes: 1 addition & 25 deletions harness/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use raft::{eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result};
use raft::{eraftpb::Message, storage::MemStorage, Raft, Result};
use std::ops::{Deref, DerefMut};

/// A simulated Raft façade for testing.
Expand Down Expand Up @@ -63,30 +63,6 @@ impl Interface {
None => vec![],
}
}

/// Initialize a raft with the given ID and peer set.
pub fn initial(&mut self, id: u64, ids: &[u64]) {
if self.raft.is_some() {
self.id = id;
let prs = self.take_prs();
self.set_prs(ProgressSet::with_capacity(
ids.len(),
prs.learner_ids().len(),
));
for id in ids {
let progress = Progress::new(0, 256);
if prs.learner_ids().contains(id) {
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
let term = self.term;
self.reset(term);
}
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
Expand Down
56 changes: 31 additions & 25 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,45 +61,51 @@ pub struct Network {
}

impl Network {
/// Initializes a network from peers.
/// Get a base config. Calling `Network::new` will initialize peers with this config.
pub fn default_config() -> Config {
Config {
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
..Default::default()
}
}

/// Initializes a network from `peers`.
///
/// Nodes will recieve their ID based on their index in the vector, starting with 1.
///
/// A `None` node will be replaced with a new Raft node.
/// A `None` node will be replaced with a new Raft node, and its configuration will
/// be `peers`.
pub fn new(peers: Vec<Option<Interface>>) -> Network {
Network::new_with_config(peers, false)
let config = Network::default_config();
Network::new_with_config(peers, &config)
}

/// Explicitly set the pre_vote option on newly created rafts.
///
/// **TODO:** Make this accept any config.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, pre_vote: bool) -> Network {
let size = peers.len();
let peer_addrs: Vec<u64> = (1..=size as u64).collect();
/// Initialize a network from `peers` with explicitly specified `config`.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, config: &Config) -> Network {
let mut nstorage = HashMap::new();
let mut npeers = HashMap::new();
for (p, id) in peers.drain(..).zip(peer_addrs.clone()) {

let peer_addrs: Vec<u64> = (1..=peers.len() as u64).collect();
for (p, id) in peers.drain(..).zip(&peer_addrs) {
match p {
None => {
let conf_state = ConfState::from((peer_addrs.clone(), vec![]));
let store = MemStorage::new_with_conf_state(conf_state);
nstorage.insert(id, store.clone());
let config = Config {
id,
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
pre_vote,
tag: format!("{}", id),
..Default::default()
};
nstorage.insert(*id, store.clone());
let mut config = config.clone();
config.id = *id;
config.tag = format!("{}", id);
let r = Raft::new(&config, store).unwrap().into();
npeers.insert(id, r);
npeers.insert(*id, r);
}
Some(mut p) => {
p.initial(id, &peer_addrs);
npeers.insert(id, p);
Some(r) => {
if r.raft.as_ref().map_or(false, |r| r.id != *id) {
panic!("peer {} in peers has a wrong position", r.id);
}
npeers.insert(*id, r);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::{
};

/// Config contains the parameters to start a raft.
#[derive(Clone)]
pub struct Config {
/// The identity of the local raft. It cannot be 0, and must be unique in the group.
pub id: u64,
Expand Down
6 changes: 5 additions & 1 deletion src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,6 @@ impl<T: Storage> Raft<T> {
/// message from a peer.
pub fn step(&mut self, m: Message) -> Result<()> {
// Handle the message term, which may result in our stepping down to a follower.

if m.get_term() == 0 {
// local message
} else if m.get_term() > self.term {
Expand Down Expand Up @@ -1165,6 +1164,8 @@ impl<T: Storage> Raft<T> {
}
}
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
debug_assert!(m.get_log_term() != 0, "{:?} log term can't be 0", m);

// We can vote if this is a repeat of a vote we've already cast...
let can_vote = (self.vote == m.get_from()) ||
// ...we haven't voted and we don't think there's a leader yet in this term...
Expand Down Expand Up @@ -1924,6 +1925,8 @@ impl<T: Storage> Raft<T> {
self.send(to_send);
return;
}
debug_assert!(m.get_log_term() != 0, "{:?} log term can't be 0", m);

let mut to_send = Message::new_();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgAppendResponse);
Expand Down Expand Up @@ -1968,6 +1971,7 @@ impl<T: Storage> Raft<T> {
}

fn handle_snapshot(&mut self, mut m: Message) {
debug_assert!(m.get_term() != 0, "{:?} term can't be 0", m);
let (sindex, sterm) = (
m.get_snapshot().get_metadata().get_index(),
m.get_snapshot().get_metadata().get_term(),
Expand Down
14 changes: 13 additions & 1 deletion src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ impl RaftState {
pub trait Storage {
/// `initial_state` is called when Raft is initialized. This interface will return a `RaftState`
/// which contains `HardState` and `ConfState`.
///
/// `RaftState` could be initialized or not. If it's initialized it means the `Storage` is
/// created with a configuration, and its last index and term should be greater than 0.
fn initial_state(&self) -> Result<RaftState>;

/// Returns a slice of log entries in the range `[low, high)`.
Expand Down Expand Up @@ -156,6 +159,11 @@ impl MemStorageCore {
&self.raft_state.hard_state
}

/// Get the mut hard state.
pub fn mut_hard_state(&mut self) -> &mut HardState {
&mut self.raft_state.hard_state
}

/// Commit to an index.
///
/// # Panics
Expand Down Expand Up @@ -383,13 +391,17 @@ impl MemStorage {
// Set index to 1 to make `first_index` greater than 1 so that there will be a gap between
// uninitialized followers and the leader. And then followers can catch up the initial
// configuration by snapshots.
//
// And, set term to 1 because in term 0 there is no leader exactly.
//
// An another alternative is appending some conf-change entries here to construct the
// initial configuration so that followers can catch up it by raft logs. However the entry
// count depends on how many peers in the initial configuration, which makes some indices
// not predictable. So we choose snapshot instead of raft logs here.
//
core.snapshot_metadata.set_index(1);
core.snapshot_metadata.set_term(1);
core.raft_state.hard_state.set_commit(1);
core.raft_state.hard_state.set_term(1);
core.raft_state.conf_state = ConfState::from(conf_state);
}

Expand Down
Loading

0 comments on commit 2ebbed5

Please sign in to comment.