From 289249ef788cbf5705c2f911174fadded3d72bfb Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 10 Feb 2025 12:10:23 +0000
Subject: [PATCH 1/2] Fix sequencer drain in challenging situations

---
 .../providers/replicated_loglet/sequencer.rs  | 10 +++++++--
 .../replicated_loglet/sequencer/appender.rs   | 22 ++++++++++++++-----
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
index a2118637b..3454cafcb 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
@@ -17,7 +17,7 @@ use std::sync::{
 use crossbeam_utils::CachePadded;
 use tokio::sync::Semaphore;
-use tokio_util::task::TaskTracker;
+use tokio_util::{sync::CancellationToken, task::TaskTracker};
 use tracing::{debug, instrument, trace};

 use restate_core::{
@@ -111,6 +111,8 @@ pub struct Sequencer {
     record_permits: Arc<Semaphore>,
     in_flight_appends: TaskTracker,
     record_cache: RecordCache,
+    /// This is the parent token for all appenders.
+    cancellation_token: CancellationToken,
 }

 impl Sequencer {
@@ -175,6 +177,7 @@ impl Sequencer {
             record_cache,
             max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config),
             in_flight_appends: TaskTracker::default(),
+            cancellation_token: CancellationToken::default(),
         }
     }

@@ -191,6 +194,7 @@ impl Sequencer {
         self.record_permits.close();
         // required to allow in_flight.wait() to finish.
         self.in_flight_appends.close();
+        self.cancellation_token.cancel();

         // we are assuming here that seal has been already executed on majority of nodes. This is
         // important since in_flight.close() doesn't prevent new tasks from being spawned.
@@ -288,7 +292,9 @@ impl Sequencer {
             commit_resolver,
         );

-        let fut = self.in_flight_appends.track_future(appender.run());
+        let fut = self
+            .in_flight_appends
+            .track_future(appender.run(self.cancellation_token.child_token()));
         // Why not managed tasks? Because managed tasks are not designed to manage a potentially
         // very large number of tasks; they also require a lock acquisition on start and that
         // might be a contention point.
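The drain path above composes three primitives in a fixed order: close the record permits so no new appends are admitted, close the task tracker so `wait()` can complete, and cancel the parent token so every appender's child token fires. Below is a minimal, standalone sketch of that same pattern using only tokio and tokio-util; `appender_work` and the timings are illustrative stand-ins, not the real `SequencerAppender`.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Semaphore;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

// Stand-in for SequencerAppender::run(child_token): either the work finishes,
// or cancellation wins and the future is dropped at an await point.
async fn appender_work(id: usize, cancel: CancellationToken) {
    let work = tokio::time::sleep(Duration::from_millis(30 * (id as u64 + 1)));
    match cancel.run_until_cancelled(work).await {
        Some(()) => println!("appender {id} finished its wave"),
        None => println!("appender {id} was cancelled before finishing"),
    }
}

#[tokio::main]
async fn main() {
    let record_permits = Arc::new(Semaphore::new(8));
    let in_flight_appends = TaskTracker::new();
    let cancellation_token = CancellationToken::new();

    for id in 0..4 {
        // Each appender gets a child token; cancelling the parent cancels all of them.
        let child = cancellation_token.child_token();
        tokio::spawn(in_flight_appends.track_future(appender_work(id, child)));
    }

    // drain(): stop issuing permits, stop tracking new futures, cancel all
    // appenders, then wait for whatever is already in flight to settle.
    record_permits.close();
    in_flight_appends.close();
    cancellation_token.cancel();
    in_flight_appends.wait().await;
    println!("sequencer drained");
}
```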
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
index 5cb4b7600..7f72067a4 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
@@ -12,10 +12,10 @@ use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration};

 use tokio::time::Instant;
 use tokio::{sync::OwnedSemaphorePermit, task::JoinSet};
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, instrument, trace, warn};

 use restate_core::{
-    cancellation_token,
     network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect},
     ShutdownError, TaskCenterFutureExt,
 };
@@ -123,15 +123,13 @@ impl SequencerAppender {
             otel.name="replicated_loglet::sequencer::appender: run"
         )
     )]
-    pub async fn run(mut self) {
+    pub async fn run(mut self, cancellation_token: CancellationToken) {
         let start = Instant::now();
         // initial wave has 0 replicated and 0 graylisted nodes
         let mut state = State::Wave {
             graylist: NodeSet::default(),
         };

-        let cancellation = cancellation_token();
-
         let retry_policy = self
             .configuration
             .live_load()
@@ -149,7 +147,17 @@ impl SequencerAppender {
                 State::Done | State::Cancelled | State::Sealed => break state,
                 State::Wave { graylist } => {
                     self.current_wave += 1;
+                    // # Why is this cancellation safe?
+                    // We don't await any futures inside the join_next() loop, so we are
+                    // confident that we cancel before resolving the commit token.
+                    // We must not cancel *after* updating the global offset and *then* report Cancelled.
+                    // This is because we don't want appenders after our offset to make progress,
+                    // thereby (potentially) dropping records in the writer prefix. Even if a store was
+                    // fully replicated and we cancelled before updating the tail, that's an acceptable
+                    // and safe result: we didn't acknowledge the append to the writer, so from
+                    // their perspective it has failed, and because the global tail was not moved,
+                    // appends after this one cannot move the global tail either.
-                    let Some(next_state) = cancellation
+                    let Some(next_state) = cancellation_token
                         .run_until_cancelled(self.send_wave(graylist))
                         .await
                     else {
@@ -173,7 +181,7 @@ impl SequencerAppender {
                         );
                     }

-                    if cancellation
+                    if cancellation_token
                         .run_until_cancelled(tokio::time::sleep(delay))
                         .await
                         .is_none()
@@ -279,6 +287,8 @@ impl SequencerAppender {
             });
         }

+        // NOTE: It's very important to keep this loop cancellation safe. If the appender future
+        // was cancelled, we don't want to move the global commit offset.
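The safety argument above rests on the contract of `CancellationToken::run_until_cancelled`: it resolves to `Some(output)` only if the wrapped future ran to completion, and to `None` if cancellation won, in which case the future is dropped at an await point and none of its remaining side effects run. A small sketch of that contract; `COMMIT_OFFSET` and `send_wave_stub` are illustrative stand-ins for the sequencer's global tail and a wave, not the real types.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use tokio_util::sync::CancellationToken;

static COMMIT_OFFSET: AtomicU64 = AtomicU64::new(0);

// Stand-in for one wave: do the "stores", then advance the tail.
async fn send_wave_stub(next_offset: u64) {
    // Pretend the individual store RPCs take a while.
    tokio::time::sleep(Duration::from_secs(60)).await;
    // The global offset only moves once the wave has fully succeeded.
    COMMIT_OFFSET.store(next_offset, Ordering::Release);
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();

    // Cancel shortly after the wave starts, long before it can finish.
    let canceller = {
        let token = token.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            token.cancel();
        })
    };

    // `None` means the wave future was dropped at an await point: the offset
    // was never advanced, so the append can be reported as failed instead of
    // being left half-committed.
    let outcome = token.run_until_cancelled(send_wave_stub(42)).await;
    assert!(outcome.is_none());
    assert_eq!(COMMIT_OFFSET.load(Ordering::Acquire), 0);

    canceller.await.unwrap();
    println!("wave cancelled before the global offset moved");
}
```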
        while let Some(store_result) = store_tasks.join_next().await {
            // unlikely to happen, but it's there for completeness
            if self.sequencer_shared_state.known_global_tail.is_sealed() {

From 98dda30c68b162f450a5a3f492f33652267d7fa7 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Mon, 10 Feb 2025 12:10:23 +0000
Subject: [PATCH 2/2] Remote sequencer now handles cancelled/gone sequencers

This also adds a bit more stress to the tests so that they fail more
reliably when an issue is present.
---
 crates/bifrost/src/appender.rs                 |   4 +-
 crates/bifrost/src/loglet.rs                   |  18 ++-
 crates/bifrost/src/loglet/error.rs             |   5 +-
 crates/bifrost/src/loglet/loglet_tests.rs      |   4 +
 .../src/providers/replicated_loglet/loglet.rs  |   4 +-
 .../providers/replicated_loglet/network.rs     |   8 ++
 .../replicated_loglet/remote_sequencer.rs      |  51 +++++--
 .../providers/replicated_loglet/sequencer.rs   |   6 +-
 .../replicated_loglet/sequencer/appender.rs    | 129 +++++++++++-------
 crates/core/src/network/connection_manager.rs  |   4 +-
 crates/types/src/net/replicated_loglet.rs      |   2 +
 server/tests/replicated_loglet.rs              |  13 +-
 12 files changed, 166 insertions(+), 82 deletions(-)

diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs
index 9b2801b94..a1c02e724 100644
--- a/crates/bifrost/src/appender.rs
+++ b/crates/bifrost/src/appender.rs
@@ -110,12 +110,12 @@ impl Appender {
             };
             match loglet.append_batch(batch.clone()).await {
                 Ok(lsn) => return Ok(lsn),
-                Err(AppendError::Sealed) => {
+                Err(err @ AppendError::Sealed | err @ AppendError::ReconfigurationNeeded(_)) => {
                     debug!(
                         log_id = %self.log_id,
                         attempt = attempt,
                         segment_index = %loglet.segment_index(),
-                        "Batch append failed but will be retried (loglet has been sealed). Waiting for reconfiguration to complete"
+                        "Batch append failed but will be retried ({err}). Waiting for reconfiguration to complete"
                     );
                     let new_loglet = Self::on_sealed_loglet(
                         self.log_id,
diff --git a/crates/bifrost/src/loglet.rs b/crates/bifrost/src/loglet.rs
index 0f3f16032..bf92ea03c 100644
--- a/crates/bifrost/src/loglet.rs
+++ b/crates/bifrost/src/loglet.rs
@@ -16,19 +16,19 @@ pub mod util;

 // exports
 pub use error::*;
-use futures::stream::BoxStream;
 pub use provider::{LogletProvider, LogletProviderFactory};
-use restate_types::logs::metadata::ProviderKind;
-use tokio::sync::oneshot;

+use std::borrow::Cow;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{ready, Poll};

 use async_trait::async_trait;
+use futures::stream::BoxStream;
 use futures::{FutureExt, Stream};
+use tokio::sync::oneshot;

-use restate_core::ShutdownError;
+use restate_types::logs::metadata::ProviderKind;
 use restate_types::logs::{KeyFilter, LogletId, LogletOffset, Record, TailState};

 use crate::LogEntry;
@@ -190,6 +190,12 @@ impl LogletCommit {
         Self { rx }
     }

+    pub fn reconfiguration_needed(reason: impl Into<Cow<'static, str>>) -> Self {
+        let (tx, rx) = oneshot::channel();
+        let _ = tx.send(Err(AppendError::ReconfigurationNeeded(reason.into())));
+        Self { rx }
+    }
+
     pub fn resolved(offset: LogletOffset) -> Self {
         let (tx, rx) = oneshot::channel();
         let _ = tx.send(Ok(offset));
@@ -211,7 +217,9 @@ impl std::future::Future for LogletCommit {
     ) -> Poll<Self::Output> {
         match ready!(self.rx.poll_unpin(cx)) {
             Ok(res) => Poll::Ready(res),
-            Err(_) => Poll::Ready(Err(AppendError::Shutdown(ShutdownError))),
+            Err(_) => Poll::Ready(Err(AppendError::ReconfigurationNeeded(
+                "loglet gave up on this batch".into(),
+            ))),
         }
     }
 }
diff --git a/crates/bifrost/src/loglet/error.rs b/crates/bifrost/src/loglet/error.rs
index d70b20fe7..da0c3e0a0 100644
--- a/crates/bifrost/src/loglet/error.rs
+++ b/crates/bifrost/src/loglet/error.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use std::borrow::Cow;
 use std::fmt::Debug;
 use std::sync::Arc;

@@ -16,8 +17,10 @@ use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};

 #[derive(Debug, Clone, thiserror::Error)]
 pub enum AppendError {
-    #[error("Loglet is sealed")]
+    #[error("Loglet has been sealed")]
     Sealed,
+    #[error("Loglet needs reconfiguration; {0}")]
+    ReconfigurationNeeded(Cow<'static, str>),
     #[error(transparent)]
     Shutdown(#[from] ShutdownError),
     #[error(transparent)]
diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs
index 1f9486c92..b43b77b97 100644
--- a/crates/bifrost/src/loglet/loglet_tests.rs
+++ b/crates/bifrost/src/loglet/loglet_tests.rs
@@ -510,6 +510,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc) -> googletest
                     println!("append failed({i}) => SEALED");
                     break;
                 }
+                Err(AppendError::ReconfigurationNeeded(reason)) => {
+                    println!("append failed({i}) => ReconfigurationNeeded({reason})");
+                    break;
+                }
                 Err(AppendError::Shutdown(_)) => {
                     break;
                 }
diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs
index f12edb5ae..99a90f893 100644
--- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -219,7 +219,7 @@ impl ReplicatedLoglet {
             }
             CheckSealOutcome::FullySealed => {
                 // already fully sealed, just make sure the sequencer is drained.
-                handle.drain().await?;
+                handle.drain().await;
                 // note that we can only do that if we are the sequencer because
                 // our known_global_tail is authoritative. We have no doubt about
                 // whether the tail needs to be repaired or not.
@@ -420,7 +420,7 @@ impl Loglet for ReplicatedLoglet {
             .await?;
         // If we are the sequencer, we need to wait until the sequencer is drained.
         if let SequencerAccess::Local { handle } = &self.sequencer {
-            handle.drain().await?;
+            handle.drain().await;
             self.known_global_tail.notify_seal();
         };
         // Primarily useful for remote sequencer to enforce seal check on the next find_tail() call
diff --git a/crates/bifrost/src/providers/replicated_loglet/network.rs b/crates/bifrost/src/providers/replicated_loglet/network.rs
index 335b9a1d1..f21a273ab 100644
--- a/crates/bifrost/src/providers/replicated_loglet/network.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/network.rs
@@ -395,6 +395,14 @@ impl WaitForCommitTask {
                 },
                 last_offset: LogletOffset::INVALID,
             },
+            Err(AppendError::ReconfigurationNeeded(_)) => Appended {
+                header: CommonResponseHeader {
+                    known_global_tail: Some(self.global_tail.latest_offset()),
+                    sealed: Some(self.global_tail.is_sealed()), // this must be true
+                    status: SequencerStatus::Gone,
+                },
+                last_offset: LogletOffset::INVALID,
+            },
             Err(AppendError::Shutdown(_)) => Appended {
                 header: CommonResponseHeader {
                     known_global_tail: Some(self.global_tail.latest_offset()),
diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
index 1101fbdc0..13d04e9b3 100644
--- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
@@ -30,6 +30,7 @@ use restate_types::{
     errors::MaybeRetryableError,
     logs::{metadata::SegmentIndex, LogId, Record},
     net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus},
+    nodes_config::NodesConfigError,
     replicated_loglet::ReplicatedLogletParams,
     GenerationalNodeId,
 };
@@ -140,7 +141,10 @@ where
             .await
             .unwrap();

-        let mut connection = self.get_connection().await?;
+        let mut connection = match self.get_connection().await {
+            Ok(connection) => connection,
+            Err(err) => return self.on_handle_network_error(err),
+        };

         let mut msg = Append {
             header: CommonRequestHeader {
@@ -163,8 +167,10 @@ where
                 | NetworkError::ConnectionClosed(_)
                 | NetworkError::Timeout(_) => {
                     // we retry to re-connect one time
-                    connection = self.renew_connection(connection).await?;
-
+                    connection = match self.renew_connection(connection).await {
+                        Ok(connection) => connection,
+                        Err(err) => return self.on_handle_network_error(err),
+                    };
                     msg = err.original;
                     continue;
                 }
@@ -180,6 +186,27 @@ where
         Ok(commit_token)
     }

+    fn on_handle_network_error(&self, err: NetworkError) -> Result<LogletCommit, OperationError> {
+        match err {
+            err @ NetworkError::OldPeerGeneration(_) | err @ NetworkError::NodeIsGone(_) => {
+                // means that the sequencer is gone, we need reconfiguration.
+                Ok(LogletCommit::reconfiguration_needed(format!(
+                    "sequencer is gone; {err}"
+                )))
+            }
+            NetworkError::UnknownNode(NodesConfigError::GenerationMismatch { found, .. })
+                if found.is_newer_than(self.params.sequencer) =>
+            {
+                // means that the sequencer is gone, we need reconfiguration.
+                Ok(LogletCommit::reconfiguration_needed(format!(
+                    "sequencer is gone; {err}"
+                )))
+            }
+            // probably retryable
+            err => Err(err.into()),
+        }
+    }
+
     /// Gets or starts a new remote sequencer connection
     async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
         let mut guard = self.connection.lock().await;
@@ -375,11 +402,16 @@ impl RemoteSequencerConnection {
                         commit_resolver.sealed();
                         break AppendError::Sealed;
                     }
+                    SequencerStatus::Gone | SequencerStatus::Shutdown => {
+                        // this sequencer is not coming back
+                        commit_resolver.error(AppendError::ReconfigurationNeeded(
+                            format!("sequencer at {} is terminating", connection.peer()).into(),
+                        ));
+                    }
                     SequencerStatus::UnknownLogId
                     | SequencerStatus::UnknownSegmentIndex
                     | SequencerStatus::LogletIdMismatch
                     | SequencerStatus::NotSequencer
-                    | SequencerStatus::Shutdown
                     | SequencerStatus::Error { .. } => {
                         let err = RemoteSequencerError::try_from(appended.header.status).unwrap();
                         // While the UnknownLoglet status is non-terminal for the connection
@@ -428,8 +460,6 @@ pub enum RemoteSequencerError {
     LogletIdMismatch,
     #[error("Remote node is not a sequencer")]
     NotSequencer,
-    #[error("Sequencer shutdown")]
-    Shutdown,
     #[error("Unknown remote error: {message}")]
     Error { retryable: bool, message: String },
 }
@@ -441,7 +471,6 @@ impl MaybeRetryableError for RemoteSequencerError {
             Self::UnknownSegmentIndex => false,
             Self::LogletIdMismatch => false,
             Self::NotSequencer => false,
-            Self::Shutdown => false,
             Self::Error { retryable, .. } => *retryable,
         }
     }
@@ -455,12 +484,14 @@ impl TryFrom<SequencerStatus> for RemoteSequencerError {
             SequencerStatus::UnknownSegmentIndex => RemoteSequencerError::UnknownSegmentIndex,
             SequencerStatus::LogletIdMismatch => RemoteSequencerError::LogletIdMismatch,
             SequencerStatus::NotSequencer => RemoteSequencerError::NotSequencer,
-            SequencerStatus::Shutdown => RemoteSequencerError::Shutdown,
             SequencerStatus::Error { retryable, message } => {
                 RemoteSequencerError::Error { retryable, message }
             }
-            SequencerStatus::Ok | SequencerStatus::Sealed => {
-                return Err("not a failure status");
+            SequencerStatus::Ok
+            | SequencerStatus::Sealed
+            | SequencerStatus::Shutdown
+            | SequencerStatus::Gone => {
+                unreachable!("not a failure status")
             }
         };
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
index 3454cafcb..3a163bc00 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs
@@ -189,7 +189,7 @@ impl Sequencer {
     /// observed global_tail with is_sealed=true)
     ///
     /// This method is cancellation safe.
-    pub async fn drain(&self) -> Result<(), ShutdownError> {
+    pub async fn drain(&self) {
         // stop issuing new permits
         self.record_permits.close();
         // required to allow in_flight.wait() to finish.
@@ -199,7 +199,7 @@ impl Sequencer {
         // we are assuming here that seal has been already executed on majority of nodes. This is
         // important since in_flight.close() doesn't prevent new tasks from being spawned.
         if self.sequencer_shared_state.known_global_tail.is_sealed() {
-            return Ok(());
+            return;
         }

         // wait for in-flight tasks to complete before returning
@@ -214,8 +214,6 @@ impl Sequencer {
             loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
             "Sequencer drained",
         );
-
-        Ok(())
     }

     fn ensure_enough_permits(&self, required: usize) {
diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
index 7f72067a4..385d5668d 100644
--- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs
@@ -17,8 +17,10 @@ use tracing::{debug, instrument, trace, warn};

 use restate_core::{
     network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect},
-    ShutdownError, TaskCenterFutureExt,
+    TaskCenterFutureExt,
 };
+use restate_types::replicated_loglet::Spread;
+use restate_types::retries::with_jitter;
 use restate_types::{
     config::Configuration,
     live::Live,
@@ -31,6 +33,7 @@ use restate_types::{

 use super::{RecordsExt, SequencerSharedState};
 use crate::providers::replicated_loglet::metric_definitions::BIFROST_SEQ_APPEND_DURATION;
+use crate::providers::replicated_loglet::replication::spread_selector::SpreadSelectorError;
 use crate::{
     loglet::{AppendError, LogletCommitResolver},
     providers::replicated_loglet::{
@@ -46,10 +49,7 @@ const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000);
 const TONE_ESCALATION_THRESHOLD: usize = 5;

 enum State {
-    Wave {
-        // nodes that should be avoided by the spread selector
-        graylist: NodeSet,
-    },
+    Wave,
     Backoff,
     Done,
     Sealed,
@@ -72,6 +72,8 @@ pub(crate) struct SequencerAppender {
     permit: Option<OwnedSemaphorePermit>,
     commit_resolver: Option<LogletCommitResolver>,
     configuration: Live<Configuration>,
+    // nodes that should be avoided by the spread selector
+    graylist: NodeSet,
 }

 impl SequencerAppender {
@@ -109,6 +111,7 @@ impl SequencerAppender {
             permit: Some(permit),
             commit_resolver: Some(commit_resolver),
             configuration: Configuration::updateable(),
+            graylist: NodeSet::default(),
         }
     }

@@ -126,9 +129,7 @@ impl SequencerAppender {
     pub async fn run(mut self, cancellation_token: CancellationToken) {
         let start = Instant::now();
         // initial wave has 0 replicated and 0 graylisted nodes
-        let mut state = State::Wave {
-            graylist: NodeSet::default(),
-        };
+        let mut state = State::Wave;

         let retry_policy = self
             .configuration
@@ -145,7 +146,7 @@ impl SequencerAppender {
             state = match state {
                 // termination conditions
                 State::Done | State::Cancelled | State::Sealed => break state,
-                State::Wave { graylist } => {
+                State::Wave => {
                     self.current_wave += 1;
                     // # Why is this cancellation safe?
                     // We don't await any futures inside the join_next() loop, so we are
                     // confident that we cancel before resolving the commit token.
                     // We must not cancel *after* updating the global offset and *then* report Cancelled.
                     // This is because we don't want appenders after our offset to make progress,
                     // thereby (potentially) dropping records in the writer prefix. Even if a store was
                     // fully replicated and we cancelled before updating the tail, that's an acceptable
                     // and safe result: we didn't acknowledge the append to the writer, so from
                     // their perspective it has failed, and because the global tail was not moved,
                     // appends after this one cannot move the global tail either.
let Some(next_state) = cancellation_token - .run_until_cancelled(self.send_wave(graylist)) + .run_until_cancelled(self.send_wave()) .await else { break State::Cancelled; @@ -168,7 +169,9 @@ impl SequencerAppender { State::Backoff => { // since backoff can be None, or run out of iterations, // but appender should never give up we fall back to fixed backoff - let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME); + let delay = retry + .next() + .unwrap_or(with_jitter(DEFAULT_BACKOFF_TIME, 0.5)); if self.current_wave >= TONE_ESCALATION_THRESHOLD { warn!( wave = %self.current_wave, @@ -189,9 +192,7 @@ impl SequencerAppender { break State::Cancelled; }; - State::Wave { - graylist: NodeSet::default(), - } + State::Wave } } }; @@ -216,7 +217,9 @@ impl SequencerAppender { State::Cancelled => { trace!("Append cancelled"); if let Some(commit_resolver) = self.commit_resolver.take() { - commit_resolver.error(AppendError::Shutdown(ShutdownError)); + commit_resolver.error(AppendError::ReconfigurationNeeded( + "sequencer is draining".into(), + )); } } State::Sealed => { @@ -231,26 +234,55 @@ impl SequencerAppender { } } + fn reset_graylist(&mut self) { + self.graylist.clear(); + // add back the sealed nodes to the gray list, those will never be writeable again. + self.graylist.extend( + self.checker + .filter(|attr| attr.sealed) + .map(|(node_id, _)| *node_id), + ); + } + + fn generate_spread(&mut self) -> Result { + let rng = &mut rand::rng(); + let nodes_config = &self.networking.metadata().nodes_config_ref(); + match self + .sequencer_shared_state + .selector + .select(rng, nodes_config, &self.graylist) + { + Ok(spread) => Ok(spread), + Err(err) => { + trace!( + nodeset_status = %self.nodeset_status, + graylist = %self.graylist, + %err, + "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + ); + self.reset_graylist(); + self.sequencer_shared_state + .selector + .select(rng, nodes_config, &self.graylist) + } + } + } + #[instrument(skip_all, fields(wave = %self.current_wave))] - async fn send_wave(&mut self, mut graylist: NodeSet) -> State { + async fn send_wave(&mut self) -> State { // select the spread - let spread = match self.sequencer_shared_state.selector.select( - &mut rand::rng(), - &self.networking.metadata().nodes_config_ref(), - &graylist, - ) { + let spread = match self.generate_spread() { Ok(spread) => spread, - Err(_) => { - graylist.clear(); + Err(err) => { trace!( - %graylist, - "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + nodeset_status = %self.nodeset_status, + "Cannot select a spread: {err}" ); return State::Backoff; } }; - trace!(%graylist, %spread, "Sending append wave"); + trace!(graylist = %self.graylist, %spread, wave = %self.current_wave, nodeset_status = %self.nodeset_status, "Sending append wave"); let last_offset = self.records.last_offset(self.first_offset).unwrap(); // todo: should be exponential backoff @@ -273,18 +305,22 @@ impl SequencerAppender { continue; } } - store_tasks.spawn({ - let store_task = LogServerStoreTask { - node_id, - sequencer_shared_state: self.sequencer_shared_state.clone(), - networking: self.networking.clone(), - first_offset: self.first_offset, - records: self.records.clone(), - rpc_router: self.store_router.clone(), - store_timeout, - }; - async move { (node_id, store_task.run().await) }.in_current_tc() - }); + store_tasks + .build_task() + .name(&format!("store-to-{}", node_id)) + .spawn({ + let store_task = LogServerStoreTask { + node_id, + 
sequencer_shared_state: self.sequencer_shared_state.clone(), + networking: self.networking.clone(), + first_offset: self.first_offset, + records: self.records.clone(), + rpc_router: self.store_router.clone(), + store_timeout, + }; + async move { (node_id, store_task.run().await) }.in_current_tc() + }) + .unwrap(); } // NOTE: It's very important to keep this loop cancellation safe. If the appender future @@ -313,7 +349,7 @@ impl SequencerAppender { trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); } self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); - graylist.insert(node_id); + self.graylist.insert(node_id); continue; } StoreTaskStatus::Error(err) => { @@ -324,7 +360,7 @@ impl SequencerAppender { trace!(peer = %node_id, %err, "Failed to send batch to node"); } self.nodeset_status.merge(node_id, PerNodeStatus::failed()); - graylist.insert(node_id); + self.graylist.insert(node_id); continue; } StoreTaskStatus::Sealed => { @@ -353,20 +389,20 @@ impl SequencerAppender { Status::Sealed | Status::Sealing => { self.checker .set_attribute(node_id, NodeAttributes::sealed()); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::Dropped => { // Overloaded, or request expired debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding"); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::Disabled => { debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer's log-store is disabled"); - graylist.insert(node_id); + self.graylist.insert(node_id); } Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => { warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate"); - graylist.insert(node_id); + self.graylist.insert(node_id); } } @@ -387,10 +423,7 @@ impl SequencerAppender { if self.checker.check_fmajority(|attr| attr.sealed).passed() { State::Sealed } else { - // We couldn't achieve write quorum with this wave. We will try again, as fast as - // possible until the graylist eats up enough nodes such that we won't be able to - // generate node nodesets. Only then we backoff. 
- State::Wave { graylist } + State::Backoff } } } diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 0537cdc74..e775cb0dc 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -638,8 +638,8 @@ where if let message::Body::ConnectionControl(ctrl_msg) = &body { // do something info!( - "Terminating connection based on signal from peer: {:?} {}", - ctrl_msg.signal(), + "Terminating connection based on signal from {}: {}", + connection.peer(), ctrl_msg.message ); if ctrl_msg.signal() == message::Signal::Shutdown { diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index 35e637270..68a15d3dd 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -44,6 +44,8 @@ pub enum SequencerStatus { /// Sealed is returned when the sequencer cannot accept more /// [`Append`] requests because it's sealed Sealed, + /// Local sequencer is not available anymore, reconfiguration is needed + Gone, /// LogletID does not match Segment LogletIdMismatch, /// Invalid LogId diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 38cc5e21b..30e238ffc 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -22,9 +22,11 @@ mod tests { use futures_util::StreamExt; use googletest::prelude::*; use restate_bifrost::{loglet::AppendError, ErrorRecoveryStrategy}; - use restate_core::{cancellation_token, Metadata, TaskCenterFutureExt}; use test_log::test; + use tokio::task::{JoinHandle, JoinSet}; + use tokio_util::sync::CancellationToken; + use restate_core::{Metadata, TaskCenterFutureExt}; use restate_types::{ config::Configuration, logs::{ @@ -37,8 +39,6 @@ mod tests { time::NanosSinceEpoch, GenerationalNodeId, Version, }; - use tokio::task::{JoinHandle, JoinSet}; - use tokio_util::sync::CancellationToken; use super::common::replicated_loglet::run_in_test_env; @@ -209,7 +209,7 @@ mod tests { async fn bifrost_append_and_seal_concurrent() -> googletest::Result<()> { const TEST_DURATION: Duration = Duration::from_secs(10); const SEAL_PERIOD: Duration = Duration::from_secs(1); - const CONCURRENT_APPENDERS: usize = 20; + const CONCURRENT_APPENDERS: usize = 400; run_in_test_env( Configuration::default(), @@ -250,15 +250,13 @@ mod tests { let mut sealer_handle: JoinHandle> = tokio::task::spawn({ let bifrost = test_env.bifrost.clone(); - async move { - let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); let mut last_loglet_id = None; - while !cancellation_token.is_cancelled() { + loop { tokio::time::sleep(SEAL_PERIOD).await; let mut params = ReplicatedLogletParams::deserialize_from( @@ -282,7 +280,6 @@ mod tests { ) .await?; } - Ok(()) }.in_current_tc() });
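Taken together, patch 2 turns "the sequencer is not coming back" into an explicitly retryable condition for writers: network-level OldPeerGeneration/NodeIsGone errors, the new SequencerStatus::Gone, and a dropped LogletCommit all surface as AppendError::ReconfigurationNeeded, and the bifrost Appender treats that exactly like Sealed — wait for the chain to reconfigure, then retry the batch on the new segment. Below is a hedged, self-contained sketch of that retry contract with simplified stand-in types (assuming only the thiserror crate); it is not the real bifrost API.

```rust
use std::borrow::Cow;

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
enum AppendError {
    #[error("Loglet has been sealed")]
    Sealed,
    #[error("Loglet needs reconfiguration; {0}")]
    ReconfigurationNeeded(Cow<'static, str>),
    #[error("Shutting down")]
    Shutdown,
}

// Stand-in for one append attempt against the current segment.
fn append_once(attempt: u32) -> Result<u64, AppendError> {
    match attempt {
        0 => Err(AppendError::Sealed),
        1 => Err(AppendError::ReconfigurationNeeded("sequencer is gone".into())),
        n => Ok(100 + n as u64),
    }
}

fn append_with_retries() -> Result<u64, AppendError> {
    let mut attempt = 0;
    loop {
        match append_once(attempt) {
            Ok(lsn) => return Ok(lsn),
            // Same branch for both: wait for the chain to be reconfigured,
            // then retry the whole batch on the new segment.
            Err(err @ AppendError::Sealed) | Err(err @ AppendError::ReconfigurationNeeded(_)) => {
                println!("attempt {attempt} failed ({err}); waiting for reconfiguration");
                attempt += 1;
            }
            Err(other) => return Err(other),
        }
    }
}

fn main() {
    let lsn = append_with_retries().expect("append eventually succeeds");
    println!("batch appended at lsn {lsn}");
}
```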