Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bifrost] Sequencer gives priority to GetSequencerState requests #2672

Merged
merged 6 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ xtask = "run --package xtask --"
# when changing these please also change .github/workflows/steps/release-build-setup.yml
rustflags = [
"-C", "force-unwind-tables", # Include full unwind tables when aborting on panic
"-C" , "debug-assertions", # Enable debug assertions in release builds to have more safeguards in place
"--cfg", "uuid_unstable", # Enable unstable Uuid
"--cfg", "tokio_unstable", # Enable unstable tokio
]

[target.aarch64-unknown-linux-gnu]
rustflags = [
"-C", "force-unwind-tables", # Include full unwind tables when aborting on panic
"-C" , "debug-assertions", # Enable debug assertions in release builds to have more safeguards in place
"--cfg", "uuid_unstable", # Enable unstable Uuid
"--cfg", "tokio_unstable", # Enable unstable tokio
"-C" , "force-frame-pointers=yes", # Enable frame pointers to support Parca (https://github.com/parca-dev/parca-agent/pull/1805)
Expand All @@ -25,7 +23,6 @@ rustflags = [
[target.x86_64-unknown-linux-musl]
rustflags = [
"-C", "force-unwind-tables", # Include full unwind tables when aborting on panic
"-C" , "debug-assertions", # Enable debug assertions in release builds to have more safeguards in place
"--cfg", "uuid_unstable", # Enable unstable Uuid
"--cfg", "tokio_unstable", # Enable unstable tokio
"-C", "link-self-contained=yes", # Link statically
Expand All @@ -34,7 +31,6 @@ rustflags = [
[target.aarch64-unknown-linux-musl]
rustflags = [
"-C", "force-unwind-tables", # Include full unwind tables when aborting on panic
"-C" , "debug-assertions", # Enable debug assertions in release builds to have more safeguards in place
"--cfg", "uuid_unstable", # Enable unstable Uuid
"--cfg", "tokio_unstable", # Enable unstable tokio
"-C", "force-frame-pointers=yes", # Enable frame pointers to support Parca (https://github.com/parca-dev/parca-agent/pull/1805)
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ jobs:
if: "${{ runner.os == 'macOS' }}"
run: "echo MACOSX_DEPLOYMENT_TARGET=\"10.14.0\" >> \"$GITHUB_ENV\""
- name: "Set up RUSTFLAGS"
run: "echo RUSTFLAGS=\"-C force-unwind-tables -C debug-assertions --cfg uuid_unstable --cfg tokio_unstable\" >> \"$GITHUB_ENV\""
run: "echo RUSTFLAGS=\"-C force-unwind-tables --cfg uuid_unstable --cfg tokio_unstable\" >> \"$GITHUB_ENV\""
- name: Install dist
run: ${{ matrix.install_dist.run }}
# Get the dist-manifest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/steps/release-build-setup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@
# cargo-dist isn't currently able to take these from .cargo/config.toml
# https://github.com/axodotdev/cargo-dist/issues/1571
- name: Set up RUSTFLAGS
run: echo RUSTFLAGS="-C force-unwind-tables -C debug-assertions --cfg uuid_unstable --cfg tokio_unstable" >> "$GITHUB_ENV"
run: echo RUSTFLAGS="-C force-unwind-tables --cfg uuid_unstable --cfg tokio_unstable" >> "$GITHUB_ENV"
41 changes: 39 additions & 2 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ mod tests {
use super::Service;

use std::collections::BTreeSet;
use std::num::NonZero;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -796,7 +797,7 @@ mod tests {
use restate_core::test_env::NoOpMessageHandler;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::{NodeState, PartitionProcessorStatus};
use restate_types::config::{AdminOptions, BifrostOptions, Configuration};
use restate_types::config::{AdminOptions, BifrostOptions, Configuration, NetworkingOptions};
use restate_types::health::HealthStatus;
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
Expand Down Expand Up @@ -898,7 +899,13 @@ mod tests {
let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};
let config = Configuration {
networking,
admin: admin_options,
..Default::default()
};
Expand Down Expand Up @@ -966,10 +973,17 @@ mod tests {
async fn auto_trim_log() -> anyhow::Result<()> {
const LOG_ID: LogId = LogId::new(0);

let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};

let mut admin_options = AdminOptions::default();
let interval_duration = Duration::from_secs(10);
admin_options.log_trim_interval = Some(interval_duration.into());
let config = Configuration {
networking,
admin: admin_options,
..Default::default()
};
Expand Down Expand Up @@ -1042,7 +1056,14 @@ mod tests {
admin_options.log_trim_interval = Some(interval_duration.into());
let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;

let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};
let config = Configuration {
networking,
admin: admin_options,
bifrost: bifrost_options,
..Default::default()
Expand Down Expand Up @@ -1096,7 +1117,16 @@ mod tests {
const LOG_ID: LogId = LogId::new(0);
let interval_duration = Duration::from_secs(10);

let mut config: Configuration = Default::default();
let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};

let mut config: Configuration = Configuration {
networking,
..Default::default()
};
config.admin.log_trim_interval = Some(interval_duration.into());
config.bifrost.default_provider = ProviderKind::InMemory;
config.worker.snapshots.destination = Some("a-repository-somewhere".to_string());
Expand Down Expand Up @@ -1253,7 +1283,14 @@ mod tests {
admin_options.log_trim_interval = Some(interval_duration.into());
let mut bifrost_options = BifrostOptions::default();
bifrost_options.default_provider = ProviderKind::InMemory;
let networking = NetworkingOptions {
// we are using failing transport so we only want to use the mock connection we created.
num_concurrent_connections: NonZero::new(1).unwrap(),
..Default::default()
};

let config = Configuration {
networking,
admin: admin_options,
bifrost: bifrost_options,
..Default::default()
Expand Down
83 changes: 47 additions & 36 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl RequestPump {
router_builder: &mut MessageRouterBuilder,
) -> Self {
// todo(asoli) read from opts
let queue_length = 10;
let queue_length = 128;
let append_stream = router_builder.subscribe_to_stream(queue_length);
let get_sequencer_state_stream = router_builder.subscribe_to_stream(queue_length);
Self {
Expand All @@ -104,15 +104,16 @@ impl RequestPump {
let mut cancel = std::pin::pin!(cancellation_watcher());
loop {
tokio::select! {
biased;
_ = &mut cancel => {
break;
}
Some(append) = self.append_stream.next() => {
self.handle_append(&provider, append).await;
}
Some(get_sequencer_state) = self.get_sequencer_state_stream.next() => {
self.handle_get_sequencer_state(&provider, get_sequencer_state).await;
}
Some(append) = self.append_stream.next() => {
self.handle_append(&provider, append).await;
}
}
}

Expand Down Expand Up @@ -158,41 +159,51 @@ impl RequestPump {
return;
}

let tail = if msg.force_seal_check {
match loglet
.find_tail_inner(FindTailOptions::ForceSealCheck)
.await
{
Ok(tail) => tail,
Err(err) => {
let failure = SequencerState {
header: CommonResponseHeader {
known_global_tail: None,
sealed: None,
status: SequencerStatus::Error {
retryable: true,
message: err.to_string(),
if msg.force_seal_check {
let _ = TaskCenter::spawn(TaskKind::Disposable, "remote-check-seal", async move {
match loglet
.find_tail_inner(FindTailOptions::ForceSealCheck)
.await
{
Ok(tail) => {
let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
}
Err(err) => {
let failure = SequencerState {
header: CommonResponseHeader {
known_global_tail: None,
sealed: None,
status: SequencerStatus::Error {
retryable: true,
message: err.to_string(),
},
},
},
};
let _ = reciprocal.prepare(failure).try_send();
return;
};
let _ = reciprocal.prepare(failure).try_send();
}
}
}
Ok(())
});
} else {
// if we are not forced to check the seal, we can just return the last known tail from the
// sequencer's view
loglet.last_known_global_tail()
};

let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
let tail = loglet.last_known_global_tail();
let sequencer_state = SequencerState {
header: CommonResponseHeader {
known_global_tail: Some(tail.offset()),
sealed: Some(tail.is_sealed()),
status: SequencerStatus::Ok,
},
};
let _ = reciprocal.prepare(sequencer_state).try_send();
}
}

/// Infallible handle_append method
Expand Down Expand Up @@ -265,7 +276,7 @@ impl RequestPump {
return Err(SequencerStatus::LogletIdMismatch);
}

match self.create_loglet(provider, header).await {
match self.create_loglet(provider, header) {
Ok(loglet) => return Ok(loglet),
Err(SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex) => {
// possible outdated metadata
Expand Down Expand Up @@ -315,7 +326,7 @@ impl RequestPump {
}
}

async fn create_loglet<T: TransportConnect>(
fn create_loglet<T: TransportConnect>(
&self,
provider: &ReplicatedLogletProvider<T>,
header: &CommonRequestHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,18 @@ impl<Attr: Debug> Debug for NodeSetChecker<Attr> {

impl<Attr: Display> Display for NodeSetChecker<Attr> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use itertools::Position;
write!(f, "[")?;
for (node, attr) in self.node_to_attr.iter() {
write!(f, "{node} => {attr}, ")?;
for (pos, (node_id, attr)) in self
.node_to_attr
.iter()
.sorted_by_key(|v| v.0)
.with_position()
{
match pos {
Position::Only | Position::Last => write!(f, "{node_id}({attr})")?,
Position::First | Position::Middle => write!(f, "{node_id}({attr}), ")?,
}
}
write!(f, "]")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::sync::Weak;
use std::time::Duration;

use restate_types::retries::with_jitter;
use tokio::time::Instant;
use tracing::instrument;
use tracing::{debug, trace};
Expand Down Expand Up @@ -89,7 +90,7 @@ impl PeriodicTailChecker {
);
}
}
tokio::time::sleep(duration).await;
tokio::time::sleep(with_jitter(duration, 0.5)).await;
}
}
}
5 changes: 3 additions & 2 deletions crates/core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,9 @@ impl Metadata {
min_version: Version,
) -> Result<Version, ShutdownError> {
let mut recv = self.inner.write_watches[metadata_kind].receive.clone();
let v = recv
.wait_for(|v| *v >= min_version)
// If we are already at the metadata version, avoid tokio's yielding to
// improve tail latencies when this is used in latency-sensitive operations.
let v = tokio::task::unconstrained(recv.wait_for(|v| *v >= min_version))
.await
.map_err(|_| ShutdownError)?;
Ok(*v)
Expand Down
5 changes: 1 addition & 4 deletions crates/core/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use std::time::Instant;

use enum_map::{enum_map, EnumMap};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::Instant;
use tracing::{debug, trace};

use restate_types::net::codec::Targeted;
Expand All @@ -27,7 +27,6 @@ use restate_types::protobuf::node::Header;
use restate_types::protobuf::node::Message;
use restate_types::{GenerationalNodeId, Version};

use super::metric_definitions::MESSAGE_SENT;
use super::NetworkError;
use super::Outgoing;
use crate::Metadata;
Expand Down Expand Up @@ -84,7 +83,6 @@ impl<M> SendPermit<'_, M> {
/// associated with the message.
pub(crate) fn send_raw(self, raw_message: Message) {
self.permit.send(raw_message);
MESSAGE_SENT.increment(1);
}
}

Expand Down Expand Up @@ -313,7 +311,6 @@ pub mod test_util {
use super::*;

use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use futures::stream::BoxStream;
Expand Down
Loading
Loading