diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index d63b0b6dc0..d70902dd08 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -9,11 +9,13 @@
 // by the Apache License, Version 2.0.
 
 use std::collections::{BTreeMap, BTreeSet};
-use std::ops::Deref;
+use std::ops::{Add, Deref};
 use std::sync::Arc;
+use std::time::Duration;
 
 use futures::future::OptionFuture;
 use itertools::Itertools;
+use rand::Rng;
 use restate_bifrost::Bifrost;
 use restate_core::network::TransportConnect;
 use restate_core::{my_node_id, Metadata};
@@ -28,7 +30,7 @@ use restate_types::{GenerationalNodeId, PlainNodeId, Version};
 use tokio::sync::watch;
 use tokio::time;
 use tokio::time::{Interval, MissedTickBehavior};
-use tracing::{info, instrument, trace, warn};
+use tracing::{debug, info, instrument, trace, warn};
 
 use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
 use crate::cluster_controller::logs_controller::{
@@ -310,7 +312,7 @@ where
             logs_metadata_version = tracing::field::Empty,
         ),
     )]
-    async fn trim_logs(&self) {
+    async fn trim_logs(&mut self) {
         let cluster_state = self.cluster_state_watcher.current();
         if tracing::level_enabled!(tracing::Level::DEBUG) {
             tracing::Span::current().record(
@@ -319,17 +321,28 @@
             );
         }
 
-        let new_trim_points = TrimMode::from(self.snapshots_repository_configured, &cluster_state)
-            .calculate_safe_trim_points();
-        trace!(?new_trim_points, "Calculated safe log trim points");
+        match TrimMode::from(self.snapshots_repository_configured, &cluster_state)
+            .calculate_safe_trim_points()
+        {
+            Ok(trim_points) => {
+                trace!(?trim_points, "Calculated safe log trim points");
 
-        for (log_id, (trim_point, partition_id)) in new_trim_points {
-            let result = self.bifrost.admin().trim(log_id, trim_point).await;
+                for (log_id, (trim_point, partition_id)) in trim_points {
+                    let result = self.bifrost.admin().trim(log_id, trim_point).await;
+                    if let Err(err) = result {
+                        warn!(
+                            %partition_id,
+                            "Failed to trim log {log_id}. This can lead to increased disk usage: {err}"
+                        );
+                    }
+                }
+            }
 
-            if let Err(err) = result {
+            Err(TrimPointsUnavailable::UnknownPartitionStatus(blocking_nodes)) => {
                 warn!(
-                    %partition_id,
-                    "Failed to trim log {log_id}. This can lead to increased disk usage: {err}"
+                    ?blocking_nodes,
+                    "Log trimming is suspended until we can determine the processor state on all known cluster nodes. \
+                     Remove decommissioned nodes from the cluster and/or enable partition snapshotting to unblock log trimming"
                 );
             }
         }
@@ -342,7 +355,12 @@ fn create_log_trim_check_interval(options: &AdminOptions) -> Option<Interval> {
         .inspect(|_| info!("The log trim threshold setting is deprecated and will be ignored"));
 
     options.log_trim_interval.map(|interval| {
-        let mut interval = tokio::time::interval(interval.into());
+        // delay the initial trim check, and add a small amount of jitter to avoid synchronization
+        // among partition leaders in case of coordinated cluster restarts
+        let jitter = rand::rng().random_range(Duration::ZERO..interval.mul_f32(0.1));
+        let start_at = time::Instant::now().add(interval.into()).add(jitter);
+
+        let mut interval = time::interval_at(start_at, interval.into());
         interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
         interval
     })
@@ -360,6 +378,13 @@ enum TrimMode {
     },
 }
 
+#[derive(Debug, PartialEq)]
+enum TrimPointsUnavailable {
+    /// Trim points can not be determined because some (dead or otherwise
+    /// unreachable) nodes have not reported the state of partitions.
+    UnknownPartitionStatus(BTreeSet<PlainNodeId>),
+}
+
 impl TrimMode {
     fn from(snapshots_repository_configured: bool, cluster_state: &Arc<ClusterState>) -> TrimMode {
         let mut partition_status: BTreeMap<
@@ -401,19 +426,16 @@ impl TrimMode {
     /// as archived LSNs are reported for all partitions, trimming can continue even in the presence of
     /// some dead nodes. This is because we assume that if those nodes are only temporarily down, they
     /// can fast-forward state from the snapshot repository when they return into service.
-    fn calculate_safe_trim_points(&self) -> BTreeMap<LogId, (Lsn, PartitionId)> {
-        let mut safe_trim_points = BTreeMap::new();
-
+    fn calculate_safe_trim_points(
+        &self,
+    ) -> Result<BTreeMap<LogId, (Lsn, PartitionId)>, TrimPointsUnavailable> {
         if let Some(blocking_nodes) = self.get_trim_blockers() {
-            warn!(
-                ?blocking_nodes,
-                "Log trimming is suspended until we can determine the processor state on all known cluster nodes. \
-                This may result in increased log usage. Prune permanently decommissioned nodes and/or enable partition \
-                snapshotting to unblock trimming."
-            );
-            return safe_trim_points;
+            return Err(TrimPointsUnavailable::UnknownPartitionStatus(
+                blocking_nodes.clone(),
+            ));
         }
 
+        let mut safe_trim_points = BTreeMap::new();
         match self {
             TrimMode::ArchivedLsn { partition_status } => {
                 for (partition_id, processor_status) in partition_status.iter() {
@@ -439,22 +461,14 @@ impl TrimMode {
                         .max()
                         .unwrap_or(Lsn::INVALID);
 
-                    if archived_lsn <= min_applied_lsn {
-                        trace!(
-                            ?partition_id,
-                            "Safe trim point for log {}: {:?}",
-                            log_id,
-                            archived_lsn
-                        );
-                        safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
-                    } else {
-                        warn!(
+                    if archived_lsn > min_applied_lsn {
+                        debug!(
                             ?partition_id,
                             ?min_applied_lsn,
                             "Some alive nodes have not applied the log up to the archived LSN"
                         );
-                        safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
                     }
+                    safe_trim_points.insert(log_id, (archived_lsn, *partition_id));
                 }
             }
             TrimMode::PersistedLsn {
@@ -483,7 +497,7 @@ impl TrimMode {
             }
         }
 
-        safe_trim_points
+        Ok(safe_trim_points)
     }
 
     // If any partitions are reporting an archived LSN, they must have snapshotting enabled.
@@ -526,7 +540,7 @@ mod tests {
     use std::collections::BTreeMap;
     use std::sync::Arc;
 
-    use crate::cluster_controller::service::state::TrimMode;
+    use crate::cluster_controller::service::state::{TrimMode, TrimPointsUnavailable};
     use restate_types::cluster::cluster_state::{
         AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
         SuspectNode,
@@ -633,11 +647,11 @@ mod tests {
         assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
         assert_eq!(
             trim_points,
-            BTreeMap::from([
+            Ok(BTreeMap::from([
                 (LogId::from(p1), (Lsn::INVALID, p1)),
                 (LogId::from(p2), (Lsn::INVALID, p2)),
                 (LogId::from(p3), (Lsn::new(5), p3)),
-            ]),
+            ])),
             "Use min persisted LSN across the cluster as the safe point when not archiving"
         );
 
@@ -660,7 +674,9 @@ mod tests {
         assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
         assert_eq!(
             trim_points,
-            BTreeMap::new(),
+            Err(TrimPointsUnavailable::UnknownPartitionStatus(
+                [PlainNodeId::new(3)].iter().copied().collect()
+            )),
             "Any dead nodes in cluster block trimming when using persisted LSN mode"
         );
 
@@ -677,7 +693,9 @@ mod tests {
         assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
         assert_eq!(
             trim_points,
-            BTreeMap::new(),
+            Err(TrimPointsUnavailable::UnknownPartitionStatus(
+                [PlainNodeId::new(3)].iter().copied().collect()
+            )),
             "Any dead nodes in cluster block trimming when using persisted LSN mode"
         );
 
@@ -697,7 +715,9 @@ mod tests {
         assert!(matches!(trim_mode, TrimMode::PersistedLsn { .. }));
         assert_eq!(
             trim_points,
-            BTreeMap::new(),
+            Err(TrimPointsUnavailable::UnknownPartitionStatus(
+                [PlainNodeId::new(3)].iter().copied().collect()
+            )),
             "Any dead nodes in cluster block trimming when using persisted LSN mode"
         );
     }
@@ -818,12 +838,12 @@ mod tests {
 
         assert_eq!(
             trim_points,
-            BTreeMap::from([
+            Ok(BTreeMap::from([
                 (LogId::from(p1), (Lsn::INVALID, p1)),
                 (LogId::from(p2), (Lsn::new(10), p2)),
                 (LogId::from(p3), (Lsn::new(30), p3)),
                 (LogId::from(p4), (Lsn::new(40), p4)),
-            ])
+            ]))
         );
 
         let mut nodes = cluster_state.nodes.clone();
@@ -843,12 +863,12 @@ mod tests {
 
         assert_eq!(
            trim_points,
-            BTreeMap::from([
+            Ok(BTreeMap::from([
                 (LogId::from(p1), (Lsn::INVALID, p1)),
                 (LogId::from(p2), (Lsn::new(10), p2)),
                 (LogId::from(p3), (Lsn::new(30), p3)),
                 (LogId::from(p4), (Lsn::new(40), p4)),
-            ]),
+            ])),
             "presence of dead or suspect nodes does not block trimming when archived LSN is reported"
         );
     }
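The jittered start introduced in `create_log_trim_check_interval` can be exercised on its own. The sketch below is illustrative only and is not code from this diff: it assumes tokio with the `time` and `macros` features plus rand 0.9 (`rand::rng`, `random_range`), and the `jittered_interval` helper name and the 2-second period are made-up example values.

```rust
use std::time::Duration;

use rand::Rng;
use tokio::time::{self, MissedTickBehavior};

// Build a ticker whose first tick fires only after one full period plus up to
// 10% random jitter, mirroring the technique used in the diff above.
// Assumes a non-zero period; a zero period would make the jitter range empty.
fn jittered_interval(period: Duration) -> time::Interval {
    let jitter = rand::rng().random_range(Duration::ZERO..period.mul_f32(0.1));
    let start_at = time::Instant::now() + period + jitter;

    let mut interval = time::interval_at(start_at, period);
    // If a tick is missed (e.g. under load), fire once and resume the normal
    // cadence instead of bursting to catch up.
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    interval
}

#[tokio::main]
async fn main() {
    // Hypothetical short period so the example terminates quickly.
    let mut ticker = jittered_interval(Duration::from_secs(2));
    ticker.tick().await; // completes only after ~2s plus jitter
    println!("run the periodic trim check here");
}
```

Skipping the immediate first tick and spreading start times by up to 10% of the period keeps controllers that were restarted together from all running the trim check at the same instant, which is the stated intent of the comment added in the diff.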