Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize imports #2662

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 64 additions & 44 deletions crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::ops::Deref;
use std::ops::{Add, Deref};
use std::sync::Arc;
use std::time::Duration;

use futures::future::OptionFuture;
use itertools::Itertools;
use rand::Rng;
use restate_bifrost::Bifrost;
use restate_core::network::TransportConnect;
use restate_core::{my_node_id, Metadata};
Expand All @@ -28,7 +30,7 @@ use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use tokio::sync::watch;
use tokio::time;
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
use tracing::{debug, info, instrument, trace, warn};

use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::logs_controller::{
Expand Down Expand Up @@ -310,7 +312,7 @@ where
logs_metadata_version = tracing::field::Empty,
),
)]
async fn trim_logs(&self) {
async fn trim_logs(&mut self) {
let cluster_state = self.cluster_state_watcher.current();
if tracing::level_enabled!(tracing::Level::DEBUG) {
tracing::Span::current().record(
Expand All @@ -319,17 +321,28 @@ where
);
}

let new_trim_points = TrimMode::from(self.snapshots_repository_configured, &cluster_state)
.calculate_safe_trim_points();
trace!(?new_trim_points, "Calculated safe log trim points");
match TrimMode::from(self.snapshots_repository_configured, &cluster_state)
.calculate_safe_trim_points()
{
Ok(trim_points) => {
trace!(?trim_points, "Calculated safe log trim points");

for (log_id, (trim_point, partition_id)) in new_trim_points {
let result = self.bifrost.admin().trim(log_id, trim_point).await;
for (log_id, (trim_point, partition_id)) in trim_points {
let result = self.bifrost.admin().trim(log_id, trim_point).await;
if let Err(err) = result {
warn!(
%partition_id,
"Failed to trim log {log_id}. This can lead to increased disk usage: {err}"
);
}
}
}

if let Err(err) = result {
Err(TrimPointsUnavailable::UnknownPartitionStatus(blocking_nodes)) => {
warn!(
%partition_id,
"Failed to trim log {log_id}. This can lead to increased disk usage: {err}"
?blocking_nodes,
"Log trimming is suspended until we can determine the processor state on all known cluster nodes. \
Remove decommissioned nodes from the cluster and/or enable partition snapshotting to unblock log trimming"
);
}
}
Expand All @@ -342,7 +355,12 @@ fn create_log_trim_check_interval(options: &AdminOptions) -> Option<Interval> {
.inspect(|_| info!("The log trim threshold setting is deprecated and will be ignored"));

options.log_trim_interval.map(|interval| {
let mut interval = tokio::time::interval(interval.into());
// delay the initial trim check, and add a small amount of jitter to avoid synchronization
// among partition leaders in case of coordinated cluster restarts
let jitter = rand::rng().random_range(Duration::ZERO..interval.mul_f32(0.1));
let start_at = time::Instant::now().add(interval.into()).add(jitter);

let mut interval = time::interval_at(start_at, interval.into());
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval
})
Expand All @@ -360,6 +378,13 @@ enum TrimMode {
},
}

/// Reason why safe log trim points could not be computed for the cluster.
#[derive(Debug, PartialEq)]
enum TrimPointsUnavailable {
/// Trim points cannot be determined because some (dead or otherwise
/// unreachable) nodes have not reported the state of partitions.
/// Carries the set of node ids that are blocking the calculation.
UnknownPartitionStatus(BTreeSet<PlainNodeId>),
}

impl TrimMode {
fn from(snapshots_repository_configured: bool, cluster_state: &Arc<ClusterState>) -> TrimMode {
let mut partition_status: BTreeMap<
Expand Down Expand Up @@ -401,19 +426,16 @@ impl TrimMode {
/// as archived LSNs are reported for all partitions, trimming can continue even in the presence of
/// some dead nodes. This is because we assume that if those nodes are only temporarily down, they
/// can fast-forward state from the snapshot repository when they return into service.
fn calculate_safe_trim_points(&self) -> BTreeMap<LogId, (Lsn, PartitionId)> {
let mut safe_trim_points = BTreeMap::new();

fn calculate_safe_trim_points(
&self,
) -> Result<BTreeMap<LogId, (Lsn, PartitionId)>, TrimPointsUnavailable> {
if let Some(blocking_nodes) = self.get_trim_blockers() {
warn!(
?blocking_nodes,
"Log trimming is suspended until we can determine the processor state on all known cluster nodes. \
This may result in increased log usage. Prune permanently decommissioned nodes and/or enable partition \
snapshotting to unblock trimming."
);
return safe_trim_points;
return Err(TrimPointsUnavailable::UnknownPartitionStatus(
blocking_nodes.clone(),
));
}

let mut safe_trim_points = BTreeMap::new();
match self {
TrimMode::ArchivedLsn { partition_status } => {
for (partition_id, processor_status) in partition_status.iter() {
Expand All @@ -439,22 +461,14 @@ impl TrimMode {
.max()
.unwrap_or(Lsn::INVALID);

if archived_lsn <= min_applied_lsn {
trace!(
?partition_id,
"Safe trim point for log {}: {:?}",
log_id,
archived_lsn
);
safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
} else {
warn!(
if archived_lsn > min_applied_lsn {
debug!(
?partition_id,
?min_applied_lsn,
"Some alive nodes have not applied the log up to the archived LSN"
);
safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
}
safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
}
}
TrimMode::PersistedLsn {
Expand Down Expand Up @@ -483,7 +497,7 @@ impl TrimMode {
}
}

safe_trim_points
Ok(safe_trim_points)
}

// If any partitions are reporting an archived LSN, they must have snapshotting enabled.
Expand Down Expand Up @@ -526,7 +540,7 @@ mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use crate::cluster_controller::service::state::TrimMode;
use crate::cluster_controller::service::state::{TrimMode, TrimPointsUnavailable};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
SuspectNode,
Expand Down Expand Up @@ -633,11 +647,11 @@ mod tests {
assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
assert_eq!(
trim_points,
BTreeMap::from([
Ok(BTreeMap::from([
(LogId::from(p1), (Lsn::INVALID, p1)),
(LogId::from(p2), (Lsn::INVALID, p2)),
(LogId::from(p3), (Lsn::new(5), p3)),
]),
])),
"Use min persisted LSN across the cluster as the safe point when not archiving"
);

Expand All @@ -660,7 +674,9 @@ mod tests {
assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
assert_eq!(
trim_points,
BTreeMap::new(),
Err(TrimPointsUnavailable::UnknownPartitionStatus(
[PlainNodeId::new(3)].iter().copied().collect()
)),
"Any dead nodes in cluster block trimming when using persisted LSN mode"
);

Expand All @@ -677,7 +693,9 @@ mod tests {
assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
assert_eq!(
trim_points,
BTreeMap::new(),
Err(TrimPointsUnavailable::UnknownPartitionStatus(
[PlainNodeId::new(3)].iter().copied().collect()
)),
"Any dead nodes in cluster block trimming when using persisted LSN mode"
);

Expand All @@ -697,7 +715,9 @@ mod tests {
assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
assert_eq!(
trim_points,
BTreeMap::new(),
Err(TrimPointsUnavailable::UnknownPartitionStatus(
[PlainNodeId::new(3)].iter().copied().collect()
)),
"Any dead nodes in cluster block trimming when using persisted LSN mode"
);
}
Expand Down Expand Up @@ -818,12 +838,12 @@ mod tests {

assert_eq!(
trim_points,
BTreeMap::from([
Ok(BTreeMap::from([
(LogId::from(p1), (Lsn::INVALID, p1)),
(LogId::from(p2), (Lsn::new(10), p2)),
(LogId::from(p3), (Lsn::new(30), p3)),
(LogId::from(p4), (Lsn::new(40), p4)),
])
]))
);

let mut nodes = cluster_state.nodes.clone();
Expand All @@ -843,12 +863,12 @@ mod tests {

assert_eq!(
trim_points,
BTreeMap::from([
Ok(BTreeMap::from([
(LogId::from(p1), (Lsn::INVALID, p1)),
(LogId::from(p2), (Lsn::new(10), p2)),
(LogId::from(p3), (Lsn::new(30), p3)),
(LogId::from(p4), (Lsn::new(40), p4)),
]),
])),
"presence of dead or suspect nodes does not block trimming when archived LSN is reported"
);
}
Expand Down
Loading