Skip to content

Commit

Permalink
change RaftStorage: remove apply_entry_to_state_machine
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 23, 2021
1 parent 90329fb commit daf2ed8
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 52 deletions.
9 changes: 7 additions & 2 deletions async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}
// Apply this entry to the state machine and return its data response.
let res = self.core.storage.apply_entry_to_state_machine(entry).await.map_err(|err| {
let res = self.core.storage.replicate_to_state_machine(&[entry]).await.map_err(|err| {
if err.downcast_ref::<S::ShutdownError>().is_some() {
// If this is an instance of the storage impl's shutdown error, then trigger shutdown.
self.core.map_fatal_storage_error(err)
Expand All @@ -454,8 +454,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftError::RaftStorage(err)
}
});

self.core.last_applied = *log_id;
self.leader_report_metrics();
res
let res = res?;

// TODO(xp) merge this function to replication_to_state_machine?

Ok(res.into_iter().next().unwrap())
}
}
20 changes: 6 additions & 14 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,38 +166,30 @@ where
/// Errors returned from this method will cause Raft to go into shutdown.
async fn append_to_log(&self, entries: &[&Entry<D>]) -> Result<()>;

/// Apply the given log entry to the state machine.
/// Apply the given payload of entries to the state machine.
///
/// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which
/// have been replicated to a majority of the cluster, will be applied to the state machine.
///
/// This is where the business logic of interacting with your application's state machine
/// should live. This is 100% application specific. Perhaps this is where an application
/// specific transaction is being started, or perhaps committed. This may be where a key/value
/// is being stored. This may be where an entry is being appended to an immutable log.
/// is being stored.
///
/// An impl should do:
/// - Deal with the EntryPayload::Normal() log, which is business logic log.
/// - Optionally, deal with EntryPayload::ConfigChange or EntryPayload::SnapshotPointer log if they are concerned.
/// E.g. when an impl need to track the membership changing.
/// - Deal with EntryPayload::ConfigChange
/// - A EntryPayload::SnapshotPointer log should never be seen.
///
/// TODO(xp): choose one of the following policy:
/// Error handling for this method is note worthy. If an error is returned from a call to this
/// method, the error will be inspected, and if the error is an instance of
/// `RaftStorage::ShutdownError`, then Raft will go into shutdown in order to preserve the
/// safety of the data and avoid corruption. Any other errors will be propagated back up to the
/// `Raft.client_write` call point.
///
/// It is important to note that even in cases where an application specific error is returned,
/// implementations should still record that the entry has been applied to the state machine.
async fn apply_entry_to_state_machine(&self, data: &Entry<D>) -> Result<R>;

/// Apply the given payload of entries to the state machine, as part of replication.
///
/// The Raft protocol guarantees that only logs which have been _committed_, that is, logs which
/// have been replicated to a majority of the cluster, will be applied to the state machine.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn replicate_to_state_machine(&self, entries: &[&Entry<D>]) -> Result<()>;
async fn replicate_to_state_machine(&self, entries: &[&Entry<D>]) -> Result<Vec<R>>;

/// Perform log compaction, returning a handle to the generated snapshot.
///
Expand Down
45 changes: 10 additions & 35 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,39 +274,11 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(())
}

#[tracing::instrument(level = "trace", skip(self, entry))]
async fn apply_entry_to_state_machine(&self, entry: &Entry<ClientRequest>) -> Result<ClientResponse> {
let mut sm = self.sm.write().await;

tracing::debug!("id:{} apply to sm index:{}", self.id, entry.log_id.index);
assert_eq!(sm.last_applied_log.index + 1, entry.log_id.index);

sm.last_applied_log = entry.log_id;

return match entry.payload {
EntryPayload::Blank => return Ok(ClientResponse(None)),
EntryPayload::SnapshotPointer(_) => return Ok(ClientResponse(None)),
EntryPayload::Normal(ref norm) => {
let data = &norm.data;
if let Some((serial, res)) = sm.client_serial_responses.get(&data.client) {
if serial == &data.serial {
return Ok(ClientResponse(res.clone()));
}
}
let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
Ok(ClientResponse(previous))
}
EntryPayload::ConfigChange(ref mem) => {
sm.last_membership = Some(mem.membership.clone());
return Ok(ClientResponse(None));
}
};
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn replicate_to_state_machine(&self, entries: &[&Entry<ClientRequest>]) -> Result<()> {
async fn replicate_to_state_machine(&self, entries: &[&Entry<ClientRequest>]) -> Result<Vec<ClientResponse>> {
let mut sm = self.sm.write().await;
let mut res = Vec::with_capacity(entries.len());

for entry in entries {
tracing::debug!("id:{} replicate to sm index:{}", self.id, entry.log_id.index);

Expand All @@ -316,24 +288,27 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
sm.last_applied_log = entry.log_id;

match entry.payload {
EntryPayload::Blank => {}
EntryPayload::SnapshotPointer(_) => {}
EntryPayload::Blank => res.push(ClientResponse(None)),
EntryPayload::SnapshotPointer(_) => res.push(ClientResponse(None)),
EntryPayload::Normal(ref norm) => {
let data = &norm.data;
if let Some((serial, _)) = sm.client_serial_responses.get(&data.client) {
if let Some((serial, r)) = sm.client_serial_responses.get(&data.client) {
if serial == &data.serial {
res.push(ClientResponse(r.clone()));
continue;
}
}
let previous = sm.client_status.insert(data.client.clone(), data.status.clone());
sm.client_serial_responses.insert(data.client.clone(), (data.serial, previous.clone()));
res.push(ClientResponse(previous));
}
EntryPayload::ConfigChange(ref mem) => {
sm.last_membership = Some(mem.membership.clone());
res.push(ClientResponse(None))
}
};
}
Ok(())
Ok(res)
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
2 changes: 1 addition & 1 deletion memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn test_apply_entry_to_state_machine() -> Result<()> {
},
}),
};
store.apply_entry_to_state_machine(&entry).await?;
store.replicate_to_state_machine(&[&entry]).await?;
let sm = store.get_state_machine().await;

assert_eq!(
Expand Down

0 comments on commit daf2ed8

Please sign in to comment.