Skip to content

Commit

Permalink
change: RaftStore::delete_logs_from() use range instead of (start, end)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 28, 2021
1 parent a29b0e5 commit 07d71c6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 39 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// with an index greater than this, then we must delete them per §5.3.
if self.last_log_id.index > target_entry.log_id.index {
self.storage
.delete_logs_from(target_entry.log_id.index + 1, None)
.delete_logs_from(target_entry.log_id.index + 1..)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let membership =
Expand Down
10 changes: 7 additions & 3 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! The Raft storage interface and data types.

use std::error::Error;
use std::fmt::Debug;
use std::ops::RangeBounds;

use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -159,11 +161,13 @@ where
/// Errors returned from this method will cause Raft to go into shutdown.
async fn get_log_entries(&self, start: u64, stop: u64) -> Result<Vec<Entry<D>>>;

/// Delete all logs starting from `start` and stopping at `stop`, else continuing to the end
/// of the log if `stop` is `None`.
/// Delete all logs in a `range`.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn delete_logs_from(&self, start: u64, stop: Option<u64>) -> Result<()>;
async fn delete_logs_from<RNG: RangeBounds<u64> + Clone + Debug + Send + Sync + Iterator>(
&self,
range: RNG,
) -> Result<()>;

/// Append a payload of entries to the log.
///
Expand Down
42 changes: 27 additions & 15 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ mod test;
use std::cmp::max;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::sync::Arc;
use std::sync::Mutex;

Expand Down Expand Up @@ -305,6 +307,20 @@ impl MemStore {
Ok(())
}

pub async fn defensive_nonempty_range<RT, RNG: RangeBounds<RT> + Clone + Debug + Send + Iterator>(
&self,
range: RNG,
) -> anyhow::Result<()> {
if !*self.defensive.read().await {
return Ok(());
}
for _ in range.clone() {
return Ok(());
}

Err(anyhow::anyhow!("range must be nonempty: {:?}", range))
}

pub async fn defensive_apply_log_id_gt_last<D: AppData>(&self, entries: &[&Entry<D>]) -> anyhow::Result<()> {
if !*self.defensive.read().await {
return Ok(());
Expand Down Expand Up @@ -471,24 +487,20 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(log.range(start..stop).map(|(_, val)| val.clone()).collect())
}

#[tracing::instrument(level = "trace", skip(self))]
async fn delete_logs_from(&self, start: u64, stop: Option<u64>) -> Result<()> {
// TODO(xp): never delete the last applied log
if stop.as_ref().map(|stop| &start > stop).unwrap_or(false) {
tracing::error!("delete_logs_from: invalid request, start({}) > stop({:?})", start, stop);
return Ok(());
}
#[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))]
async fn delete_logs_from<R: RangeBounds<u64> + Clone + Debug + Send + Sync + Iterator>(
&self,
range: R,
) -> Result<()> {
self.defensive_nonempty_range(range.clone()).await?;

let mut log = self.log.write().await;

// If a stop point was specified, delete from start until the given stop point.
if let Some(stop) = stop.as_ref() {
for key in start..*stop {
log.remove(&key);
}
return Ok(());
let keys = log.range(range).map(|(k, _v)| *k).collect::<Vec<_>>();
for key in keys {
log.remove(&key);
}
// Else, just split off the remainder.
log.split_off(&start);

Ok(())
}

Expand Down
37 changes: 17 additions & 20 deletions memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,23 +507,12 @@ where
}

pub async fn delete_logs_from(builder: &B) -> Result<()> {
tracing::info!("--- delete start > stop");
{
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(10, Some(1)).await?;

let logs = store.get_log_entries(1, 11).await?;
assert_eq!(logs.len(), 10, "expected all (10) logs to be preserved");
}

tracing::info!("--- delete start == stop");
{
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(1, Some(1)).await?;
store.delete_logs_from(1..1).await?;

let logs = store.get_log_entries(1, 11).await?;
assert_eq!(logs.len(), 10, "expected all (10) logs to be preserved");
Expand All @@ -534,7 +523,7 @@ where
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(1, Some(4)).await?;
store.delete_logs_from(1..4).await?;

let logs = store.get_log_entries(0, 100).await?;
assert_eq!(logs.len(), 7);
Expand All @@ -546,7 +535,7 @@ where
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(1, Some(1000)).await?;
store.delete_logs_from(1..1000).await?;
let logs = store.get_log_entries(0, 100).await?;

assert_eq!(logs.len(), 0);
Expand All @@ -557,7 +546,7 @@ where
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

store.delete_logs_from(1, None).await?;
store.delete_logs_from(1..).await?;
let logs = store.get_log_entries(0, 100).await?;

assert_eq!(logs.len(), 0);
Expand Down Expand Up @@ -734,7 +723,7 @@ where
run_fut(Suite::df_get_membership_config_dirty_log(builder))?;
run_fut(Suite::df_get_initial_state_dirty_log(builder))?;
run_fut(Suite::df_save_hard_state_ascending(builder))?;
run_fut(Suite::df_delete_logs_from(builder))?;
run_fut(Suite::df_delete_logs_from_nonempty_range(builder))?;
run_fut(Suite::df_append_to_log_nonempty_input(builder))?;
run_fut(Suite::df_append_to_log_nonconsecutive_input(builder))?;
run_fut(Suite::df_append_to_log_eq_last_plus_one(builder))?;
Expand Down Expand Up @@ -930,8 +919,16 @@ where
Ok(())
}

pub async fn df_delete_logs_from(_builder: &B) -> Result<()> {
// TODO(xp): what should we test about this?
pub async fn df_delete_logs_from_nonempty_range(builder: &B) -> Result<()> {
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

let res = store.delete_logs_from(10..1).await;
assert!(res.is_err());

let res = store.delete_logs_from(10..10).await;
assert!(res.is_err());

Ok(())
}

Expand Down Expand Up @@ -1039,7 +1036,7 @@ where
])
.await?;

store.delete_logs_from(1, Some(2)).await?;
store.delete_logs_from(1..2).await?;

let res = store
.append_to_log(&[&Entry {
Expand Down Expand Up @@ -1115,7 +1112,7 @@ where
])
.await?;

store.delete_logs_from(1, Some(2)).await?;
store.delete_logs_from(1..2).await?;

let res = store
.append_to_log(&[&Entry {
Expand Down

0 comments on commit 07d71c6

Please sign in to comment.