Skip to content

Commit

Permalink
change: RaftStorage: merge append_entry_to_log and replicate_to_log i…
Browse files Browse the repository at this point in the history
…nto one method append_to_log
  • Loading branch information
drmingdrmer committed Aug 23, 2021
1 parent fabf3e7 commit 90329fb
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 23 deletions.
3 changes: 2 additions & 1 deletion async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
};

// Replicate entries to log (same as append, but in follower mode).
self.storage.replicate_to_log(entries).await.map_err(|err| self.map_fatal_storage_error(err))?;
let entry_refs = entries.iter().collect::<Vec<_>>();
self.storage.append_to_log(&entry_refs).await.map_err(|err| self.map_fatal_storage_error(err))?;
if let Some(entry) = entries.last() {
self.last_log_id = entry.log_id;
}
Expand Down
2 changes: 1 addition & 1 deletion async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
};
self.core
.storage
.append_entry_to_log(&entry)
.append_to_log(&[&entry])
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;
self.core.last_log_id.index = entry.log_id.index;
Expand Down
9 changes: 2 additions & 7 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,13 @@ where
/// Errors returned from this method will cause Raft to go into shutdown.
async fn delete_logs_from(&self, start: u64, stop: Option<u64>) -> Result<()>;

/// Append a new entry to the log.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn append_entry_to_log(&self, entry: &Entry<D>) -> Result<()>;

/// Replicate a payload of entries to the log.
/// Append a payload of entries to the log.
///
/// Though the entries will always be presented in order, each entry's index should be used to
/// determine its location to be written in the log.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn replicate_to_log(&self, entries: &[Entry<D>]) -> Result<()>;
async fn append_to_log(&self, entries: &[&Entry<D>]) -> Result<()>;

/// Apply the given log entry to the state machine.
///
Expand Down
4 changes: 2 additions & 2 deletions async-raft/tests/members_leader_fix_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn members_leader_fix_partial() -> Result<()> {
router.remove_node(0).await;

{
sto.append_entry_to_log(&Entry {
sto.append_to_log(&[&Entry {
log_id: LogId {
term: 1,
index: want + 1,
Expand All @@ -51,7 +51,7 @@ async fn members_leader_fix_partial() -> Result<()> {
members_after_consensus: Some(btreeset! {0,1,2}),
},
}),
})
}])
.await?;
}

Expand Down
11 changes: 2 additions & 9 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,18 +265,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(())
}

#[tracing::instrument(level = "trace", skip(self, entry))]
async fn append_entry_to_log(&self, entry: &Entry<ClientRequest>) -> Result<()> {
let mut log = self.log.write().await;
log.insert(entry.log_id.index, entry.clone());
Ok(())
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn replicate_to_log(&self, entries: &[Entry<ClientRequest>]) -> Result<()> {
async fn append_to_log(&self, entries: &[&Entry<ClientRequest>]) -> Result<()> {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, entry.clone());
log.insert(entry.log_id.index, (*entry).clone());
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ async fn test_append_entry_to_log() -> Result<()> {
let store = default_store_with_logs();

store
.append_entry_to_log(&Entry {
.append_to_log(&[&Entry {
log_id: (2, 10).into(),
payload: EntryPayload::Blank,
})
}])
.await?;
let l = store.get_log_entries(0, 10_000).await?.len();
let last = store.get_log_entries(0, 10_000).await?.last().unwrap().clone();
Expand All @@ -240,7 +240,7 @@ async fn test_replicate_to_log() -> Result<()> {
let store = default_store_with_logs();

store
.replicate_to_log(&[Entry {
.append_to_log(&[&Entry {
log_id: (1, 11).into(),
payload: EntryPayload::Blank,
}])
Expand Down

0 comments on commit 90329fb

Please sign in to comment.