Skip to content

Commit

Permalink
Remote sequencer now handles cancelled/gone sequencers
Browse files Browse the repository at this point in the history
This also adds a bit more stress to the tests, to help them fail more often if there is an issue.
  • Loading branch information
AhmedSoliman committed Feb 8, 2025
1 parent c00c4a6 commit 641c540
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 73 deletions.
4 changes: 2 additions & 2 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ impl Appender {
};
match loglet.append_batch(batch.clone()).await {
Ok(lsn) => return Ok(lsn),
Err(AppendError::Sealed) => {
Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => {
debug!(
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
"Batch append failed but will be retried ({err}). Waiting for reconfiguration to complete"
);
let new_loglet = Self::on_sealed_loglet(
self.log_id,
Expand Down
11 changes: 10 additions & 1 deletion crates/bifrost/src/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub use provider::{LogletProvider, LogletProviderFactory};
use restate_types::logs::metadata::ProviderKind;
use tokio::sync::oneshot;

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};
Expand Down Expand Up @@ -190,6 +191,12 @@ impl LogletCommit {
Self { rx }
}

/// Creates a [`LogletCommit`] that is already resolved with
/// [`AppendError::ReconfigurationNeeded`] carrying the given reason.
///
/// Useful when the loglet knows up front that the append cannot succeed
/// until reconfiguration happens, so callers observe the error immediately
/// on first poll.
pub fn reconfiguration_needed(reason: impl Into<Cow<'static, str>>) -> Self {
    let (tx, rx) = oneshot::channel();
    let err = AppendError::ReconfigurationNeeded(reason.into());
    // The receiver is held by `Self`, so this send cannot fail in practice;
    // ignore the result either way.
    let _ = tx.send(Err(err));
    Self { rx }
}

pub fn resolved(offset: LogletOffset) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Ok(offset));
Expand All @@ -211,7 +218,9 @@ impl std::future::Future for LogletCommit {
) -> Poll<Self::Output> {
match ready!(self.rx.poll_unpin(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(AppendError::Shutdown(ShutdownError))),
Err(_) => Poll::Ready(Err(AppendError::ReconfigurationNeeded(
"loglet gave up on this batch".into(),
))),
}
}
}
5 changes: 4 additions & 1 deletion crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

Expand All @@ -16,8 +17,10 @@ use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};

