Skip to content

Commit

Permalink
[WIP] sequencer updates
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Feb 5, 2025
1 parent 0ee5869 commit 1782128
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 320 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ restate-rocksdb = { workspace = true }
restate-test-util = { workspace = true, optional = true }
restate-types = { workspace = true }

ahash = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::sync::Arc;

use tracing::{debug, info, instrument};
use tracing::{debug, instrument, warn};

use restate_core::metadata_store::retry_on_retryable_error;
use restate_core::{Metadata, MetadataKind};
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<'a> BifrostAdmin<'a> {
self.inner.writeable_loglet(log_id).await
}

#[instrument(level = "debug", skip(self), err)]
#[instrument(level = "debug", skip(self))]
pub async fn seal(&self, log_id: LogId, segment_index: SegmentIndex) -> Result<SealedSegment> {
self.inner.fail_if_shutting_down()?;
// first find the tail segment for this log.
Expand All @@ -177,7 +177,7 @@ impl<'a> BifrostAdmin<'a> {
match err {
crate::loglet::OperationError::Shutdown(err) => return Err(err.into()),
crate::loglet::OperationError::Other(err) => {
info!(
warn!(
log_id = %log_id,
segment = %segment_index,
%err,
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use restate_types::replication::DecoratedNodeSet;

use crate::loglet::OperationError;

#[derive(Default, derive_more::Display, derive_more::Debug)]
#[derive(Default, derive_more::Display, derive_more::Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum NodeSealStatus {
#[display("E")]
Error,
Expand Down
108 changes: 10 additions & 98 deletions crates/bifrost/src/providers/replicated_loglet/log_server_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{collections::BTreeMap, ops::Deref, sync::Arc};
use std::sync::Arc;

use crossbeam_utils::CachePadded;
use ahash::HashMap;
use metrics::Histogram;
use tokio::sync::Mutex;

use restate_core::network::{NetworkError, Networking, TransportConnect, WeakConnection};
use restate_types::{
logs::{LogletOffset, SequenceNumber, TailState},
replication::NodeSet,
Expand All @@ -24,22 +22,17 @@ use restate_types::{
use super::metric_definitions::BIFROST_SEQ_STORE_DURATION;
use crate::loglet::util::TailOffsetWatch;

type LogServerLock = CachePadded<Mutex<Option<RemoteLogServer>>>;

/// LogServer instance
#[derive(Clone)]
pub struct RemoteLogServer {
tail: TailOffsetWatch,
connection: WeakConnection,
store_latency: Histogram,
}

impl RemoteLogServer {
fn new(connection: WeakConnection) -> Self {
let node_id = connection.peer().as_plain();
fn new(node_id: PlainNodeId) -> Self {
Self {
tail: TailOffsetWatch::new(TailState::Open(LogletOffset::OLDEST)),
connection,
store_latency: metrics::histogram!(BIFROST_SEQ_STORE_DURATION, "node_id" => node_id.to_string()),
}
}
Expand All @@ -51,115 +44,34 @@ impl RemoteLogServer {
pub fn store_latency(&self) -> &Histogram {
&self.store_latency
}

pub fn connection(&self) -> &WeakConnection {
&self.connection
}
}

/// LogServerManager maintains a set of [`RemoteLogServer`]s that provided via the
/// [`NodeSet`].
///
/// The manager makes sure there is only one active connection per server.
/// It's up to the user of the client to do [`LogServerManager::renew`] if needed
#[derive(Clone)]
pub struct RemoteLogServerManager {
servers: Arc<BTreeMap<PlainNodeId, LogServerLock>>,
servers: Arc<HashMap<PlainNodeId, RemoteLogServer>>,
}

impl RemoteLogServerManager {
/// creates the node set and start the appenders
pub fn new(nodeset: &NodeSet) -> Self {
let servers = nodeset
.iter()
.map(|node_id| (*node_id, LogServerLock::default()))
.copied()
.map(|node_id| (node_id, RemoteLogServer::new(node_id)))
.collect();
let servers = Arc::new(servers);

Self { servers }
}

pub fn try_get_tail_offset(&self, id: PlainNodeId) -> Option<TailOffsetWatch> {
pub fn get_tail_offset(&self, id: PlainNodeId) -> &TailOffsetWatch {
let server = self.servers.get(&id).expect("node is in nodeset");

if let Ok(guard) = server.try_lock() {
if let Some(current) = guard.deref() {
return Some(current.local_tail().clone());
}
}

None
server.local_tail()
}

/// Gets a log-server instance. On first time it will initialize a new connection
/// to log server. It will make sure all following get call holds the same
/// connection.
///
/// It's up to the client to call [`Self::renew`] if the connection it holds
/// is closed.
pub async fn get<T: TransportConnect>(
&self,
id: PlainNodeId,
networking: &Networking<T>,
) -> Result<RemoteLogServer, NetworkError> {
let server = self.servers.get(&id).expect("node is in nodeset");

let mut guard = server.lock().await;

if let Some(current) = guard.deref() {
return Ok(current.clone());
}

let connection = networking.node_connection(id).await?;
let server = RemoteLogServer::new(connection);

*guard = Some(server.clone());

Ok(server)
}

/// Renew makes sure server connection is renewed if and only if
/// the provided server holds an outdated connection. Otherwise
/// the latest connection associated with this server is used.
///
/// It's up the holder of the log server instance to retry to renew
/// if that connection is not valid.
///
/// It also guarantees that concurrent calls to renew on the same server instance
/// will only renew the connection once for all callers
///
/// However, this does not affect copies of LogServer that have been already retrieved
/// by calling [`Self::get()`].
///
/// Holder of old instances will have to call renew if they need to.
pub async fn renew<T: TransportConnect>(
&self,
server: &mut RemoteLogServer,
networking: &Networking<T>,
) -> Result<(), NetworkError> {
// this key must already be in the map
let current = self
.servers
.get(&server.connection.peer().as_plain())
.expect("node is in nodeset");

let mut guard = current.lock().await;

// if you calling renew then the LogServer has already been initialized
let inner = guard.as_mut().expect("initialized log server instance");

if inner.connection != server.connection {
// someone else has already renewed the connection
server.connection = inner.connection.clone();
return Ok(());
}

let connection = networking
.node_connection(server.connection.peer().as_plain())
.await?;
inner.connection = connection.clone();
server.connection = connection.clone();

Ok(())
pub fn get(&self, id: PlainNodeId) -> &RemoteLogServer {
self.servers.get(&id).expect("node is in nodeset")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_TOTAL: &str =
pub(crate) const BIFROST_SEQ_RECORDS_COMMITTED_BYTES: &str =
"restate.bifrost.sequencer.committed_records.bytes";
pub(crate) const BIFROST_SEQ_STORE_DURATION: &str = "restate.bifrost.sequencer.store_duration";
pub(crate) const BIFROST_SEQ_APPEND_DURATION: &str = "restate.bifrost.sequencer.append_duration";

pub(crate) fn describe_metrics() {
describe_counter!(
Expand Down Expand Up @@ -79,6 +80,12 @@ pub(crate) fn describe_metrics() {
"Size of records committed"
);

describe_histogram!(
BIFROST_SEQ_APPEND_DURATION,
Unit::Seconds,
"Append batch duration in seconds as measured by the sequencer"
);

describe_histogram!(
BIFROST_SEQ_STORE_DURATION,
Unit::Seconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::hash::{Hash, Hasher};

use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use restate_types::replication::DecoratedNodeSet;
use tracing::warn;

use restate_types::locality::{LocationScope, NodeLocation};
Expand Down Expand Up @@ -734,6 +736,22 @@ impl<Attr: Eq + Hash> LocationScopeState<Attr> {
}
}

impl<'a, Attr> IntoIterator for &'a NodeSetChecker<Attr> {
type Item = (&'a PlainNodeId, &'a Attr);

type IntoIter = <&'a HashMap<PlainNodeId, Attr> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.node_to_attr.iter()
}
}

impl<Attr> From<NodeSetChecker<Attr>> for DecoratedNodeSet<Attr> {
fn from(val: NodeSetChecker<Attr>) -> DecoratedNodeSet<Attr> {
DecoratedNodeSet::from_iter(val.node_to_attr)
}
}

#[cfg(test)]
mod tests {
use std::str::FromStr;
Expand Down
Loading

0 comments on commit 1782128

Please sign in to comment.