From e6423d807fb254a87091bac7ac9470eb3deaf25c Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Thu, 6 Feb 2025 12:37:07 +0000
Subject: [PATCH] [Bifrost] Sequencer updates

Key changes:
- Use a JoinSet instead of unordered futures (FuturesUnordered) for store tasks, avoiding some of its tricky lockup scenarios
- Store tasks are free to use whatever connection is available; this leans heavily on the current design, which serializes all writes one at a time, but it removes a lock and simplifies the logic
- Extra in-flight stores are aborted once the append is committed, to avoid overloading log servers
- Store tasks now time out only on their network operation, not on the time spent waiting for the tail to move
- Cache the nodeset checker across waves
- Better logging on failures: the nodeset status, including the number of attempts per node, is reported
- Add a new metric for append latency

Performance testing shows no regression, and reliability testing appears to show no regression even in tricky failure scenarios.

```
// intentionally empty
```
---
 .../replicated_loglet/log_server_manager.rs   | 108 +---
 .../replicated_loglet/metric_definitions.rs   |   7 +
 .../replicated_loglet/sequencer/appender.rs   | 503 +++++++++++-------
 .../replicated_loglet/tasks/check_seal.rs     |   6 +-
 4 files changed, 319 insertions(+), 305 deletions(-)

diff --git a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs
index d8fe561b4..3fca7cf45 100644
--- a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs
@@ -8,13 +8,11 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::{collections::BTreeMap, ops::Deref, sync::Arc};
+use std::sync::Arc;
 
-use crossbeam_utils::CachePadded;
+use ahash::HashMap;
 use metrics::Histogram;
-use tokio::sync::Mutex;
 
-use restate_core::network::{NetworkError, Networking, TransportConnect, WeakConnection};
 use restate_types::{
     logs::{LogletOffset, SequenceNumber, TailState},
     replication::NodeSet,
@@ -24,22 +22,17 @@ use restate_types::{
 use super::metric_definitions::BIFROST_SEQ_STORE_DURATION;
 use crate::loglet::util::TailOffsetWatch;
 
-type LogServerLock = CachePadded<Mutex<Option<RemoteLogServer>>>;
-
 /// LogServer instance
 #[derive(Clone)]
 pub struct RemoteLogServer {
     tail: TailOffsetWatch,
-    connection: WeakConnection,
     store_latency: Histogram,
 }
 
 impl RemoteLogServer {
-    fn new(connection: WeakConnection) -> Self {
-        let node_id = connection.peer().as_plain();
+    fn new(node_id: PlainNodeId) -> Self {
         Self {
             tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)),
-            connection,
             store_latency: metrics::histogram!(BIFROST_SEQ_STORE_DURATION, "node_id" => node_id.to_string()),
         }
     }
@@ -51,20 +44,13 @@ impl RemoteLogServer {
     pub fn store_latency(&self) -> &Histogram {
         &self.store_latency
     }
-
-    pub fn connection(&self) -> &WeakConnection {
-        &self.connection
-    }
 }
 
 /// LogServerManager maintains a set of [`RemoteLogServer`]s that provided via the
 /// [`NodeSet`].
-///
-/// The manager makes sure there is only one active connection per server. 
-/// It's up to the user of the client to do [`LogServerManager::renew`] if needed #[derive(Clone)] pub struct RemoteLogServerManager { - servers: Arc>, + servers: Arc>, } impl RemoteLogServerManager { @@ -72,94 +58,20 @@ impl RemoteLogServerManager { pub fn new(nodeset: &NodeSet) -> Self { let servers = nodeset .iter() - .map(|node_id| (*node_id, LogServerLock::default())) + .copied() + .map(|node_id| (node_id, RemoteLogServer::new(node_id))) .collect(); let servers = Arc::new(servers); Self { servers } } - pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option { + pub fn get_tail_offset(&self, id: PlainNodeId) -> &TailOffsetWatch { let server = self.servers.get(&id).expect("node is in nodeset"); - - if let Ok(guard) = server.try_lock() { - if let Some(current) = guard.deref() { - return Some(current.local_tail().clone()); - } - } - - None + server.local_tail() } - /// Gets a log-server instance. On first time it will initialize a new connection - /// to log server. It will make sure all following get call holds the same - /// connection. - /// - /// It's up to the client to call [`Self::renew`] if the connection it holds - /// is closed. - pub async fn get( - &self, - id: PlainNodeId, - networking: &Networking, - ) -> Result { - let server = self.servers.get(&id).expect("node is in nodeset"); - - let mut guard = server.lock().await; - - if let Some(current) = guard.deref() { - return Ok(current.clone()); - } - - let connection = networking.node_connection(id).await?; - let server = RemoteLogServer::new(connection); - - *guard = Some(server.clone()); - - Ok(server) - } - - /// Renew makes sure server connection is renewed if and only if - /// the provided server holds an outdated connection. Otherwise - /// the latest connection associated with this server is used. - /// - /// It's up the holder of the log server instance to retry to renew - /// if that connection is not valid. - /// - /// It also guarantees that concurrent calls to renew on the same server instance - /// will only renew the connection once for all callers - /// - /// However, this does not affect copies of LogServer that have been already retrieved - /// by calling [`Self::get()`]. - /// - /// Holder of old instances will have to call renew if they need to. 
- pub async fn renew( - &self, - server: &mut RemoteLogServer, - networking: &Networking, - ) -> Result<(), NetworkError> { - // this key must already be in the map - let current = self - .servers - .get(&server.connection.peer().as_plain()) - .expect("node is in nodeset"); - - let mut guard = current.lock().await; - - // if you calling renew then the LogServer has already been initialized - let inner = guard.as_mut().expect("initialized log server instance"); - - if inner.connection != server.connection { - // someone else has already renewed the connection - server.connection = inner.connection.clone(); - return Ok(()); - } - - let connection = networking - .node_connection(server.connection.peer().as_plain()) - .await?; - inner.connection = connection.clone(); - server.connection = connection.clone(); - - Ok(()) + pub fn get(&self, id: PlainNodeId) -> &RemoteLogServer { + self.servers.get(&id).expect("node is in nodeset") } } diff --git a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs index cbb660df3..de313ed0f 100644 --- a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs +++ b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs @@ -28,6 +28,7 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_TOTAL: &str = pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str = "restate.bifrost.sequencer.committed_records.bytes"; pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration"; +pub(crate) const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration"; pub(crate) fn describe_metrics() { describe_counter!( @@ -72,6 +73,12 @@ pub(crate) fn describe_metrics() { "Size of records committed" ); + describe_histogram!( + BIFROST_SEQ_APPEND_DURATION, + Unit::Seconds, + "Append batch duration in seconds as measured by the sequencer" + ); + describe_histogram!( BIFROST_SEQ_STORE_DURATION, Unit::Seconds, diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index b8263f2b0..32eed2d0c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -8,36 +8,29 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use std::{ - cmp::Ordering, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration}; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{sync::OwnedSemaphorePermit, time::timeout}; -use tracing::{instrument, trace}; +use tokio::time::Instant; +use tokio::{sync::OwnedSemaphorePermit, task::JoinSet}; +use tracing::{debug, instrument, trace, warn}; use restate_core::{ cancellation_token, - network::{ - rpc_router::{RpcError, RpcRouter}, - Incoming, NetworkError, Networking, Outgoing, TransportConnect, - }, - ShutdownError, + network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect}, + ShutdownError, TaskCenterFutureExt, }; use restate_types::{ config::Configuration, live::Live, logs::{LogletOffset, Record, SequenceNumber, TailState}, net::log_server::{LogServerRequestHeader, Status, Store, StoreFlags, Stored}, - replicated_loglet::Spread, - replication::NodeSet, + replication::{DecoratedNodeSet, NodeSet}, time::MillisSinceEpoch, - PlainNodeId, + Merge, PlainNodeId, }; use super::{RecordsExt, SequencerSharedState}; +use crate::providers::replicated_loglet::metric_definitions::BIFROST_SEQ_APPEND_DURATION; use crate::{ loglet::{AppendError, LogletCommitResolver}, providers::replicated_loglet::{ @@ -50,8 +43,9 @@ use crate::{ }; const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000); +const TONE_ESCALATION_THRESHOLD: usize = 5; -enum SequencerAppenderState { +enum State { Wave { // nodes that should be avoided by the spread selector graylist: NodeSet, @@ -69,6 +63,9 @@ pub(crate) struct SequencerAppender { networking: Networking, first_offset: LogletOffset, records: Arc<[Record]>, + checker: NodeSetChecker, + nodeset_status: DecoratedNodeSet, + current_wave: usize, // permit is held during the entire live // of the batch to limit the number of // inflight batches @@ -88,10 +85,25 @@ impl SequencerAppender { permit: OwnedSemaphorePermit, commit_resolver: LogletCommitResolver, ) -> Self { + // todo: in the future, we should update the checker's view over nodes configuration before + // each wave. At the moment this is not required as nodes will not change their storage + // state after the nodeset has been created until the loglet is sealed. + let checker = NodeSetChecker::::new( + sequencer_shared_state.selector.nodeset(), + &networking.metadata().nodes_config_ref(), + sequencer_shared_state.selector.replication_property(), + ); + + let nodeset_status = + DecoratedNodeSet::from(sequencer_shared_state.selector.nodeset().clone()); + Self { sequencer_shared_state, store_router, networking, + checker, + nodeset_status, + current_wave: 0, first_offset, records, permit: Some(permit), @@ -112,15 +124,14 @@ impl SequencerAppender { ) )] pub async fn run(mut self) { - let mut wave = 0; + let start = Instant::now(); // initial wave has 0 replicated and 0 gray listed node - let mut state = SequencerAppenderState::Wave { + let mut state = State::Wave { graylist: NodeSet::default(), }; let cancellation = cancellation_token(); - let mut cancelled = std::pin::pin!(cancellation.cancelled()); let retry_policy = self .configuration .live_load() @@ -135,36 +146,42 @@ impl SequencerAppender { let final_state = loop { state = match state { // termination conditions - SequencerAppenderState::Done - | SequencerAppenderState::Cancelled - | SequencerAppenderState::Sealed => break state, - SequencerAppenderState::Wave { graylist } => { - wave += 1; - tokio::select! 
{ - next_state = self.wave(graylist, wave) => {next_state}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } - } + State::Done | State::Cancelled | State::Sealed => break state, + State::Wave { graylist } => { + self.current_wave += 1; + let Some(next_state) = cancellation + .run_until_cancelled(self.send_wave(graylist)) + .await + else { + break State::Cancelled; + }; + next_state } - SequencerAppenderState::Backoff => { + State::Backoff => { // since backoff can be None, or run out of iterations, // but appender should never give up we fall back to fixed backoff let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME); - tracing::info!( - delay = ?delay, - %wave, - "Append wave failed, retrying with a new wave after delay" - ); - - tokio::select! { - _ = tokio::time::sleep(delay) => {}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } + if self.current_wave >= TONE_ESCALATION_THRESHOLD { + warn!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. Status is {}", delay, self.nodeset_status + ); + } else { + debug!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. Status is {}", delay, self.nodeset_status + ); + } + + if cancellation + .run_until_cancelled(tokio::time::sleep(delay)) + .await + .is_none() + { + break State::Cancelled; }; - SequencerAppenderState::Wave { + State::Wave { graylist: NodeSet::default(), } } @@ -172,35 +189,42 @@ impl SequencerAppender { }; match final_state { - SequencerAppenderState::Done => { + State::Done => { assert!(self.commit_resolver.is_none()); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_TOTAL) .increment(self.records.len() as u64); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_BYTES) .increment(self.records.estimated_encode_size() as u64); + metrics::histogram!(BIFROST_SEQ_APPEND_DURATION).record(start.elapsed()); - tracing::trace!("SequencerAppender task completed"); + trace!( + wave = %self.current_wave, + "Append succeeded in {:?}, status {}", + start.elapsed(), + self.nodeset_status + ); } - SequencerAppenderState::Cancelled => { - tracing::trace!("SequencerAppender task cancelled"); + State::Cancelled => { + trace!("Append cancelled"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.error(AppendError::Shutdown(ShutdownError)); } } - SequencerAppenderState::Sealed => { - tracing::debug!("SequencerAppender ended because of sealing"); + State::Sealed => { + trace!("Append ended because of sealing"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.sealed(); } } - SequencerAppenderState::Backoff | SequencerAppenderState::Wave { .. } => { + State::Backoff | State::Wave { .. 
} => { unreachable!() } } } - async fn wave(&mut self, mut graylist: NodeSet, wave: usize) -> SequencerAppenderState { + #[instrument(skip_all, fields(wave = %self.current_wave))] + async fn send_wave(&mut self, mut graylist: NodeSet) -> State { // select the spread let spread = match self.sequencer_shared_state.selector.select( &mut rand::rng(), @@ -210,23 +234,17 @@ impl SequencerAppender { Ok(spread) => spread, Err(_) => { graylist.clear(); - return SequencerAppenderState::Backoff; + trace!( + %graylist, + "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + ); + return State::Backoff; } }; - tracing::trace!(%graylist, %spread, %wave, "Sending store wave"); - self.send_wave(spread, wave).await - } - - async fn send_wave(&mut self, spread: Spread, wave: usize) -> SequencerAppenderState { + trace!(%graylist, %spread, "Sending append wave"); let last_offset = self.records.last_offset(self.first_offset).unwrap(); - let mut checker = NodeSetChecker::::new( - self.sequencer_shared_state.selector.nodeset(), - &self.networking.metadata().nodes_config_ref(), - self.sequencer_shared_state.selector.replication_property(), - ); - // todo: should be exponential backoff let store_timeout = *self .configuration @@ -235,98 +253,114 @@ impl SequencerAppender { .replicated_loglet .log_server_rpc_timeout; - let timeout_at = MillisSinceEpoch::after(store_timeout); // track the in flight server ids let mut pending_servers = NodeSet::from_iter(spread.iter().copied()); - let mut store_tasks = FuturesUnordered::new(); - - for node_id in &spread { - store_tasks.push( - LogServerStoreTask { - node_id: *node_id, - sequencer_shared_state: &self.sequencer_shared_state, - networking: &self.networking, - first_offset: self.first_offset, - records: &self.records, - rpc_router: &self.store_router, - timeout_at, + let mut store_tasks = JoinSet::new(); + + for node_id in spread.iter().copied() { + // do not attempt on nodes that we know they're committed || sealed + if let Some(status) = self.checker.get_attribute(&node_id) { + if status.committed || status.sealed { + pending_servers.remove(node_id); + continue; } - .run(), - ); + } + store_tasks.spawn({ + let store_task = LogServerStoreTask { + node_id, + sequencer_shared_state: self.sequencer_shared_state.clone(), + networking: self.networking.clone(), + first_offset: self.first_offset, + records: self.records.clone(), + rpc_router: self.store_router.clone(), + store_timeout, + }; + async move { (node_id, store_task.run().await) }.in_current_tc() + }); } - loop { - let store_result = match timeout(store_timeout, store_tasks.next()).await { - Ok(Some(result)) => result, - Ok(None) => break, // no more tasks - Err(_) => { - // if we have already acknowledged this append, it's okay to retire. - if self.commit_resolver.is_none() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but append was committed, giving up"); - return SequencerAppenderState::Done; - } - - if self.sequencer_shared_state.known_global_tail.is_sealed() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but this loglet was sealed, giving up"); - return SequencerAppenderState::Sealed; - } - // timed out! - // none of the pending tasks has finished in time! 
we will assume all pending server - // are graylisted and try again - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Timeout waiting on store response"); - return SequencerAppenderState::Wave { - graylist: pending_servers, - }; - } + while let Some(store_result) = store_tasks.join_next().await { + // unlikely to happen, but it's there for completeness + if self.sequencer_shared_state.known_global_tail.is_sealed() { + trace!(%pending_servers, %spread, "Loglet was sealed, stopping this sequencer appender"); + return State::Sealed; + } + let Ok((node_id, store_result)) = store_result else { + // task panicked, ignore + continue; }; - let LogServerStoreTaskResult { - node_id: peer, - status, - } = store_result; - - let response = match status { + let stored = match store_result { StoreTaskStatus::Error(NetworkError::Shutdown(_)) => { - return SequencerAppenderState::Cancelled; + return State::Cancelled; + } + StoreTaskStatus::Error(NetworkError::Timeout(_)) => { + // Yes, I know those checks are ugly, but it's a quick and dirty way until we + // have a nice macro for it. + if self.current_wave >= TONE_ESCALATION_THRESHOLD { + debug!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); + } else { + trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); + } + self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); + graylist.insert(node_id); + continue; } StoreTaskStatus::Error(err) => { // couldn't send store command to remote server - tracing::debug!(%peer, error=%err, "Failed to send batch to node"); + if self.current_wave >= TONE_ESCALATION_THRESHOLD { + debug!(peer = %node_id, %err, "Failed to send batch to node"); + } else { + trace!(peer = %node_id, %err, "Failed to send batch to node"); + } + self.nodeset_status.merge(node_id, PerNodeStatus::failed()); + graylist.insert(node_id); continue; } StoreTaskStatus::Sealed => { - tracing::debug!(%peer, "Store task cancelled, the node is sealed"); - checker.set_attribute(peer, NodeAttributes::sealed()); + debug!(peer = %node_id, "Store task cancelled, the node is sealed"); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Sealed); continue; } StoreTaskStatus::Stored(stored) => { - tracing::trace!(%peer, "Store task completed"); + trace!(peer = %node_id, "Store task completed"); stored } }; - // we had a response from this node and there is still a lot we can do - match response.status { + // We had a response from this node and there is still a lot we can do + match stored.status { Status::Ok => { // only if status is okay that we remove this node // from the gray list, and move to replicated list - checker.set_attribute(peer, NodeAttributes::committed()); - pending_servers.remove(peer); + self.checker + .set_attribute(node_id, NodeAttributes::committed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Committed); + pending_servers.remove(node_id); } Status::Sealed | Status::Sealing => { - checker.set_attribute(peer, NodeAttributes::sealed()); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + graylist.insert(node_id); + } + Status::Dropped => { + // Overloaded, or request expired + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding"); + graylist.insert(node_id); + } + Status::Disabled => { + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. 
Peer's log-store is disabled"); + graylist.insert(node_id); } - Status::Disabled - | Status::Dropped - | Status::SequencerMismatch - | Status::Malformed - | Status::OutOfBounds => { - // just leave this log server in graylist (pending) - tracing::debug!(%peer, status=?response.status, "Store task returned an error status"); + Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => { + warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate"); + graylist.insert(node_id); } } - if self.commit_resolver.is_some() && checker.check_write_quorum(|attr| attr.committed) { + if self.checker.check_write_quorum(|attr| attr.committed) { // resolve the commit if not resolved yet if let Some(resolver) = self.commit_resolver.take() { self.sequencer_shared_state @@ -334,20 +368,109 @@ impl SequencerAppender { .notify_offset_update(last_offset.next()); resolver.offset(last_offset); } - // drop the permit self.permit.take(); + return State::Done; } } - if checker.check_write_quorum(|attr| attr.committed) { - SequencerAppenderState::Done - } else if checker.check_fmajority(|attr| attr.sealed).passed() { - SequencerAppenderState::Sealed + if self.checker.check_fmajority(|attr| attr.sealed).passed() { + State::Sealed } else { - SequencerAppenderState::Wave { - graylist: pending_servers, + // We couldn't achieve write quorum with this wave. We will try again, as fast as + // possible until the graylist eats up enough nodes such that we won't be able to + // generate node nodesets. Only then we backoff. + State::Wave { graylist } + } + } +} + +#[derive(Default, Debug, PartialEq, Clone, Copy)] +enum PerNodeStatus { + #[default] + NotAttempted, + // todo: the distinction between timeout and failed might not be worth the hassle. + // consider only doing failed if in practice it wasn't as useful to keep both variants. + Failed { + attempts: usize, + }, + Timeout { + attempts: usize, + }, + Committed, + Sealed, +} + +impl Display for PerNodeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PerNodeStatus::NotAttempted => write!(f, ""), + PerNodeStatus::Failed { attempts } => write!(f, "ERROR({})", attempts), + PerNodeStatus::Committed => write!(f, "COMMITTED"), + PerNodeStatus::Timeout { attempts } => write!(f, "TIMEDOUT({})", attempts), + PerNodeStatus::Sealed => write!(f, "SEALED"), + } + } +} + +impl PerNodeStatus { + fn timeout() -> Self { + Self::Timeout { attempts: 1 } + } + fn failed() -> Self { + Self::Failed { attempts: 1 } + } +} + +impl Merge for PerNodeStatus { + fn merge(&mut self, other: Self) -> bool { + use PerNodeStatus::*; + match (*self, other) { + (NotAttempted, NotAttempted) => false, + (Committed, Committed) => false, + (NotAttempted, e) => { + *self = e; + true + } + // we will not transition from committed to seal because + // committed is more important for showing where did we write. Not that this is likely + // to ever happen though. + (Committed, _) => false, + (Failed { attempts }, Failed { .. }) => { + *self = Failed { + attempts: attempts + 1, + }; + true } + (Failed { attempts }, Timeout { .. }) => { + *self = Timeout { + attempts: attempts + 1, + }; + true + } + (Timeout { attempts }, Failed { .. }) => { + *self = Failed { + attempts: attempts + 1, + }; + true + } + (Timeout { attempts }, Timeout { .. 
}) => { + *self = Timeout { + attempts: attempts + 1, + }; + true + } + (_, Committed) => { + *self = Committed; + true + } + (Sealed, Sealed) => false, + (_, Sealed) => { + *self = Sealed; + true + } + (Sealed, _) => false, + _ => false, } } } @@ -374,6 +497,18 @@ impl NodeAttributes { } } +impl Display for NodeAttributes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match (self.committed, self.sealed) { + // legend X = committed to be consistent with restatectl digest output + (true, true) => write!(f, "X(S)"), + (true, false) => write!(f, "X"), + (false, true) => write!(f, "-(S)"), + (false, false) => write!(f, ""), + } + } +} + #[derive(Debug)] enum StoreTaskStatus { Sealed, @@ -390,24 +525,19 @@ impl From> for StoreTaskStatus { } } -struct LogServerStoreTaskResult { - pub node_id: PlainNodeId, - pub status: StoreTaskStatus, -} - /// The task will retry to connect to the remote server if connection /// was lost. -struct LogServerStoreTask<'a, T> { +struct LogServerStoreTask { node_id: PlainNodeId, - sequencer_shared_state: &'a Arc, - networking: &'a Networking, + sequencer_shared_state: Arc, + networking: Networking, first_offset: LogletOffset, - records: &'a Arc<[Record]>, - rpc_router: &'a RpcRouter, - timeout_at: MillisSinceEpoch, + records: Arc<[Record]>, + rpc_router: RpcRouter, + store_timeout: Duration, } -impl LogServerStoreTask<'_, T> { +impl LogServerStoreTask { #[instrument( skip_all, fields( @@ -418,7 +548,7 @@ impl LogServerStoreTask<'_, T> { peer=%self.node_id, ) )] - async fn run(mut self) -> LogServerStoreTaskResult { + async fn run(mut self) -> StoreTaskStatus { let result = self.send().await; match &result { Ok(status) => { @@ -435,18 +565,14 @@ impl LogServerStoreTask<'_, T> { } } - LogServerStoreTaskResult { - node_id: self.node_id, - status: result.into(), - } + result.into() } async fn send(&mut self) -> Result { - let mut server = self + let server = self .sequencer_shared_state .log_server_manager - .get(self.node_id, self.networking) - .await?; + .get(self.node_id); let server_local_tail = server .local_tail() .wait_for_offset_or_seal(self.first_offset); @@ -481,7 +607,7 @@ impl LogServerStoreTask<'_, T> { } } - let incoming = match self.try_send(&mut server).await { + let incoming = match self.try_send(server).await { Ok(incoming) => incoming, Err(err) => { return Err(err); @@ -506,10 +632,8 @@ impl LogServerStoreTask<'_, T> { Ok(StoreTaskStatus::Stored(incoming.into_body())) } - async fn try_send( - &mut self, - server: &mut RemoteLogServer, - ) -> Result, NetworkError> { + async fn try_send(&self, server: &RemoteLogServer) -> Result, NetworkError> { + let timeout_at = MillisSinceEpoch::after(self.store_timeout); let store = Store { header: LogServerRequestHeader::new( *self.sequencer_shared_state.loglet_id(), @@ -520,53 +644,28 @@ impl LogServerStoreTask<'_, T> { first_offset: self.first_offset, flags: StoreFlags::empty(), known_archived: LogletOffset::INVALID, - payloads: Arc::clone(self.records), + payloads: Arc::clone(&self.records), sequencer: *self.sequencer_shared_state.sequencer(), - timeout_at: Some(self.timeout_at), + timeout_at: Some(timeout_at), }; - let mut msg = Outgoing::new(self.node_id, store); - - let mut attempt = 0; - loop { - attempt += 1; - - let with_connection = msg.assign_connection(server.connection().clone()); - let store_start_time = Instant::now(); - - let result = match self.rpc_router.call_on_connection(with_connection).await { - Ok(incoming) => Ok(incoming), - Err(RpcError::Shutdown(shutdown)) => 
Err(NetworkError::Shutdown(shutdown)), - Err(RpcError::SendError(err)) => { - msg = err.original.forget_connection(); - - match err.source { - NetworkError::ConnectionClosed(_) - | NetworkError::ConnectError(_) - | NetworkError::Timeout(_) => { - trace!( - %attempt, - "Failed to send store to log server, trying to create a new connection" - ); - self.sequencer_shared_state - .log_server_manager - .renew(server, self.networking) - .await?; - trace!( - %attempt, - "Reconnected to log-server, retrying the store" - ); - // try again - continue; - } - _ => Err(err.source), - } - } - }; - - server.store_latency().record(store_start_time.elapsed()); - - return result; + let store_start_time = Instant::now(); + + // note: we are over-indexing on the fact that currently the sequencer will send one + // message at a time per log-server. My argument to make us not sticking to a single + // connection is that the complexity with the previous design didn't add any value. When we + // support pipelined writes, it's unlikely that we'll also be doing the coordination through + // the offset watch as we are currently doing (due to its lock-contention downside). It'll be a different design altogether. + match self + .rpc_router + .call_timeout(&self.networking, self.node_id, store, self.store_timeout) + .await + { + Ok(incoming) => { + server.store_latency().record(store_start_time.elapsed()); + Ok(incoming) + } + Err(err) => Err(err), } } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index e0fdfed83..0292371f8 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -92,11 +92,7 @@ impl CheckSealTask { let local_tails: BTreeMap = effective_nodeset .iter() - .filter_map(|node_id| { - log_servers - .try_get_tail_offset(*node_id) - .map(|w| (*node_id, w)) - }) + .map(|node_id| (*node_id, log_servers.get_tail_offset(*node_id).clone())) .collect(); // If some of the nodes are already sealed, we know our answer and we don't need to go through
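
To make the new appender flow easier to follow, here is a minimal, self-contained sketch of the JoinSet-based wave described in the commit message: fan out one store task per node, tally acknowledgements until a write quorum is reached, then abort the extra in-flight stores. This is not the patch's code; `store_on_node`, `StoreOutcome`, the node ids, and the quorum size are hypothetical stand-ins for the real sequencer types, and it assumes a tokio runtime with the `macros`, `rt-multi-thread`, and `time` features.

```rust
use std::time::Duration;

use tokio::task::JoinSet;

#[derive(Debug)]
enum StoreOutcome {
    Stored,
    Failed,
}

// Stand-in for the per-node store RPC; the real code sends a `Store` message
// with a network-level timeout instead of sleeping.
async fn store_on_node(node_id: u32) -> StoreOutcome {
    tokio::time::sleep(Duration::from_millis(10 * node_id as u64)).await;
    if node_id % 5 == 0 {
        StoreOutcome::Failed
    } else {
        StoreOutcome::Stored
    }
}

#[tokio::main]
async fn main() {
    let nodeset: Vec<u32> = vec![1, 2, 3, 4, 5];
    let write_quorum = 3;

    let mut store_tasks = JoinSet::new();
    for node_id in nodeset {
        // Each task reports which node it served so failures can be graylisted.
        store_tasks.spawn(async move { (node_id, store_on_node(node_id).await) });
    }

    let mut committed = 0usize;
    let mut graylist = Vec::new();
    while let Some(joined) = store_tasks.join_next().await {
        let Ok((node_id, outcome)) = joined else { continue }; // task panicked, ignore
        match outcome {
            StoreOutcome::Stored => committed += 1,
            StoreOutcome::Failed => graylist.push(node_id),
        }
        if committed >= write_quorum {
            // Abort the remaining in-flight stores, mirroring how the appender
            // stops extra stores once the append is committed.
            store_tasks.abort_all();
            break;
        }
    }

    if committed >= write_quorum {
        println!("append committed, graylist so far: {graylist:?}");
    } else {
        println!("wave failed, retry next wave with graylist {graylist:?}");
    }
}
```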