From cbabc0f06af2dc3471acddf83530c28cfd33d15e Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 5 Feb 2025 19:19:15 +0000 Subject: [PATCH] [WIP] sequencer updates --- Cargo.lock | 2 + crates/bifrost/Cargo.toml | 1 + .../src/providers/replicated_loglet/error.rs | 2 +- .../replicated_loglet/log_server_manager.rs | 108 +--- .../replicated_loglet/metric_definitions.rs | 7 + .../replicated_loglet/replication/checker.rs | 20 +- .../replicated_loglet/sequencer/appender.rs | 487 +++++++++++------- .../replicated_loglet/tasks/check_seal.rs | 6 +- .../providers/replicated_loglet/tasks/seal.rs | 21 +- crates/log-server/Cargo.toml | 1 + crates/log-server/src/loglet_worker.rs | 1 - crates/types/src/logs/metadata.rs | 11 +- crates/types/src/replication/nodeset.rs | 65 ++- .../restatectl/src/commands/cluster/config.rs | 7 + 14 files changed, 422 insertions(+), 317 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ff241cb51..b8b852d37b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6427,6 +6427,7 @@ dependencies = [ name = "restate-bifrost" version = "1.2.0-dev" dependencies = [ + "ahash 0.8.11", "anyhow", "async-trait", "bytes", @@ -6873,6 +6874,7 @@ dependencies = [ "prost", "prost-dto", "prost-types", + "rand 0.9.0", "restate-bifrost", "restate-core", "restate-metadata-server", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index a1a38766d5..4a5c248d27 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -25,6 +25,7 @@ restate-rocksdb = { workspace = true } restate-test-util = { workspace = true, optional = true } restate-types = { workspace = true } +ahash = { workspace = true } anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index e266f23b8e..8467f3b7cc 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -18,7 +18,7 @@ use restate_types::replication::DecoratedNodeSet; use crate::loglet::OperationError; -#[derive(Default, derive_more::Display, derive_more::Debug)] +#[derive(Default, derive_more::Display, derive_more::Debug, PartialEq, Eq, Hash, Clone, Copy)] pub enum NodeSealStatus { #[display("E")] Error, diff --git a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs index d8fe561b4b..3fca7cf456 100644 --- a/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs +++ b/crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs @@ -8,13 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
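For illustration, a minimal self-contained sketch of the log_server_manager.rs simplification below: the per-node `CachePadded<Mutex<Option<RemoteLogServer>>>` slots (lazily caching one connection per log server) are replaced by an `ahash::HashMap` of per-node state built eagerly from the nodeset and shared immutably behind an `Arc`, so lookups become synchronous and infallible. The names `NodeId`, `PerNodeState`, and `Manager` are stand-ins invented for this sketch, not the crate's types; only `ahash::HashMap` matches the dependency actually added in this patch.

    // Sketch only: stand-in types mirroring the eager, immutable per-node map.
    use std::sync::Arc;

    use ahash::HashMap; // drop-in std HashMap alias with a faster hasher (dependency added above)

    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    struct NodeId(u32);

    #[derive(Debug)]
    struct PerNodeState {
        // stands in for the tail watch / store-latency histogram kept per log server
        label: String,
    }

    #[derive(Clone)]
    struct Manager {
        // built once from the nodeset; shared read-only, so no Mutex and no async init
        nodes: Arc<HashMap<NodeId, PerNodeState>>,
    }

    impl Manager {
        fn new(nodeset: impl IntoIterator<Item = NodeId>) -> Self {
            let nodes = nodeset
                .into_iter()
                .map(|id| (id, PerNodeState { label: format!("N{}", id.0) }))
                .collect();
            Self { nodes: Arc::new(nodes) }
        }

        // mirrors RemoteLogServerManager::get: callers only ask for nodes in the nodeset
        fn get(&self, id: NodeId) -> &PerNodeState {
            self.nodes.get(&id).expect("node is in nodeset")
        }
    }

    fn main() {
        let manager = Manager::new([NodeId(1), NodeId(2), NodeId(3)]);
        println!("{:?}", manager.get(NodeId(2)));
    }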
-use std::{collections::BTreeMap, ops::Deref, sync::Arc}; +use std::sync::Arc; -use crossbeam_utils::CachePadded; +use ahash::HashMap; use metrics::Histogram; -use tokio::sync::Mutex; -use restate_core::network::{NetworkError, Networking, TransportConnect, WeakConnection}; use restate_types::{ logs::{LogletOffset, SequenceNumber, TailState}, replication::NodeSet, @@ -24,22 +22,17 @@ use restate_types::{ use super::metric_definitions::BIFROST_SEQ_STORE_DURATION; use crate::loglet::util::TailOffsetWatch; -type LogServerLock = CachePadded>>; - /// LogServer instance #[derive(Clone)] pub struct RemoteLogServer { tail: TailOffsetWatch, - connection: WeakConnection, store_latency: Histogram, } impl RemoteLogServer { - fn new(connection: WeakConnection) -> Self { - let node_id = connection.peer().as_plain(); + fn new(node_id: PlainNodeId) -> Self { Self { tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)), - connection, store_latency: metrics::histogram!(BIFROST_SEQ_STORE_DURATION, "node_id" => node_id.to_string()), } } @@ -51,20 +44,13 @@ impl RemoteLogServer { pub fn store_latency(&self) -> &Histogram { &self.store_latency } - - pub fn connection(&self) -> &WeakConnection { - &self.connection - } } /// LogServerManager maintains a set of [`RemoteLogServer`]s that provided via the /// [`NodeSet`]. -/// -/// The manager makes sure there is only one active connection per server. -/// It's up to the user of the client to do [`LogServerManager::renew`] if needed #[derive(Clone)] pub struct RemoteLogServerManager { - servers: Arc>, + servers: Arc>, } impl RemoteLogServerManager { @@ -72,94 +58,20 @@ impl RemoteLogServerManager { pub fn new(nodeset: &NodeSet) -> Self { let servers = nodeset .iter() - .map(|node_id| (*node_id, LogServerLock::default())) + .copied() + .map(|node_id| (node_id, RemoteLogServer::new(node_id))) .collect(); let servers = Arc::new(servers); Self { servers } } - pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option { + pub fn get_tail_offset(&self, id: PlainNodeId) -> &TailOffsetWatch { let server = self.servers.get(&id).expect("node is in nodeset"); - - if let Ok(guard) = server.try_lock() { - if let Some(current) = guard.deref() { - return Some(current.local_tail().clone()); - } - } - - None + server.local_tail() } - /// Gets a log-server instance. On first time it will initialize a new connection - /// to log server. It will make sure all following get call holds the same - /// connection. - /// - /// It's up to the client to call [`Self::renew`] if the connection it holds - /// is closed. - pub async fn get( - &self, - id: PlainNodeId, - networking: &Networking, - ) -> Result { - let server = self.servers.get(&id).expect("node is in nodeset"); - - let mut guard = server.lock().await; - - if let Some(current) = guard.deref() { - return Ok(current.clone()); - } - - let connection = networking.node_connection(id).await?; - let server = RemoteLogServer::new(connection); - - *guard = Some(server.clone()); - - Ok(server) - } - - /// Renew makes sure server connection is renewed if and only if - /// the provided server holds an outdated connection. Otherwise - /// the latest connection associated with this server is used. - /// - /// It's up the holder of the log server instance to retry to renew - /// if that connection is not valid. 
- /// - /// It also guarantees that concurrent calls to renew on the same server instance - /// will only renew the connection once for all callers - /// - /// However, this does not affect copies of LogServer that have been already retrieved - /// by calling [`Self::get()`]. - /// - /// Holder of old instances will have to call renew if they need to. - pub async fn renew( - &self, - server: &mut RemoteLogServer, - networking: &Networking, - ) -> Result<(), NetworkError> { - // this key must already be in the map - let current = self - .servers - .get(&server.connection.peer().as_plain()) - .expect("node is in nodeset"); - - let mut guard = current.lock().await; - - // if you calling renew then the LogServer has already been initialized - let inner = guard.as_mut().expect("initialized log server instance"); - - if inner.connection != server.connection { - // someone else has already renewed the connection - server.connection = inner.connection.clone(); - return Ok(()); - } - - let connection = networking - .node_connection(server.connection.peer().as_plain()) - .await?; - inner.connection = connection.clone(); - server.connection = connection.clone(); - - Ok(()) + pub fn get(&self, id: PlainNodeId) -> &RemoteLogServer { + self.servers.get(&id).expect("node is in nodeset") } } diff --git a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs index ab86263bfa..7a3613ea9f 100644 --- a/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs +++ b/crates/bifrost/src/providers/replicated_loglet/metric_definitions.rs @@ -29,6 +29,7 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_TOTAL: &str = pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str = "restate.bifrost.sequencer.committed_records.bytes"; pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration"; +pub(crate) const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration"; pub(crate) fn describe_metrics() { describe_counter!( @@ -79,6 +80,12 @@ pub(crate) fn describe_metrics() { "Size of records committed" ); + describe_histogram!( + BIFROST_SEQ_APPEND_DURATION, + Unit::Seconds, + "Append batch duration in seconds as measured by the sequencer" + ); + describe_histogram!( BIFROST_SEQ_STORE_DURATION, Unit::Seconds, diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index 3c389112a4..e2170d66dc 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -8,12 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
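For context before the checker.rs hunk: the two trait impls added below make a `NodeSetChecker`'s per-node attributes directly reusable as a status summary, which is what lets the seal path later in this patch write `nodeset_status.extend(&nodeset_checker)` instead of maintaining a second map by hand. A small self-contained sketch of the pattern, using stand-in `Checker`/`DecoratedSet`/`NodeId` types invented for the sketch rather than the real ones:

    // Sketch only: iterate a checker by reference, or convert it into a decorated set.
    use std::collections::{BTreeMap, HashMap};

    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
    struct NodeId(u32);

    struct Checker<Attr> {
        node_to_attr: HashMap<NodeId, Attr>,
    }

    // Borrowing iteration: `&checker` yields (&NodeId, &Attr) pairs, so it can feed
    // Extend/FromIterator consumers without consuming the checker itself.
    impl<'a, Attr> IntoIterator for &'a Checker<Attr> {
        type Item = (&'a NodeId, &'a Attr);
        type IntoIter = std::collections::hash_map::Iter<'a, NodeId, Attr>;

        fn into_iter(self) -> Self::IntoIter {
            self.node_to_attr.iter()
        }
    }

    #[derive(Debug)]
    struct DecoratedSet<Attr>(BTreeMap<NodeId, Attr>);

    // Owning conversion: build the per-node status set straight from the checker's attributes.
    impl<Attr> From<Checker<Attr>> for DecoratedSet<Attr> {
        fn from(checker: Checker<Attr>) -> Self {
            DecoratedSet(checker.node_to_attr.into_iter().collect())
        }
    }

    fn main() {
        let checker = Checker {
            node_to_attr: HashMap::from([(NodeId(1), "sealed"), (NodeId(2), "error")]),
        };
        for (node, attr) in &checker {
            println!("{node:?} -> {attr}");
        }
        let status: DecoratedSet<&str> = checker.into();
        println!("{status:?}");
    }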
-use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; use std::hash::{Hash, Hasher}; +use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; +use restate_types::replication::DecoratedNodeSet; use tracing::warn; use restate_types::locality::{LocationScope, NodeLocation}; @@ -734,6 +736,22 @@ impl LocationScopeState { } } +impl<'a, Attr> IntoIterator for &'a NodeSetChecker { + type Item = (&'a PlainNodeId, &'a Attr); + + type IntoIter = <&'a HashMap as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.node_to_attr.iter() + } +} + +impl From> for DecoratedNodeSet { + fn from(val: NodeSetChecker) -> DecoratedNodeSet { + DecoratedNodeSet::from_iter(val.node_to_attr) + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index b8263f2b00..a27ca26afd 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -8,36 +8,29 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::{ - cmp::Ordering, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{cmp::Ordering, fmt::Display, sync::Arc, time::Duration}; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{sync::OwnedSemaphorePermit, time::timeout}; -use tracing::{instrument, trace}; +use tokio::time::Instant; +use tokio::{sync::OwnedSemaphorePermit, task::JoinSet}; +use tracing::{debug, info, instrument, trace, warn}; use restate_core::{ cancellation_token, - network::{ - rpc_router::{RpcError, RpcRouter}, - Incoming, NetworkError, Networking, Outgoing, TransportConnect, - }, - ShutdownError, + network::{rpc_router::RpcRouter, Incoming, NetworkError, Networking, TransportConnect}, + ShutdownError, TaskCenterFutureExt, }; use restate_types::{ config::Configuration, live::Live, logs::{LogletOffset, Record, SequenceNumber, TailState}, net::log_server::{LogServerRequestHeader, Status, Store, StoreFlags, Stored}, - replicated_loglet::Spread, - replication::NodeSet, + replication::{DecoratedNodeSet, NodeSet}, time::MillisSinceEpoch, - PlainNodeId, + Merge, PlainNodeId, }; use super::{RecordsExt, SequencerSharedState}; +use crate::providers::replicated_loglet::metric_definitions::BIFROST_SEQ_APPEND_DURATION; use crate::{ loglet::{AppendError, LogletCommitResolver}, providers::replicated_loglet::{ @@ -51,7 +44,7 @@ use crate::{ const DEFAULT_BACKOFF_TIME: Duration = Duration::from_millis(1000); -enum SequencerAppenderState { +enum State { Wave { // nodes that should be avoided by the spread selector graylist: NodeSet, @@ -69,6 +62,9 @@ pub(crate) struct SequencerAppender { networking: Networking, first_offset: LogletOffset, records: Arc<[Record]>, + checker: NodeSetChecker, + nodeset_status: DecoratedNodeSet, + current_wave: usize, // permit is held during the entire live // of the batch to limit the number of // inflight batches @@ -88,10 +84,25 @@ impl SequencerAppender { permit: OwnedSemaphorePermit, commit_resolver: LogletCommitResolver, ) -> Self { + // todo: in the future, we should update the checker's view over nodes configuration before + // each wave. 
At the moment this is not required as nodes will not change their storage + // state after the nodeset has been created until the loglet is sealed. + let checker = NodeSetChecker::::new( + sequencer_shared_state.selector.nodeset(), + &networking.metadata().nodes_config_ref(), + sequencer_shared_state.selector.replication_property(), + ); + + let nodeset_status = + DecoratedNodeSet::from(sequencer_shared_state.selector.nodeset().clone()); + Self { sequencer_shared_state, store_router, networking, + checker, + nodeset_status, + current_wave: 0, first_offset, records, permit: Some(permit), @@ -112,15 +123,14 @@ impl SequencerAppender { ) )] pub async fn run(mut self) { - let mut wave = 0; + let start = Instant::now(); // initial wave has 0 replicated and 0 gray listed node - let mut state = SequencerAppenderState::Wave { + let mut state = State::Wave { graylist: NodeSet::default(), }; let cancellation = cancellation_token(); - let mut cancelled = std::pin::pin!(cancellation.cancelled()); let retry_policy = self .configuration .live_load() @@ -135,36 +145,42 @@ impl SequencerAppender { let final_state = loop { state = match state { // termination conditions - SequencerAppenderState::Done - | SequencerAppenderState::Cancelled - | SequencerAppenderState::Sealed => break state, - SequencerAppenderState::Wave { graylist } => { - wave += 1; - tokio::select! { - next_state = self.wave(graylist, wave) => {next_state}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } - } + State::Done | State::Cancelled | State::Sealed => break state, + State::Wave { graylist } => { + self.current_wave += 1; + let Some(next_state) = cancellation + .run_until_cancelled(self.send_wave(graylist)) + .await + else { + break State::Cancelled; + }; + next_state } - SequencerAppenderState::Backoff => { + State::Backoff => { // since backoff can be None, or run out of iterations, // but appender should never give up we fall back to fixed backoff let delay = retry.next().unwrap_or(DEFAULT_BACKOFF_TIME); - tracing::info!( - delay = ?delay, - %wave, - "Append wave failed, retrying with a new wave after delay" - ); - - tokio::select! { - _ = tokio::time::sleep(delay) => {}, - _ = &mut cancelled => { - break SequencerAppenderState::Cancelled; - } + if self.current_wave > 5 { + warn!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. Status is {}", delay, self.nodeset_status + ); + } else { + debug!( + wave = %self.current_wave, + "Append wave failed, retrying with a new wave after {:?}. 
Status is {}", delay, self.nodeset_status + ); + } + + if cancellation + .run_until_cancelled(tokio::time::sleep(delay)) + .await + .is_none() + { + break State::Cancelled; }; - SequencerAppenderState::Wave { + State::Wave { graylist: NodeSet::default(), } } @@ -172,35 +188,42 @@ impl SequencerAppender { }; match final_state { - SequencerAppenderState::Done => { + State::Done => { assert!(self.commit_resolver.is_none()); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_TOTAL) .increment(self.records.len() as u64); metrics::counter!(BIFROST_SEQ_RECORDS_COMMITTED_BYTES) .increment(self.records.estimated_encode_size() as u64); + metrics::histogram!(BIFROST_SEQ_APPEND_DURATION).record(start.elapsed()); - tracing::trace!("SequencerAppender task completed"); + trace!( + wave = %self.current_wave, + "Append succeeded in {:?}, status {}", + start.elapsed(), + self.nodeset_status + ); } - SequencerAppenderState::Cancelled => { - tracing::trace!("SequencerAppender task cancelled"); + State::Cancelled => { + trace!("Append cancelled"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.error(AppendError::Shutdown(ShutdownError)); } } - SequencerAppenderState::Sealed => { - tracing::debug!("SequencerAppender ended because of sealing"); + State::Sealed => { + trace!("Append ended because of sealing"); if let Some(commit_resolver) = self.commit_resolver.take() { commit_resolver.sealed(); } } - SequencerAppenderState::Backoff | SequencerAppenderState::Wave { .. } => { + State::Backoff | State::Wave { .. } => { unreachable!() } } } - async fn wave(&mut self, mut graylist: NodeSet, wave: usize) -> SequencerAppenderState { + #[instrument(skip_all, fields(wave = %self.current_wave))] + async fn send_wave(&mut self, mut graylist: NodeSet) -> State { // select the spread let spread = match self.sequencer_shared_state.selector.select( &mut rand::rng(), @@ -210,23 +233,17 @@ impl SequencerAppender { Ok(spread) => spread, Err(_) => { graylist.clear(); - return SequencerAppenderState::Backoff; + trace!( + %graylist, + "Cannot select a spread, perhaps too many nodes are graylisted, will clear the list and try again" + ); + return State::Backoff; } }; - tracing::trace!(%graylist, %spread, %wave, "Sending store wave"); - self.send_wave(spread, wave).await - } - - async fn send_wave(&mut self, spread: Spread, wave: usize) -> SequencerAppenderState { + trace!(%graylist, %spread, "Sending append wave"); let last_offset = self.records.last_offset(self.first_offset).unwrap(); - let mut checker = NodeSetChecker::::new( - self.sequencer_shared_state.selector.nodeset(), - &self.networking.metadata().nodes_config_ref(), - self.sequencer_shared_state.selector.replication_property(), - ); - // todo: should be exponential backoff let store_timeout = *self .configuration @@ -235,98 +252,106 @@ impl SequencerAppender { .replicated_loglet .log_server_rpc_timeout; - let timeout_at = MillisSinceEpoch::after(store_timeout); // track the in flight server ids let mut pending_servers = NodeSet::from_iter(spread.iter().copied()); - let mut store_tasks = FuturesUnordered::new(); - - for node_id in &spread { - store_tasks.push( - LogServerStoreTask { - node_id: *node_id, - sequencer_shared_state: &self.sequencer_shared_state, - networking: &self.networking, - first_offset: self.first_offset, - records: &self.records, - rpc_router: &self.store_router, - timeout_at, + let mut store_tasks = JoinSet::new(); + + for node_id in spread.iter().copied() { + // do not attempt on nodes that we know they're committed || sealed 
+ if let Some(status) = self.checker.get_attribute(&node_id) { + if status.committed || status.sealed { + pending_servers.remove(node_id); + continue; } - .run(), - ); + } + store_tasks.spawn({ + let store_task = LogServerStoreTask { + node_id, + sequencer_shared_state: self.sequencer_shared_state.clone(), + networking: self.networking.clone(), + first_offset: self.first_offset, + records: self.records.clone(), + rpc_router: self.store_router.clone(), + store_timeout, + }; + async move { (node_id, store_task.run().await) }.in_current_tc() + }); } - loop { - let store_result = match timeout(store_timeout, store_tasks.next()).await { - Ok(Some(result)) => result, - Ok(None) => break, // no more tasks - Err(_) => { - // if we have already acknowledged this append, it's okay to retire. - if self.commit_resolver.is_none() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but append was committed, giving up"); - return SequencerAppenderState::Done; - } - - if self.sequencer_shared_state.known_global_tail.is_sealed() { - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Some servers didn't store this batch, but this loglet was sealed, giving up"); - return SequencerAppenderState::Sealed; - } - // timed out! - // none of the pending tasks has finished in time! we will assume all pending server - // are graylisted and try again - tracing::debug!(%pending_servers, %wave, %spread, responses=?checker, "Timeout waiting on store response"); - return SequencerAppenderState::Wave { - graylist: pending_servers, - }; - } + while let Some(store_result) = store_tasks.join_next().await { + // unlikely to happen, but it's there for completeness + if self.sequencer_shared_state.known_global_tail.is_sealed() { + trace!(%pending_servers, %spread, "Loglet was sealed, stopping this sequencer appender"); + return State::Sealed; + } + let Ok((node_id, store_result)) = store_result else { + // task panicked, ignore + continue; }; - let LogServerStoreTaskResult { - node_id: peer, - status, - } = store_result; - - let response = match status { + let stored = match store_result { StoreTaskStatus::Error(NetworkError::Shutdown(_)) => { - return SequencerAppenderState::Cancelled; + return State::Cancelled; + } + StoreTaskStatus::Error(NetworkError::Timeout(_)) => { + trace!(peer = %node_id, "Timeout waiting for node {} to commit a batch", node_id); + self.nodeset_status.merge(node_id, PerNodeStatus::timeout()); + graylist.insert(node_id); + continue; } StoreTaskStatus::Error(err) => { // couldn't send store command to remote server - tracing::debug!(%peer, error=%err, "Failed to send batch to node"); + // todo: only log if number of attempts is high. 
+ debug!(peer = %node_id, %err, "Failed to send batch to node"); + self.nodeset_status.merge(node_id, PerNodeStatus::failed()); + graylist.insert(node_id); continue; } StoreTaskStatus::Sealed => { - tracing::debug!(%peer, "Store task cancelled, the node is sealed"); - checker.set_attribute(peer, NodeAttributes::sealed()); + debug!(peer = %node_id, "Store task cancelled, the node is sealed"); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Sealed); continue; } StoreTaskStatus::Stored(stored) => { - tracing::trace!(%peer, "Store task completed"); + trace!(peer = %node_id, "Store task completed"); stored } }; // we had a response from this node and there is still a lot we can do - match response.status { + match stored.status { Status::Ok => { // only if status is okay that we remove this node // from the gray list, and move to replicated list - checker.set_attribute(peer, NodeAttributes::committed()); - pending_servers.remove(peer); + self.checker + .set_attribute(node_id, NodeAttributes::committed()); + self.nodeset_status.merge(node_id, PerNodeStatus::Committed); + pending_servers.remove(node_id); } Status::Sealed | Status::Sealing => { - checker.set_attribute(peer, NodeAttributes::sealed()); + self.checker + .set_attribute(node_id, NodeAttributes::sealed()); + graylist.insert(node_id); + } + Status::Dropped => { + // overloaded, or request expired + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer is load shedding"); + graylist.insert(node_id); } - Status::Disabled - | Status::Dropped - | Status::SequencerMismatch - | Status::Malformed - | Status::OutOfBounds => { - // just leave this log server in graylist (pending) - tracing::debug!(%peer, status=?response.status, "Store task returned an error status"); + Status::Disabled => { + // overloaded, or request expired + debug!(peer = %node_id, status=?stored.status, "Store failed on peer. Peer's log-store is disabled"); + graylist.insert(node_id); + } + Status::SequencerMismatch | Status::Malformed | Status::OutOfBounds => { + warn!(peer = %node_id, status=?stored.status, "Store failed on peer due to unexpected error, please check logs of the peer to investigate"); + graylist.insert(node_id); } } - if self.commit_resolver.is_some() && checker.check_write_quorum(|attr| attr.committed) { + if self.checker.check_write_quorum(|attr| attr.committed) { // resolve the commit if not resolved yet if let Some(resolver) = self.commit_resolver.take() { self.sequencer_shared_state @@ -334,20 +359,107 @@ impl SequencerAppender { .notify_offset_update(last_offset.next()); resolver.offset(last_offset); } - // drop the permit self.permit.take(); + return State::Done; } } - if checker.check_write_quorum(|attr| attr.committed) { - SequencerAppenderState::Done - } else if checker.check_fmajority(|attr| attr.sealed).passed() { - SequencerAppenderState::Sealed + if self.checker.check_fmajority(|attr| attr.sealed).passed() { + State::Sealed } else { - SequencerAppenderState::Wave { - graylist: pending_servers, + // We couldn't achieve write quorum with this wave. We will try again, as fast as + // possible until the graylist eats up enough nodes such that we won't be able to + // generate node nodesets. Only then we backoff. 
+ State::Wave { graylist } + } + } +} + +#[derive(Default, Debug, PartialEq, Clone, Copy)] +enum PerNodeStatus { + #[default] + NotAttempted, + Failed { + attempts: usize, + }, + Timeout { + attempts: usize, + }, + Committed, + Sealed, +} + +impl Display for PerNodeStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PerNodeStatus::NotAttempted => write!(f, ""), + PerNodeStatus::Failed { attempts } => write!(f, "E({})", attempts), + PerNodeStatus::Committed => write!(f, "X"), + PerNodeStatus::Timeout { attempts } => write!(f, "TIMEDOUT({})", attempts), + PerNodeStatus::Sealed => write!(f, "SEALED"), + } + } +} + +impl PerNodeStatus { + fn timeout() -> Self { + Self::Timeout { attempts: 1 } + } + fn failed() -> Self { + Self::Failed { attempts: 1 } + } +} + +impl Merge for PerNodeStatus { + fn merge(&mut self, other: Self) -> bool { + use PerNodeStatus::*; + match (*self, other) { + (NotAttempted, NotAttempted) => false, + (Committed, Committed) => false, + (NotAttempted, e) => { + *self = e; + true + } + // we will not transition from committed to seal because + // committed is more important for showing where did we write. Not that this is likely + // to ever happen though. + (Committed, _) => false, + (Failed { attempts }, Failed { .. }) => { + *self = Failed { + attempts: attempts + 1, + }; + true + } + (Failed { attempts }, Timeout { .. }) => { + *self = Timeout { + attempts: attempts + 1, + }; + true + } + (Timeout { attempts }, Failed { .. }) => { + *self = Failed { + attempts: attempts + 1, + }; + true } + (Timeout { attempts }, Timeout { .. }) => { + *self = Timeout { + attempts: attempts + 1, + }; + true + } + (_, Committed) => { + *self = Committed; + true + } + (Sealed, Sealed) => false, + (_, Sealed) => { + *self = Sealed; + true + } + (Sealed, _) => false, + _ => false, } } } @@ -374,6 +486,18 @@ impl NodeAttributes { } } +impl Display for NodeAttributes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match (self.committed, self.sealed) { + // legend X = committed to be consistent with restatectl digest output + (true, true) => write!(f, "X(S)"), + (true, false) => write!(f, "X"), + (false, true) => write!(f, "-(S)"), + (false, false) => write!(f, ""), + } + } +} + #[derive(Debug)] enum StoreTaskStatus { Sealed, @@ -390,24 +514,19 @@ impl From> for StoreTaskStatus { } } -struct LogServerStoreTaskResult { - pub node_id: PlainNodeId, - pub status: StoreTaskStatus, -} - /// The task will retry to connect to the remote server if connection /// was lost. 
-struct LogServerStoreTask<'a, T> { +struct LogServerStoreTask { node_id: PlainNodeId, - sequencer_shared_state: &'a Arc, - networking: &'a Networking, + sequencer_shared_state: Arc, + networking: Networking, first_offset: LogletOffset, - records: &'a Arc<[Record]>, - rpc_router: &'a RpcRouter, - timeout_at: MillisSinceEpoch, + records: Arc<[Record]>, + rpc_router: RpcRouter, + store_timeout: Duration, } -impl LogServerStoreTask<'_, T> { +impl LogServerStoreTask { #[instrument( skip_all, fields( @@ -418,7 +537,7 @@ impl LogServerStoreTask<'_, T> { peer=%self.node_id, ) )] - async fn run(mut self) -> LogServerStoreTaskResult { + async fn run(mut self) -> StoreTaskStatus { let result = self.send().await; match &result { Ok(status) => { @@ -435,18 +554,14 @@ impl LogServerStoreTask<'_, T> { } } - LogServerStoreTaskResult { - node_id: self.node_id, - status: result.into(), - } + result.into() } async fn send(&mut self) -> Result { - let mut server = self + let server = self .sequencer_shared_state .log_server_manager - .get(self.node_id, self.networking) - .await?; + .get(self.node_id); let server_local_tail = server .local_tail() .wait_for_offset_or_seal(self.first_offset); @@ -481,7 +596,7 @@ impl LogServerStoreTask<'_, T> { } } - let incoming = match self.try_send(&mut server).await { + let incoming = match self.try_send(server).await { Ok(incoming) => incoming, Err(err) => { return Err(err); @@ -506,10 +621,11 @@ impl LogServerStoreTask<'_, T> { Ok(StoreTaskStatus::Stored(incoming.into_body())) } - async fn try_send( - &mut self, - server: &mut RemoteLogServer, - ) -> Result, NetworkError> { + async fn try_send(&self, server: &RemoteLogServer) -> Result, NetworkError> { + // let the log-server keep the store message a little longer than our own timeout, since + // our timeout includes the connection time, the server should still try and store it even + // if we timeout locally. 
+ let timeout_at = MillisSinceEpoch::after(self.store_timeout * 2); let store = Store { header: LogServerRequestHeader::new( *self.sequencer_shared_state.loglet_id(), @@ -520,53 +636,24 @@ impl LogServerStoreTask<'_, T> { first_offset: self.first_offset, flags: StoreFlags::empty(), known_archived: LogletOffset::INVALID, - payloads: Arc::clone(self.records), + payloads: Arc::clone(&self.records), sequencer: *self.sequencer_shared_state.sequencer(), - timeout_at: Some(self.timeout_at), + timeout_at: Some(timeout_at), }; - let mut msg = Outgoing::new(self.node_id, store); - - let mut attempt = 0; - loop { - attempt += 1; - - let with_connection = msg.assign_connection(server.connection().clone()); - let store_start_time = Instant::now(); - - let result = match self.rpc_router.call_on_connection(with_connection).await { - Ok(incoming) => Ok(incoming), - Err(RpcError::Shutdown(shutdown)) => Err(NetworkError::Shutdown(shutdown)), - Err(RpcError::SendError(err)) => { - msg = err.original.forget_connection(); - - match err.source { - NetworkError::ConnectionClosed(_) - | NetworkError::ConnectError(_) - | NetworkError::Timeout(_) => { - trace!( - %attempt, - "Failed to send store to log server, trying to create a new connection" - ); - self.sequencer_shared_state - .log_server_manager - .renew(server, self.networking) - .await?; - trace!( - %attempt, - "Reconnected to log-server, retrying the store" - ); - // try again - continue; - } - _ => Err(err.source), - } - } - }; + let store_start_time = Instant::now(); - server.store_latency().record(store_start_time.elapsed()); - - return result; + match self + .rpc_router + .call_timeout(&self.networking, self.node_id, store, self.store_timeout) + .await + { + Ok(incoming) => { + server.store_latency().record(store_start_time.elapsed()); + Ok(incoming) + } + Err(NetworkError::Shutdown(shutdown)) => Err(NetworkError::Shutdown(shutdown)), + Err(err) => Err(err), } } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index e0fdfed834..0292371f8e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -92,11 +92,7 @@ impl CheckSealTask { let local_tails: BTreeMap = effective_nodeset .iter() - .filter_map(|node_id| { - log_servers - .try_get_tail_offset(*node_id) - .map(|w| (*node_id, w)) - }) + .map(|node_id| (*node_id, log_servers.get_tail_offset(*node_id).clone())) .collect(); // If some of the nodes are already sealed, we know our answer and we don't need to go through diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index 3686745a1b..c7e5fe7949 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -73,7 +73,7 @@ impl SealTask { .nodeset .to_effective(&networking.metadata().nodes_config_ref()); - let mut nodeset_checker = NodeSetChecker::::new( + let mut nodeset_checker = NodeSetChecker::::new( &effective_nodeset, &networking.metadata().nodes_config_ref(), &my_params.replication, @@ -116,8 +116,6 @@ impl SealTask { }); } - let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset.clone()); - // Max observed local-tail from sealed nodes let mut max_local_tail = LogletOffset::OLDEST; while let Some(response) = inflight_requests.join_next().await { @@ -127,20 +125,23 @@ impl 
SealTask { }; let Ok(response) = response else { // Seal failed/aborted on this node. - nodeset_status.insert(node_id, NodeSealStatus::Error); + nodeset_checker.set_attribute(node_id, NodeSealStatus::Error); continue; }; max_local_tail = max_local_tail.max(response.header.local_tail); known_global_tail.notify_offset_update(response.header.known_global_tail); - nodeset_checker.set_attribute(node_id, true); - nodeset_status.insert(node_id, NodeSealStatus::Sealed); - - if nodeset_checker.check_fmajority(|attr| *attr).passed() { + nodeset_checker.set_attribute(node_id, NodeSealStatus::Sealed); + + if nodeset_checker + .check_fmajority(|attr| *attr == NodeSealStatus::Sealed) + .passed() + { + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); info!( loglet_id = %my_params.loglet_id, replication = %my_params.replication, - %effective_nodeset, %max_local_tail, global_tail = %known_global_tail.latest_offset(), "Seal task completed on f-majority of nodes in {:?}. Nodeset status {}", @@ -152,6 +153,8 @@ impl SealTask { } } + let mut nodeset_status = DecoratedNodeSet::from_iter(effective_nodeset); + nodeset_status.extend(&nodeset_checker); // no more tasks left. This means that we failed to seal Err(ReplicatedLogletError::SealFailed(nodeset_status)) } diff --git a/crates/log-server/Cargo.toml b/crates/log-server/Cargo.toml index 067f6428c5..77a3c24cc2 100644 --- a/crates/log-server/Cargo.toml +++ b/crates/log-server/Cargo.toml @@ -48,6 +48,7 @@ tokio-util = { workspace = true } tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] } tracing = { workspace = true } xxhash-rust = { workspace = true, features = ["xxh3"] } +rand = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index c58669e8e8..01cc2b6cd2 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -316,7 +316,6 @@ impl LogletWorker { if !body.flags.contains(StoreFlags::IgnoreSeal) && self.loglet_state.is_sealed() { return (Status::Sealed, None); } - // even if ignore-seal is set, we must wait for the in-flight seal before we can accept // writes. if *sealing_in_progress { diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 01c21e846c..009619b773 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -248,9 +248,18 @@ pub struct ReplicatedLogletConfig { /// New type that enforces that the nodeset size is never larger than 128. #[derive( - Debug, Clone, Copy, derive_more::Into, serde::Serialize, serde::Deserialize, Eq, PartialEq, + Debug, + Clone, + Copy, + derive_more::Into, + serde::Serialize, + serde::Deserialize, + Eq, + PartialEq, + derive_more::Display, )] #[serde(try_from = "u16", into = "u16")] +#[display("{_0}")] pub struct NodeSetSize(u16); impl NodeSetSize { diff --git a/crates/types/src/replication/nodeset.rs b/crates/types/src/replication/nodeset.rs index 9446dd2127..c203d6b31b 100644 --- a/crates/types/src/replication/nodeset.rs +++ b/crates/types/src/replication/nodeset.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
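For context before the nodeset.rs hunk: the `merge` helper added below accumulates a per-node status across repeated append waves through the `Merge` trait (via the map's `Entry` API), which is how the sequencer's `PerNodeStatus` above ends up counting failed and timed-out attempts instead of overwriting them. A self-contained sketch of that accumulation, with stand-in `Merge`/`Status`/`DecoratedSet` types rather than the ones from restate_types:

    // Sketch only: entry-based merge of per-node statuses; attempts accumulate across waves.
    use std::collections::btree_map::Entry;
    use std::collections::BTreeMap;

    trait Merge {
        /// Folds `other` into `self`; returns true if `self` changed.
        fn merge(&mut self, other: Self) -> bool;
    }

    #[derive(Debug, Clone, Copy)]
    enum Status {
        Failed { attempts: usize },
        Committed,
    }

    impl Merge for Status {
        fn merge(&mut self, other: Self) -> bool {
            use Status::*;
            match (*self, other) {
                // another failed attempt bumps the counter instead of replacing the value
                (Failed { attempts }, Failed { .. }) => {
                    *self = Failed { attempts: attempts + 1 };
                    true
                }
                // once committed, the node stays committed
                (Committed, _) => false,
                (_, next) => {
                    *self = next;
                    true
                }
            }
        }
    }

    #[derive(Debug, Default)]
    struct DecoratedSet(BTreeMap<u32, Status>);

    impl DecoratedSet {
        fn merge(&mut self, node: u32, status: Status) -> bool {
            match self.0.entry(node) {
                Entry::Vacant(vacant) => {
                    vacant.insert(status);
                    true
                }
                Entry::Occupied(occupied) => occupied.into_mut().merge(status),
            }
        }
    }

    fn main() {
        let mut set = DecoratedSet::default();
        set.merge(7, Status::Failed { attempts: 1 });
        set.merge(7, Status::Failed { attempts: 1 }); // -> Failed { attempts: 2 }
        set.merge(7, Status::Committed);              // -> Committed, and it stays that way
        println!("{set:?}");
    }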
+use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::fmt::Display; use std::hash::{BuildHasherDefault, Hash, Hasher}; @@ -18,7 +19,7 @@ use itertools::Itertools; use rand::distr::Uniform; use rand::prelude::*; -use crate::PlainNodeId; +use crate::{Merge, PlainNodeId}; // Why? Over 50% faster in iteration than HashSet and ~40% faster than default RandomState for // contains() and set intersection operations. Additionally, it's 300% faster when created from @@ -302,12 +303,51 @@ impl From for DecoratedNodeSet { } } +impl DecoratedNodeSet { + pub fn merge(&mut self, node_id: impl Into, other: V) -> bool + where + V: Merge, + { + let node_id = node_id.into(); + match self.0.entry(node_id) { + Entry::Vacant(vacant_entry) => { + vacant_entry.insert(other); + true + } + Entry::Occupied(occupied_entry) => occupied_entry.into_mut().merge(other), + } + } + + pub fn merge_with_iter<'a>( + &mut self, + values: impl IntoIterator, + ) where + V: Merge + Clone, + V: 'a, + { + values.into_iter().for_each(|(node_id, v)| { + self.0 + .entry(*node_id) + .and_modify(|existing| { + existing.merge(v.clone()); + }) + .or_insert(v.clone()); + }); + } +} + impl FromIterator for DecoratedNodeSet { fn from_iter>(iter: T) -> Self { Self(iter.into_iter().map(|n| (n, Default::default())).collect()) } } +impl FromIterator<(PlainNodeId, V)> for DecoratedNodeSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().collect()) + } +} + impl Display for DecoratedNodeSet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "[")?; @@ -358,6 +398,8 @@ fn write_nodes_sorted(nodeset: &NodeSet, f: &mut std::fmt::Formatter<'_>) -> std #[cfg(test)] mod test { + use ahash::HashMap; + use super::*; #[test] @@ -450,4 +492,25 @@ mod test { nodeset1.insert(PlainNodeId::from(2), Status::Sealed); assert_eq!(format!("{nodeset1}"), "[N1(E), N2(S), N3(E), N4(E), N5(S)]"); } + + #[test] + fn decorated_nodeset_extend() { + #[derive(derive_more::Display, Default, Clone, Copy)] + enum Status { + #[display("S")] + Sealed, + #[default] + #[display("E")] + Error, + } + let mut nodeset1 = DecoratedNodeSet::::from(NodeSet::from_iter([2, 3, 1, 4, 5])); + nodeset1.insert(PlainNodeId::from(5), Status::Sealed); + assert_eq!(format!("{nodeset1}"), "[N1(E), N2(E), N3(E), N4(E), N5(S)]"); + let mut status_map = HashMap::default(); + status_map.insert(PlainNodeId::from(1), Status::Sealed); + status_map.insert(PlainNodeId::from(4), Status::Sealed); + nodeset1.extend(&status_map); + + assert_eq!(format!("{nodeset1}"), "[N1(S), N2(E), N3(E), N4(S), N5(S)]"); + } } diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs index dcf0212314..08533914ea 100644 --- a/tools/restatectl/src/commands/cluster/config.rs +++ b/tools/restatectl/src/commands/cluster/config.rs @@ -80,6 +80,13 @@ fn write_default_provider( "Replication property", config.replication_property.to_string(), )?; + write_leaf( + w, + depth, + true, + "Nodeset size", + config.target_nodeset_size.to_string(), + )?; } } Ok(())
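To summarize the control flow introduced in sequencer/appender.rs above, here is a self-contained sketch of the wave pattern: fan out one store task per node with a tokio `JoinSet`, resolve as soon as a write quorum is reached, and graylist failed nodes for the next wave. All names (`store_on_node`, `write_quorum`, the even/odd failure rule) are invented for the sketch and only mirror the shape of `SequencerAppender::send_wave`; a plain count stands in for `NodeSetChecker::check_write_quorum`, and a tokio dependency with the `macros` and `rt-multi-thread` features is assumed.

    // Sketch only: one wave of stores, resolved as soon as a write quorum is reached.
    use std::collections::HashSet;

    use tokio::task::JoinSet;

    async fn store_on_node(node: u32) -> Result<(), String> {
        // stand-in for the real store RPC; pretend even-numbered nodes are unreachable
        if node % 2 == 0 {
            Err(format!("N{node} unreachable"))
        } else {
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() {
        let spread = [1u32, 2, 3, 4, 5];
        let write_quorum: usize = 3;

        let mut committed = HashSet::new();
        let mut graylist = HashSet::new();

        // fan out one store task per node in the spread
        let mut tasks = JoinSet::new();
        for node in spread {
            tasks.spawn(async move { (node, store_on_node(node).await) });
        }

        while let Some(joined) = tasks.join_next().await {
            let Ok((node, result)) = joined else { continue }; // task panicked: skip it
            match result {
                Ok(()) => {
                    committed.insert(node);
                }
                Err(err) => {
                    eprintln!("store failed on N{node}: {err}");
                    graylist.insert(node); // avoid this node when picking the next spread
                }
            }
            // resolve the commit as soon as enough nodes have acknowledged the store
            if committed.len() >= write_quorum {
                println!("write quorum reached: {committed:?}");
                return;
            }
        }
        println!("no quorum this wave; next wave will avoid {graylist:?}");
    }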