Remote sequencer now handles cancelled/gone sequencers #2676

Merged 2 commits on Feb 10, 2025
4 changes: 2 additions & 2 deletions crates/bifrost/src/appender.rs
@@ -110,12 +110,12 @@ impl Appender {
};
match loglet.append_batch(batch.clone()).await {
Ok(lsn) => return Ok(lsn),
Err(AppendError::Sealed) => {
Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => {
debug!(
log_id = %self.log_id,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
"Batch append failed but will be retried ({err}). Waiting for reconfiguration to complete"
);
let new_loglet = Self::on_sealed_loglet(
self.log_id,
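A note on the new match arm for readers less familiar with Rust or-patterns: in err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_) the binding has to be repeated on every alternative, and the arm then has a single err value to interpolate into the retry log line. A minimal, self-contained sketch of that pattern (the enum and function are stand-ins for illustration, not code from this PR):

#[derive(Debug)]
enum RetryableError {
    Sealed,
    ReconfigurationNeeded(String),
}

fn describe(outcome: Result<u64, RetryableError>) -> String {
    match outcome {
        Ok(lsn) => format!("committed at {lsn}"),
        // the same binding name must appear on every alternative of the or-pattern
        Err(err @ RetryableError::Sealed)
        | Err(err @ RetryableError::ReconfigurationNeeded(_)) => {
            format!("will retry after reconfiguration: {err:?}")
        }
    }
}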
18 changes: 13 additions & 5 deletions crates/bifrost/src/loglet.rs
@@ -16,19 +16,19 @@ pub mod util;

// exports
pub use error::*;
use futures::stream::BoxStream;
pub use provider::{LogletProvider, LogletProviderFactory};
use restate_types::logs::metadata::ProviderKind;
use tokio::sync::oneshot;

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};

use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{FutureExt, Stream};
use tokio::sync::oneshot;

use restate_core::ShutdownError;
use restate_types::logs::metadata::ProviderKind;
use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState};

use crate::LogEntry;
@@ -190,6 +190,12 @@ impl LogletCommit {
Self { rx }
}

pub fn reconfiguration_needed(reason: impl Into<Cow<'static, str>>) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Err(AppendError::ReconfigurationNeeded(reason.into())));
Self { rx }
}

pub fn resolved(offset: LogletOffset) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Ok(offset));
@@ -211,7 +217,9 @@ impl std::future::Future for LogletCommit {
) -> Poll<Self::Output> {
match ready!(self.rx.poll_unpin(cx)) {
Ok(res) => Poll::Ready(res),
Err(_) => Poll::Ready(Err(AppendError::Shutdown(ShutdownError))),
Err(_) => Poll::Ready(Err(AppendError::ReconfigurationNeeded(
"loglet gave up on this batch".into(),
))),
}
}
}
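Taken together, the new constructor and the updated Future impl mean a LogletCommit can now resolve to AppendError::ReconfigurationNeeded either explicitly (via reconfiguration_needed) or implicitly when the sending side is dropped, which previously surfaced as Shutdown. A hedged sketch of how a loglet implementation might pick between the constructors shown above (the helper and its argument are illustrative, not part of this PR):

// Illustrative only; resolved() and reconfiguration_needed() are the
// LogletCommit constructors shown in the hunk above.
fn commit_for(committed_at: Option<LogletOffset>) -> LogletCommit {
    match committed_at {
        // the batch was durably committed at this offset
        Some(offset) => LogletCommit::resolved(offset),
        // the loglet has given up on accepting appends; the caller should
        // wait for reconfiguration and retry on the next segment
        None => LogletCommit::reconfiguration_needed("sequencer is gone"),
    }
}

Awaiting the returned LogletCommit then yields either Ok(offset) or Err(AppendError::ReconfigurationNeeded(..)), which is exactly what the appender retry loop in appender.rs above now handles.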
5 changes: 4 additions & 1 deletion crates/bifrost/src/loglet/error.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::Arc;

@@ -16,8 +17,10 @@ use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};

#[derive(Debug, Clone, thiserror::Error)]
pub enum AppendError {
#[error("Loglet is sealed")]
#[error("Loglet has been sealed")]
Sealed,
#[error("Loglet needs reconfiguration; {0}")]
ReconfigurationNeeded(Cow<'static, str>),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
4 changes: 4 additions & 0 deletions crates/bifrost/src/loglet/loglet_tests.rs
@@ -510,6 +510,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
println!("append failed({i}) => SEALED");
break;
}
Err(AppendError::ReconfigurationNeeded(reason)) => {
println!("append failed({i}) => ReconfigurationNeeded({reason})");
break;
}
Err(AppendError::Shutdown(_)) => {
break;
}
4 changes: 2 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -219,7 +219,7 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
}
CheckSealOutcome::FullySealed => {
// already fully sealed, just make sure the sequencer is drained.
handle.drain().await?;
handle.drain().await;
// note that we can only do that if we are the sequencer because
// our known_global_tail is authoritative. We have no doubt about
// whether the tail needs to be repaired or not.
@@ -420,7 +420,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
.await?;
// If we are the sequencer, we need to wait until the sequencer is drained.
if let SequencerAccess::Local { handle } = &self.sequencer {
handle.drain().await?;
handle.drain().await;
self.known_global_tail.notify_seal();
};
// Primarily useful for remote sequencer to enforce seal check on the next find_tail() call
8 changes: 8 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
@@ -395,6 +395,14 @@ impl WaitForCommitTask {
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::ReconfigurationNeeded(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
sealed: Some(self.global_tail.is_sealed()), // this must be true
status: SequencerStatus::Gone,
},
last_offset: LogletOffset::INVALID,
},
Err(AppendError::Shutdown(_)) => Appended {
header: CommonResponseHeader {
known_global_tail: Some(self.global_tail.latest_offset()),
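The effect of this hunk is that an append ending in ReconfigurationNeeded is reported to the remote client as SequencerStatus::Gone rather than as a generic error. A condensed, illustrative mapping (the helper function and its signature are hypothetical; the real code builds the full Appended response inline, as shown above):

// Sketch only: how WaitForCommitTask's append outcome translates into a status.
fn status_for(result: &Result<LogletOffset, AppendError>) -> SequencerStatus {
    match result {
        Ok(_) => SequencerStatus::Ok,
        Err(AppendError::Sealed) => SequencerStatus::Sealed,
        // New in this PR: a cancelled/terminating sequencer reports Gone so the
        // remote sequencer client gives up on this segment and reconfigures.
        Err(AppendError::ReconfigurationNeeded(_)) => SequencerStatus::Gone,
        // Shutdown and other errors keep their pre-existing mappings (elided in this sketch).
        Err(_) => unimplemented!(),
    }
}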
51 changes: 41 additions & 10 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
@@ -30,6 +30,7 @@ use restate_types::{
errors::MaybeRetryableError,
logs::{metadata::SegmentIndex, LogId, Record},
net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
nodes_config::NodesConfigError,
replicated_loglet::ReplicatedLogletParams,
GenerationalNodeId,
};
@@ -140,7 +141,10 @@
.await
.unwrap();

let mut connection = self.get_connection().await?;
let mut connection = match self.get_connection().await {
Ok(connection) => connection,
Err(err) => return self.on_handle_network_error(err),
};

let mut msg = Append {
header: CommonRequestHeader {
@@ -163,8 +167,10 @@
| NetworkError::ConnectionClosed(_)
| NetworkError::Timeout(_) => {
// we retry to re-connect one time
connection = self.renew_connection(connection).await?;

connection = match self.renew_connection(connection).await {
Ok(connection) => connection,
Err(err) => return self.on_handle_network_error(err),
};
msg = err.original;
continue;
}
@@ -180,6 +186,27 @@
Ok(commit_token)
}

fn on_handle_network_error(&self, err: NetworkError) -> Result<LogletCommit, OperationError> {
match err {
err @ NetworkError::OldPeerGeneration(_) | err @ NetworkError::NodeIsGone(_) => {
// means that the sequencer is gone, we need reconfiguration.
Ok(LogletCommit::reconfiguration_needed(format!(
"sequencer is gone; {err}"
)))
}
NetworkError::UnknownNode(NodesConfigError::GenerationMismatch { found, .. })
if found.is_newer_than(self.params.sequencer) =>
{
// means that the sequencer is gone, we need reconfiguration.
Ok(LogletCommit::reconfiguration_needed(format!(
"sequencer is gone; {err}"
)))
}
// probably retryable
err => Err(err.into()),
}
}

/// Gets or starts a new remote sequencer connection
async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
let mut guard = self.connection.lock().await;
@@ -375,11 +402,16 @@ impl RemoteSequencerConnection {
commit_resolver.sealed();
break AppendError::Sealed;
}
SequencerStatus::Gone | SequencerStatus::Shutdown => {
// this sequencer is not coming back
commit_resolver.error(AppendError::ReconfigurationNeeded(
format!("sequencer at {} is terminating", connection.peer()).into(),
));
}
SequencerStatus::UnknownLogId
| SequencerStatus::UnknownSegmentIndex
| SequencerStatus::LogletIdMismatch
| SequencerStatus::NotSequencer
| SequencerStatus::Shutdown
| SequencerStatus::Error { .. } => {
let err = RemoteSequencerError::try_from(appended.header.status).unwrap();
// While the UnknownLoglet status is non-terminal for the connection
@@ -428,8 +460,6 @@ pub enum RemoteSequencerError {
LogletIdMismatch,
#[error("Remote node is not a sequencer")]
NotSequencer,
#[error("Sequencer shutdown")]
Shutdown,
#[error("Unknown remote error: {message}")]
Error { retryable: bool, message: String },
}
@@ -441,7 +471,6 @@
Self::UnknownSegmentIndex => false,
Self::LogletIdMismatch => false,
Self::NotSequencer => false,
Self::Shutdown => false,
Self::Error { retryable, .. } => *retryable,
}
}
@@ -455,12 +484,14 @@
SequencerStatus::UnknownSegmentIndex => RemoteSequencerError::UnknownSegmentIndex,
SequencerStatus::LogletIdMismatch => RemoteSequencerError::LogletIdMismatch,
SequencerStatus::NotSequencer => RemoteSequencerError::NotSequencer,
SequencerStatus::Shutdown => RemoteSequencerError::Shutdown,
SequencerStatus::Error { retryable, message } => {
RemoteSequencerError::Error { retryable, message }
}
SequencerStatus::Ok | SequencerStatus::Sealed => {
return Err("not a failure status");
SequencerStatus::Ok
| SequencerStatus::Sealed
| SequencerStatus::Shutdown
| SequencerStatus::Gone => {
unreachable!("not a failure status")
}
};

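Two details in this file are worth calling out. First, on_handle_network_error returns Ok(LogletCommit::reconfiguration_needed(..)) rather than an Err: the failure travels through the commit future, so the appender retry loop in appender.rs above observes ReconfigurationNeeded and waits for reconfiguration instead of treating the append call itself as a hard error. Second, a generation mismatch only counts as a gone sequencer when the nodes configuration already knows a newer generation of the sequencer node. A hedged, condensed sketch of that classification (the import path for NetworkError is a guess; the logic mirrors the hunk above):

use restate_types::{nodes_config::NodesConfigError, GenerationalNodeId};

// Assumed import path; in the PR, NetworkError is already in scope.
use restate_core::network::NetworkError;

// Condensed view of the decision made by on_handle_network_error.
fn sequencer_is_gone(err: NetworkError, sequencer: GenerationalNodeId) -> bool {
    match err {
        // the connection layer told us the old generation is dead
        NetworkError::OldPeerGeneration(_) | NetworkError::NodeIsGone(_) => true,
        // nodes config already carries a newer generation of the sequencer node,
        // so the generation that owns this segment can never come back
        NetworkError::UnknownNode(NodesConfigError::GenerationMismatch { found, .. }) => {
            found.is_newer_than(sequencer)
        }
        // everything else is treated as probably retryable
        _ => false,
    }
}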
16 changes: 10 additions & 6 deletions crates/bifrost/src/providers/replicated_loglet/sequencer.rs
@@ -17,7 +17,7 @@ use std::sync::{

use crossbeam_utils::CachePadded;
use tokio::sync::Semaphore;
use tokio_util::task::TaskTracker;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, instrument, trace};

use restate_core::{
@@ -111,6 +111,8 @@ pub struct Sequencer<T> {
record_permits: Arc<Semaphore>,
in_flight_appends: TaskTracker,
record_cache: RecordCache,
/// this is the parent token for all appenders.
cancellation_token: CancellationToken,
}

impl<T> Sequencer<T> {
@@ -175,6 +177,7 @@ impl<T: TransportConnect> Sequencer<T> {
record_cache,
max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config),
in_flight_appends: TaskTracker::default(),
cancellation_token: CancellationToken::default(),
}
}

@@ -186,16 +189,17 @@
/// observed global_tail with is_sealed=true)
///
/// This method is cancellation safe.
pub async fn drain(&self) -> Result<(), ShutdownError> {
pub async fn drain(&self) {
// stop issuing new permits
self.record_permits.close();
// required to allow in_flight.wait() to finish.
self.in_flight_appends.close();
self.cancellation_token.cancel();

// we are assuming here that seal has been already executed on majority of nodes. This is
// important since in_flight.close() doesn't prevent new tasks from being spawned.
if self.sequencer_shared_state.known_global_tail.is_sealed() {
return Ok(());
return;
}

// wait for in-flight tasks to complete before returning
@@ -210,8 +214,6 @@
loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
"Sequencer drained",
);

Ok(())
}

fn ensure_enough_permits(&self, required: usize) {
Expand Down Expand Up @@ -288,7 +290,9 @@ impl<T: TransportConnect> Sequencer<T> {
commit_resolver,
);

let fut = self.in_flight_appends.track_future(appender.run());
let fut = self
.in_flight_appends
.track_future(appender.run(self.cancellation_token.child_token()));
// Why not managed tasks, because managed tasks are not designed to manage a potentially
// very large number of tasks, they also require a lock acquisition on start and that might
// be a contention point.
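The new cancellation_token field is what lets drain() proactively stop in-flight appenders: each spawned appender runs under a child of the sequencer's token, so cancelling the parent in drain() reaches all of them at once. An appender that gives up drops its commit resolver, and per the updated Future impl in loglet.rs above the awaiting LogletCommit then resolves to ReconfigurationNeeded("loglet gave up on this batch"). A small, self-contained sketch of the tokio_util parent/child token pattern (task bodies and names are illustrative, not the real appender code):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // one parent token owned by the "sequencer"
    let parent = CancellationToken::new();

    // every "appender" gets its own child token
    let mut tasks = Vec::new();
    for i in 0..3 {
        let token = parent.child_token();
        tasks.push(tokio::spawn(async move {
            // a real appender would select! between replication progress and
            // token.cancelled(); here we only wait for cancellation
            token.cancelled().await;
            println!("appender {i} gave up on its batch");
        }));
    }

    // drain() cancels the parent, which cancels every child token at once
    parent.cancel();
    for task in tasks {
        task.await.unwrap();
    }
}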