#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
#[error("Loglet is sealed")]
#[error("Loglet has been sealed")]
Sealed,
#[error("Loglet needs reconfiguration; {0}")]
ReconfigurationNeeded(Cow<'static, str>),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
Expand Down
4 changes: 4 additions & 0 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
println!("append failed({i}) => SEALED");
break;
}
Err(AppendError::ReconfigurationNeeded(reason)) => {
println!("append failed({i}) => ReconfigurationNeeded({reason})");
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
}
CheckSealOutcome::FullySealed => {
// already fully sealed, just make sure the sequencer is drained.
handle.drain().await?;
handle.drain().await;
// note that we can only do that if we are the sequencer because
// our known_global_tail is authoritative. We have no doubt about
// whether the tail needs to be repaired or not.
Expand Down Expand Up @@ -420,7 +420,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
.await?;
// If we are the sequencer, we need to wait until the sequencer is drained.
if let SequencerAccess::Local { handle } = &self.sequencer {
handle.drain().await?;
handle.drain().await;
self.known_global_tail.notify_seal();
};
// Primarily useful for remote sequencer to enforce seal check on the next find_tail() call
Expand Down
8 changes: 8 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,14 @@ impl WaitForCommitTask {
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::ReconfigurationNeeded(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
sealed: Some(self.global_tail.is_sealed()), // this must be true
status: SequencerStatus::Gone,
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::Shutdown(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
Expand Down
45 changes: 36 additions & 9 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use restate_types::{
errors::MaybeRetryableError,
logs::{metadata::SegmentIndex, LogId, Record},
net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
nodes_config::NodesConfigError,
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
Expand Down Expand Up @@ -163,8 +164,30 @@ where
| NetworkError::ConnectionClosed(_)
| NetworkError::Timeout(_) => {
// we retry to re-connect one time
connection = self.renew_connection(connection).await?;

connection = match self.renew_connection(connection).await {
Ok(connection) => connection,
// A bit of deep inspection of the error here, ugly, but correct.
// todo (asoli): make this code pretty.
//
// these two cases are the same but they come from different
// sources.
Err(NetworkError::OldPeerGeneration(err)) => {
// means that the sequencer is gone, we need reconfiguration.
return Ok(LogletCommit::reconfiguration_needed(format!(
"sequencer is gone; {err}"
)));
}
Err(NetworkError::UnknownNode(
NodesConfigError::GenerationMismatch { found, .. },
)) if found.is_newer_than(self.params.sequencer) => {
// means that the sequencer is gone, we need reconfiguration.
return Ok(LogletCommit::reconfiguration_needed(format!(
"sequencer is gone; {err}"
)));
}
// probably retryable
Err(err) => return Err(err.into()),
};
msg = err.original;
continue;
}
Expand Down Expand Up @@ -375,11 +398,16 @@ impl RemoteSequencerConnection {
commit_resolver.sealed();
break AppendError::Sealed;
}
SequencerStatus::Gone | SequencerStatus::Shutdown => {
// this sequencer is not coming back
commit_resolver.error(AppendError::ReconfigurationNeeded(
format!("sequencer at {} is terminating", connection.peer()).into(),
));
}
SequencerStatus::UnknownLogId
| SequencerStatus::UnknownSegmentIndex
| SequencerStatus::LogletIdMismatch
| SequencerStatus::NotSequencer
| SequencerStatus::Shutdown
| SequencerStatus::Error { .. } => {
let err = RemoteSequencerError::try_from(appended.header.status).unwrap();
// While the UnknownLoglet status is non-terminal for the connection
Expand Down Expand Up @@ -428,8 +456,6 @@ pub enum RemoteSequencerError {
LogletIdMismatch,
#[error("Remote node is not a sequencer")]
NotSequencer,
#[error("Sequencer shutdown")]
Shutdown,
#[error("Unknown remote error: {message}")]
Error { retryable: bool, message: String },
}
Expand All @@ -441,7 +467,6 @@ impl MaybeRetryableError for RemoteSequencerError {
Self::UnknownSegmentIndex => false,
Self::LogletIdMismatch => false,
Self::NotSequencer => false,
Self::Shutdown => false,
Self::Error { retryable, .. } => *retryable,
}
}
Expand All @@ -455,12 +480,14 @@ impl TryFrom<SequencerStatus> for RemoteSequencerError {
SequencerStatus::UnknownSegmentIndex => RemoteSequencerError::UnknownSegmentIndex,
SequencerStatus::LogletIdMismatch => RemoteSequencerError::LogletIdMismatch,
SequencerStatus::NotSequencer => RemoteSequencerError::NotSequencer,
SequencerStatus::Shutdown => RemoteSequencerError::Shutdown,
SequencerStatus::Error { retryable, message } => {
RemoteSequencerError::Error { retryable, message }
}
SequencerStatus::Ok | SequencerStatus::Sealed => {
return Err("not a failure status");
SequencerStatus::Ok
| SequencerStatus::Sealed
| SequencerStatus::Shutdown
| SequencerStatus::Gone => {
unreachable!("not a failure status")
}
};

Expand Down
6 changes: 2 additions & 4 deletions crates/bifrost/src/providers/replicated_loglet/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<T: TransportConnect> Sequencer<T> {
/// observed global_tail with is_sealed=true)
///
/// This method is cancellation safe.
pub async fn drain(&self) -> Result<(), ShutdownError> {
pub async fn drain(&self) {
// stop issuing new permits
self.record_permits.close();
// required to allow in_flight.wait() to finish.
Expand All @@ -199,7 +199,7 @@ impl<T: TransportConnect> Sequencer<T> {
// we are assuming here that seal has been already executed on majority of nodes. This is
// important since in_flight.close() doesn't prevent new tasks from being spawned.
if self.sequencer_shared_state.known_global_tail.is_sealed() {
return Ok(());
return;
}

// wait for in-flight tasks to complete before returning
Expand All @@ -214,8 +214,6 @@ impl<T: TransportConnect> Sequencer<T> {
loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
"Sequencer drained",
);

Ok(())
}

fn ensure_enough_permits(&self, required: usize) {
Expand Down
Loading

0 comments on commit 641c540

Please sign in to comment.