[Bifrost] Sequencer updates
Key changes:
- Use a JoinSet instead of unordered futures to run store tasks, avoiding some of the tricky lockup scenarios of the latter (see the sketch below)
- Store tasks are free to use whatever connection is available. This departs from the current design, which serializes all writes one at a time, but it removes a lock and simplifies the logic
- Extra in-flight stores are aborted once an append completes, to avoid overload
- Time out store tasks based only on their network operation, not on how long they wait for the tail to move
- Cache the nodeset checker across waves
- Better logging on failures: the nodeset status is reported along with the number of attempts per node
- Add a new metric for append latency

Performance testing shows no regression, and reliability testing appears to show none even in tricky failure scenarios.
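To make the first three points concrete, here is a minimal, self-contained sketch of the pattern (not the actual sequencer code: the node IDs, quorum size, and `simulated_store` helper are made up for illustration). Store tasks are spawned into a tokio `JoinSet`, the timeout covers only the network operation, and once enough acks arrive the remaining in-flight tasks are aborted:

```
use tokio::task::JoinSet;
use tokio::time::{sleep, timeout, Duration};

#[tokio::main]
async fn main() {
    // Hypothetical nodeset and write quorum, for illustration only.
    let nodes = vec![1u32, 2, 3];
    let required_acks = 2;

    let mut stores: JoinSet<Result<u32, &'static str>> = JoinSet::new();
    for node in nodes {
        stores.spawn(async move {
            // The timeout covers only the (simulated) network operation,
            // not any time spent waiting for the tail to move afterwards.
            timeout(Duration::from_secs(1), simulated_store(node))
                .await
                .map_err(|_| "store timed out")?
        });
    }

    let mut acks = 0;
    while let Some(res) = stores.join_next().await {
        if let Ok(Ok(node)) = res {
            acks += 1;
            println!("ack from node {node} ({acks}/{required_acks})");
            if acks >= required_acks {
                break;
            }
        }
    }

    // The append is complete; abort any extra in-flight stores to avoid overload.
    stores.abort_all();
}

// Stand-in for sending a Store message to a log server over the network.
async fn simulated_store(node: u32) -> Result<u32, &'static str> {
    sleep(Duration::from_millis(10 * u64::from(node))).await;
    Ok(node)
}
```

Because completed tasks are drained through `join_next` and stragglers are aborted rather than awaited, a slow or partitioned node can no longer hold back an append that has already reached its quorum.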

AhmedSoliman committed Feb 6, 2025
1 parent adad40e commit e6423d8
Showing 4 changed files with 319 additions and 305 deletions.
108 changes: 10 additions & 98 deletions crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs
@@ -8,13 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{collections::BTreeMap, ops::Deref, sync::Arc};
use std::sync::Arc;

use crossbeam_utils::CachePadded;
use ahash::HashMap;
use metrics::Histogram;
use tokio::sync::Mutex;

use restate_core::network::{NetworkError, Networking, TransportConnect, WeakConnection};
use restate_types::{
logs::{LogletOffset, SequenceNumber, TailState},
replication::NodeSet,
@@ -24,22 +22,17 @@ use restate_types::{
use super::metric_definitions::BIFROST_SEQ_STORE_DURATION;
use crate::loglet::util::TailOffsetWatch;

type LogServerLock = CachePadded<Mutex<Option<RemoteLogServer>>>;

/// LogServer instance
#[derive(Clone)]
pub struct RemoteLogServer {
tail: TailOffsetWatch,
connection: WeakConnection,
store_latency: Histogram,
}

impl RemoteLogServer {
fn new(connection: WeakConnection) -> Self {
let node_id = connection.peer().as_plain();
fn new(node_id: PlainNodeId) -> Self {
Self {
tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)),
connection,
store_latency: metrics::histogram!(BIFROST_SEQ_STORE_DURATION, "node_id" => node_id.to_string()),
}
}
@@ -51,115 +44,34 @@ impl RemoteLogServer {
pub fn store_latency(&self) -> &Histogram {
&self.store_latency
}

pub fn connection(&self) -> &WeakConnection {
&self.connection
}
}

/// LogServerManager maintains a set of [`RemoteLogServer`]s that are provided via the
/// [`NodeSet`].
///
/// The manager makes sure there is only one active connection per server.
/// It's up to the user of the client to do [`LogServerManager::renew`] if needed
#[derive(Clone)]
pub struct RemoteLogServerManager {
servers: Arc<BTreeMap<PlainNodeId, LogServerLock>>,
servers: Arc<HashMap<PlainNodeId, RemoteLogServer>>,
}

impl RemoteLogServerManager {
/// Creates the node set and starts the appenders
pub fn new(nodeset: &NodeSet) -> Self {
let servers = nodeset
.iter()
.map(|node_id| (*node_id, LogServerLock::default()))
.copied()
.map(|node_id| (node_id, RemoteLogServer::new(node_id)))
.collect();
let servers = Arc::new(servers);

Self { servers }
}

pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option<TailOffsetWatch> {
pub fn get_tail_offset(&self, id: PlainNodeId) -> &TailOffsetWatch {
let server = self.servers.get(&id).expect("node is in nodeset");

if let Ok(guard) = server.try_lock() {
if let Some(current) = guard.deref() {
return Some(current.local_tail().clone());
}
}

None
server.local_tail()
}

/// Gets a log-server instance. On first use it will initialize a new connection
/// to the log server. It will make sure all following get calls hold the same
/// connection.
///
/// It's up to the client to call [`Self::renew`] if the connection it holds
/// is closed.
pub async fn get<T: TransportConnect>(
&self,
id: PlainNodeId,
networking: &Networking<T>,
) -> Result<RemoteLogServer, NetworkError> {
let server = self.servers.get(&id).expect("node is in nodeset");

let mut guard = server.lock().await;

if let Some(current) = guard.deref() {
return Ok(current.clone());
}

let connection = networking.node_connection(id).await?;
let server = RemoteLogServer::new(connection);

*guard = Some(server.clone());

Ok(server)
}

/// Renew makes sure server connection is renewed if and only if
/// the provided server holds an outdated connection. Otherwise
/// the latest connection associated with this server is used.
///
/// It's up to the holder of the log server instance to retry the renew
/// if that connection is not valid.
///
/// It also guarantees that concurrent calls to renew on the same server instance
/// will only renew the connection once for all callers
///
/// However, this does not affect copies of LogServer that have already been retrieved
/// by calling [`Self::get()`].
///
/// Holder of old instances will have to call renew if they need to.
pub async fn renew<T: TransportConnect>(
&self,
server: &mut RemoteLogServer,
networking: &Networking<T>,
) -> Result<(), NetworkError> {
// this key must already be in the map
let current = self
.servers
.get(&server.connection.peer().as_plain())
.expect("node is in nodeset");

let mut guard = current.lock().await;

// if you're calling renew then the LogServer has already been initialized
let inner = guard.as_mut().expect("initialized log server instance");

if inner.connection != server.connection {
// someone else has already renewed the connection
server.connection = inner.connection.clone();
return Ok(());
}

let connection = networking
.node_connection(server.connection.peer().as_plain())
.await?;
inner.connection = connection.clone();
server.connection = connection.clone();

Ok(())
pub fn get(&self, id: PlainNodeId) -> &RemoteLogServer {
self.servers.get(&id).expect("node is in nodeset")
}
}
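For readability, here is a compacted, self-contained toy version of the resulting manager design (simplified names and types, e.g. plain `u32` node IDs instead of `PlainNodeId`; not the real restate types). The per-node map is built once from the nodeset and shared immutably, so lookups need no lock and connections are no longer cached or renewed here:

```
use std::collections::HashMap;
use std::sync::Arc;

// Per-node state; in the real code this also holds a tail watch and a
// store-latency histogram rather than just the id.
struct RemoteLogServer {
    node_id: u32,
}

#[derive(Clone)]
struct RemoteLogServerManager {
    // Built once from the nodeset and shared immutably: no Mutex, no renew().
    servers: Arc<HashMap<u32, RemoteLogServer>>,
}

impl RemoteLogServerManager {
    fn new(nodeset: &[u32]) -> Self {
        let servers = nodeset
            .iter()
            .map(|&node_id| (node_id, RemoteLogServer { node_id }))
            .collect();
        Self {
            servers: Arc::new(servers),
        }
    }

    fn get(&self, id: u32) -> &RemoteLogServer {
        self.servers.get(&id).expect("node is in nodeset")
    }
}

fn main() {
    let manager = RemoteLogServerManager::new(&[1, 2, 3]);
    println!("found log server {}", manager.get(2).node_id);
}
```

Connection handling moves out of the manager entirely: store tasks simply use whatever connection the networking layer hands them, which is what removes the per-server lock behind the old `get`/`renew` pair.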
@@ -28,6 +28,7 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_TOTAL: &str =
pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str =
"restate.bifrost.sequencer.committed_records.bytes";
pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration";
pub(crate) const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration";

pub(crate) fn describe_metrics() {
describe_counter!(
@@ -72,6 +73,12 @@ pub(crate) fn describe_metrics() {
"Size of records committed"
);

describe_histogram!(
BIFROST_SEQ_APPEND_DURATION,
Unit::Seconds,
"Append batch duration in seconds as measured by the sequencer"
);

describe_histogram!(
BIFROST_SEQ_STORE_DURATION,
Unit::Seconds,
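For context, a minimal sketch of how a duration histogram like the new `BIFROST_SEQ_APPEND_DURATION` is typically recorded with the `metrics` crate (the `record_append` wrapper and its call site are made up for illustration, and this assumes a `metrics` version where `histogram!` returns a recordable handle):

```
use std::time::Instant;

const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration";

// Times a closure and records the elapsed wall-clock time, in seconds,
// into the append-duration histogram.
fn record_append<T>(append: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = append();
    metrics::histogram!(BIFROST_SEQ_APPEND_DURATION).record(start.elapsed().as_secs_f64());
    result
}

fn main() {
    // Stand-in for an actual append; without an installed metrics recorder
    // the histogram call is a no-op, so this runs as-is.
    let committed = record_append(|| 42u64);
    println!("committed offset {committed}");
}
```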