From 1807b583553acdee5a6e3915b56cae3d17c61e21 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Mon, 29 Jan 2024 20:53:06 +0100 Subject: [PATCH 1/8] Clean up multiservice-discovery to be more robust, lock shared structures, and separate concerns. In particular, the use of parallel lists for thread end signaling channels and actual threads has been eliminated. Preliminary work to ensure other modifications are possible. --- Cargo.lock | 1 + .../multiservice-discovery/Cargo.toml | 1 + .../multiservice-discovery/src/definition.rs | 359 ++++++++++++++++-- .../multiservice-discovery/src/main.rs | 67 ++-- ...add_boundary_node_to_definition_handler.rs | 127 ++++--- .../server_handlers/add_definition_handler.rs | 57 +-- .../delete_definition_handler.rs | 35 +- .../src/server_handlers/dto.rs | 51 ++- .../export_prometheus_config_handler.rs | 26 +- .../server_handlers/export_targets_handler.rs | 26 +- .../server_handlers/get_definition_handler.rs | 23 +- .../src/server_handlers/mod.rs | 49 +-- .../replace_definitions_handler.rs | 141 +++---- 13 files changed, 608 insertions(+), 355 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fa2aba64..8e30a9b22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5708,6 +5708,7 @@ dependencies = [ "crossbeam", "crossbeam-channel", "erased-serde 0.4.2", + "futures", "futures-util", "humantime", "ic-async-utils", diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index dff023c40..09d25239d 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -11,6 +11,7 @@ clap = { workspace = true } crossbeam = { workspace = true } crossbeam-channel = { workspace = true } erased-serde = { workspace = true } +futures.workspace = true futures-util = { workspace = true } humantime = { workspace = true } ic-async-utils = { workspace = true } diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index eac1ded78..c5110376b 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -2,16 +2,24 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; use ic_registry_client::client::ThresholdSigPublicKey; use service_discovery::job_types::JobType; +use service_discovery::IcServiceDiscovery; +use service_discovery::IcServiceDiscoveryError; +use service_discovery::TargetGroup; use service_discovery::{registry_sync::sync_local_registry, IcServiceDiscoveryImpl}; use slog::{debug, info, warn, Logger}; use std::collections::BTreeMap; use std::collections::BTreeSet; +use std::collections::HashSet; +use std::error::Error; +use std::fmt::Debug; +use std::fmt::{Display, Error as FmtError, Formatter}; use std::net::SocketAddr; use std::sync::Arc; use std::{ path::PathBuf, time::{Duration, Instant}, }; +use tokio::sync::Mutex; use url::Url; #[derive(Clone)] @@ -22,13 +30,50 @@ pub struct Definition { log: Logger, pub public_key: Option, pub poll_interval: Duration, - stop_signal: Receiver<()>, pub registry_query_timeout: Duration, - pub stop_signal_sender: Sender<()>, pub ic_discovery: Arc, pub boundary_nodes: Vec, } +impl Debug for Definition { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + write!( + f, + "Definition < + name: {}, + nns_urls: {:?} + public_key: {:?} +>", + self.name, self.nns_urls, self.public_key + ) + } +} + +struct Ender { + stop_signal_sender: Sender<()>, + join_handle: std::thread::JoinHandle<()>, +} + +#[derive(Debug)] +pub(crate) struct BoundaryNodeAlreadyExists { + name: String, +} + +impl Error for BoundaryNodeAlreadyExists {} + +impl Display for BoundaryNodeAlreadyExists { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + write!(f, "boundary node {} already exists", self.name) + } +} + +#[derive(Clone)] +pub(crate) struct RunningDefinition { + pub(crate) definition: Definition, + stop_signal: Receiver<()>, + ender: Arc>>, +} + impl Definition { pub(crate) fn new( nns_urls: Vec, @@ -37,9 +82,7 @@ impl Definition { log: Logger, public_key: Option, poll_interval: Duration, - stop_signal: Receiver<()>, registry_query_timeout: Duration, - stop_signal_sender: Sender<()>, ) -> Self { let global_registry_path = std::fs::canonicalize(global_registry_path).expect("Invalid global registry path"); // The path needs to be sanitized otherwise any file in the environment can be overwritten, @@ -55,55 +98,116 @@ impl Definition { log: log.clone(), public_key, poll_interval, - stop_signal, registry_query_timeout, - stop_signal_sender, ic_discovery: Arc::new(IcServiceDiscoveryImpl::new(log, registry_path, registry_query_timeout).unwrap()), boundary_nodes: vec![], } } + pub(crate) async fn run(self, rt: tokio::runtime::Handle) -> RunningDefinition { + fn wrap(definition: RunningDefinition, rt: tokio::runtime::Handle) -> impl FnMut() { + move || { + rt.block_on(definition.run()); + } + } + + info!(self.log, "Running new definition {}", self.name); + let (stop_signal_sender, stop_signal) = crossbeam::channel::bounded::<()>(0); + let ender: Arc>> = Arc::new(Mutex::new(None)); + let d = RunningDefinition { + definition: self, + stop_signal, + ender: ender.clone(), + }; + let join_handle = std::thread::spawn(wrap(d.clone(), rt)); + ender.lock().await.replace(Ender { + stop_signal_sender, + join_handle, + }); + d + } +} + +impl RunningDefinition { + pub(crate) async fn end(&mut self) { + let mut ender = self.ender.lock().await; + if let Some(s) = ender.take() { + // We have pulled out the channel from its container. After this, + // all senders will have been dropped, and no more messages can be sent. + // https://docs.rs/crossbeam/latest/crossbeam/channel/index.html#disconnection + info!( + self.definition.log, + "Sending termination signal to definition {}", self.definition.name + ); + s.stop_signal_sender.send(()).unwrap(); + info!( + self.definition.log, + "Joining definition {} thread", self.definition.name + ); + s.join_handle.join().unwrap(); + } + } + + pub(crate) fn get_target_groups( + &self, + job_type: JobType, + ) -> Result, IcServiceDiscoveryError> { + self.definition + .ic_discovery + .get_target_groups(job_type, self.definition.log.clone()) + } + async fn initial_registry_sync(&self) { - info!(self.log, "Syncing local registry for {} started", self.name); - info!(self.log, "Using local registry path: {}", self.registry_path.display()); + info!( + self.definition.log, + "Syncing local registry for {} started", self.definition.name + ); + info!( + self.definition.log, + "Using local registry path: {}", + self.definition.registry_path.display() + ); sync_local_registry( - self.log.clone(), - self.registry_path.join("targets"), - self.nns_urls.clone(), + self.definition.log.clone(), + self.definition.registry_path.join("targets"), + self.definition.nns_urls.clone(), false, - self.public_key, + self.definition.public_key, ) .await; - info!(self.log, "Syncing local registry for {} completed", self.name); + info!( + self.definition.log, + "Syncing local registry for {} completed", self.definition.name + ); } - async fn poll_loop(&mut self) { - let interval = crossbeam::channel::tick(self.poll_interval); + async fn poll_loop(&self) { + let interval = crossbeam::channel::tick(self.definition.poll_interval); let mut tick = Instant::now(); loop { debug!( - self.log, - "Loading new scraping targets for {}, (tick: {:?})", self.name, tick + self.definition.log, + "Loading new scraping targets for {}, (tick: {:?})", self.definition.name, tick ); - if let Err(e) = self.ic_discovery.load_new_ics(self.log.clone()) { + if let Err(e) = self.definition.ic_discovery.load_new_ics(self.definition.log.clone()) { warn!( - self.log, - "Failed to load new scraping targets for {} @ interval {:?}: {:?}", self.name, tick, e + self.definition.log, + "Failed to load new scraping targets for {} @ interval {:?}: {:?}", self.definition.name, tick, e ); } - debug!(self.log, "Update registries for {}", self.name); - if let Err(e) = self.ic_discovery.update_registries().await { + debug!(self.definition.log, "Update registries for {}", self.definition.name); + if let Err(e) = self.definition.ic_discovery.update_registries().await { warn!( - self.log, - "Failed to sync registry for {} @ interval {:?}: {:?}", self.name, tick, e + self.definition.log, + "Failed to sync registry for {} @ interval {:?}: {:?}", self.definition.name, tick, e ); } tick = crossbeam::select! { recv(self.stop_signal) -> _ => { - info!(self.log, "Received shutdown signal in poll_loop for {}", self.name); + info!(self.definition.log, "Received shutdown signal in poll_loop for {}", self.definition.name); return }, recv(interval) -> msg => msg.expect("tick failed!") @@ -111,38 +215,50 @@ impl Definition { } } - async fn run(&mut self) { + async fn run(&self) { self.initial_registry_sync().await; - info!(self.log, "Starting to watch for changes for definition {}", self.name); + info!( + self.definition.log, + "Starting to watch for changes for definition {}", self.definition.name + ); self.poll_loop().await; - if self.name != "mercury" { + if self.definition.name != "mercury" { info!( - self.log, + self.definition.log, "Removing registry dir '{}' for definition {}...", - self.registry_path.display(), - self.name + self.definition.registry_path.display(), + self.definition.name ); - if let Err(e) = std::fs::remove_dir_all(self.registry_path.clone()) { + if let Err(e) = std::fs::remove_dir_all(self.definition.registry_path.clone()) { warn!( - self.log, - "Failed to remove registry dir for definition {}: {:?}", self.name, e + self.definition.log, + "Failed to remove registry dir for definition {}: {:?}", self.definition.name, e ); } } } - pub fn add_boundary_node(&mut self, target: BoundaryNode) { - self.boundary_nodes.push(target); + pub(crate) async fn add_boundary_node(&mut self, target: BoundaryNode) -> Result<(), BoundaryNodeAlreadyExists> { + // Lock modifications to this object while mods are happening. + match self.ender.lock().await.as_ref() { + Some(_) => { + if let Some(bn) = self.definition.boundary_nodes.iter().find(|bn| bn.name == target.name) { + return Err(BoundaryNodeAlreadyExists { name: bn.name.clone() }); + }; + + self.definition.boundary_nodes.push(target); + Ok(()) + } + _ => Ok(()), // Ended. Do nothing. + } } -} -pub fn wrap(mut definition: Definition, rt: tokio::runtime::Handle) -> impl FnMut() { - move || { - rt.block_on(definition.run()); + pub fn name(&self) -> String { + self.definition.name.clone() } } @@ -153,3 +269,166 @@ pub struct BoundaryNode { pub custom_labels: BTreeMap, pub job_type: JobType, } + +#[derive(Debug)] +pub(crate) enum StartDefinitionError { + AlreadyExists(String), +} + +impl Error for StartDefinitionError {} + +impl Display for StartDefinitionError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::AlreadyExists(name) => write!(f, "definition {} is already running", name), + } + } +} +#[derive(Debug)] +pub(crate) struct StartDefinitionsError { + pub(crate) errors: Vec, +} + +impl Error for StartDefinitionsError {} + +impl Display for StartDefinitionsError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + for e in self.errors.iter() { + write!(f, "* {}", e)? + } + Ok(()) + } +} + +#[derive(Debug)] +pub(crate) enum StopDefinitionError { + DoesNotExist(String), +} + +impl Error for StopDefinitionError {} + +impl Display for StopDefinitionError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::DoesNotExist(name) => write!(f, "definition {} does not exist", name), + } + } +} +#[derive(Debug)] +pub(crate) struct StopDefinitionsError { + pub(crate) errors: Vec, +} + +impl Error for StopDefinitionsError {} + +impl Display for StopDefinitionsError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + for e in self.errors.iter() { + write!(f, "* {}", e)? + } + Ok(()) + } +} + +#[derive(Clone)] +pub(crate) struct DefinitionsSupervisor { + rt: tokio::runtime::Handle, + pub(crate) definitions: Arc>>, +} + +impl DefinitionsSupervisor { + pub(crate) fn new(rt: tokio::runtime::Handle) -> Self { + DefinitionsSupervisor { + rt, + definitions: Arc::new(Mutex::new(BTreeMap::new())), + } + } + + async fn start_inner( + &self, + existing: &mut BTreeMap, + definitions: Vec, + replace_existing: bool, + ) -> Result<(), StartDefinitionsError> { + let mut error = StartDefinitionsError { errors: vec![] }; + let mut names_added: HashSet = HashSet::new(); + + for definition in definitions.iter() { + let dname = definition.name.clone(); + // Check if we already have something running with the same name, + // if the user does not want to replace those with newer defs. + if !replace_existing && existing.contains_key(&dname) { + error.errors.push(StartDefinitionError::AlreadyExists(dname.clone())); + continue; + } + + // Check for incoming duplicates. + if names_added.contains(&dname) { + error.errors.push(StartDefinitionError::AlreadyExists(dname.clone())); + continue; + } + names_added.insert(dname); + } + + if !error.errors.is_empty() { + return Err(error); + } + + for definition in definitions.into_iter() { + // Here is where we stop already running definitions + // that have a name similar to the one being added. + if let Some(d) = existing.get_mut(&definition.name) { + d.end().await + } + // We stop X before we start X' because otherwise + // the newly-running definition will fight over + // shared disk space (a folder) and probably die. + existing.insert(definition.name.clone(), definition.run(self.rt.clone()).await); + } + Ok(()) + } + + /// Start a list of definitions. + /// + /// If replace_existing is true, any running definition matching the name + /// of any of the incoming definitions will be stopped. If it is false, + /// any incoming definition named after any running definition will + /// add an AlreadyExists error to the errors list. + pub(crate) async fn start( + &self, + definitions: Vec, + replace_existing: bool, + ) -> Result<(), StartDefinitionsError> { + let mut u = self.definitions.lock().await; + self.start_inner(&mut u, definitions, replace_existing).await + } + + async fn end_inner(&self, definitions: &mut BTreeMap) { + for (_, definition) in definitions.iter_mut() { + definition.end().await + } + definitions.clear() + } + + pub(crate) async fn end(&self) { + let mut u = self.definitions.lock().await; + self.end_inner(&mut u).await + } + + pub(crate) async fn stop(&self, definition_names: Vec) -> Result<(), StopDefinitionsError> { + let mut defs = self.definitions.lock().await; + let errors: Vec = definition_names + .iter() + .filter(|n| defs.contains_key(*n)) + .map(|n| StopDefinitionError::DoesNotExist(n.clone())) + .collect(); + if !errors.is_empty() { + return Err(StopDefinitionsError { errors }); + } + + for name in definition_names.into_iter() { + defs.remove(&name).unwrap().end().await + } + Ok(()) + } +} diff --git a/rs/ic-observability/multiservice-discovery/src/main.rs b/rs/ic-observability/multiservice-discovery/src/main.rs index 0d3a24da9..69cdb79f5 100644 --- a/rs/ic-observability/multiservice-discovery/src/main.rs +++ b/rs/ic-observability/multiservice-discovery/src/main.rs @@ -1,17 +1,15 @@ +use std::path::PathBuf; use std::time::Duration; -use std::{path::PathBuf, sync::Arc}; use clap::Parser; use futures_util::FutureExt; use humantime::parse_duration; -use slog::{error, info}; use slog::{o, Drain, Logger}; use tokio::runtime::Runtime; use tokio::sync::oneshot::{self}; -use tokio::sync::Mutex; use url::Url; -use definition::{wrap, Definition}; +use definition::{Definition, DefinitionsSupervisor}; use ic_async_utils::shutdown_signal; use crate::server_handlers::Server; @@ -24,48 +22,39 @@ fn main() { let log = make_logger(); let shutdown_signal = shutdown_signal(log.clone()).shared(); let cli_args = CliArgs::parse(); - let mut handles = vec![]; - let mut definitions = vec![]; - let (oneshot_sender, oneshot_receiver) = oneshot::channel(); - if !cli_args.start_without_mainnet { - let mainnet_definition = get_mainnet_definition(&cli_args, log.clone()); - definitions.push(mainnet_definition.clone()); - - let ic_handle = std::thread::spawn(wrap(mainnet_definition, rt.handle().clone())); - handles.push(ic_handle); - } - let definitions = Arc::new(Mutex::new(definitions)); - let handles = Arc::new(Mutex::new(handles)); + let supervisor = DefinitionsSupervisor::new(rt.handle().clone()); + let (server_stop, server_stop_receiver) = oneshot::channel(); //Configure server - let server = Server::new( - log.clone(), - definitions.clone(), - cli_args.poll_interval, - cli_args.registry_query_timeout, - cli_args.targets_dir, - handles.clone(), - rt.handle().clone(), + let server_handle = rt.spawn( + Server::new( + log.clone(), + supervisor.clone(), + cli_args.poll_interval, + cli_args.registry_query_timeout, + cli_args.targets_dir.clone(), + ) + .run(server_stop_receiver), ); - let server_handle = rt.spawn(server.run(oneshot_receiver)); + if !cli_args.start_without_mainnet { + rt.block_on(async { + let _ = supervisor + .start(vec![get_mainnet_definition(&cli_args, log.clone())], false) + .await; + }); + } + // Wait for shutdown signal. rt.block_on(shutdown_signal); - //Stop the server - oneshot_sender.send(()).unwrap(); - for definition in rt.block_on(definitions.lock()).drain(..) { - info!(log, "Sending termination signal to definition {}", definition.name); - definition.stop_signal_sender.send(()).unwrap(); - } + // Signal server to stop. Stop happens in parallel with supervisor stop. + server_stop.send(()).unwrap(); - for handle in rt.block_on(handles.lock()).drain(..) { - info!(log, "Joining definition thread"); - if let Err(e) = handle.join() { - error!(log, "Could not join thread handle of definition being removed: {:?}", e); - } - } + //Stop all definitions. End happens in parallel with server stop. + rt.block_on(supervisor.end()); + // Wait for server to stop. Should have stopped by now. rt.block_on(server_handle).unwrap(); } @@ -135,8 +124,6 @@ Start the discovery without the IC Mainnet target. } fn get_mainnet_definition(cli_args: &CliArgs, log: Logger) -> Definition { - let (ic_stop_signal_sender, ic_stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); - Definition::new( vec![cli_args.nns_url.clone()], cli_args.targets_dir.clone(), @@ -144,8 +131,6 @@ fn get_mainnet_definition(cli_args: &CliArgs, log: Logger) -> Definition { log.clone(), None, cli_args.poll_interval, - ic_stop_signal_rcv, cli_args.registry_query_timeout, - ic_stop_signal_sender, ) } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs index 622c9cac5..842e4e979 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs @@ -1,79 +1,86 @@ -use std::str::FromStr; -use std::sync::Arc; +use slog::{error, info, Logger}; +use std::error::Error; +use std::fmt::{Display, Error as FmtError, Formatter}; -use service_discovery::job_types::JobType; -use slog::Logger; -use tokio::sync::Mutex; use warp::Reply; -use crate::definition::{BoundaryNode, Definition}; +use crate::definition::{BoundaryNodeAlreadyExists, DefinitionsSupervisor}; use crate::server_handlers::dto::BoundaryNodeDto; use crate::server_handlers::WebResult; +use super::dto::BadBoundaryNodeDtoError; + pub struct AddBoundaryNodeToDefinitionBinding { - pub definitions: Arc>>, + pub supervisor: DefinitionsSupervisor, pub log: Logger, } -pub async fn add_boundary_node( +#[derive(Debug)] +pub enum AddBoundaryNodeError { + DefinitionNotFound(String), + BoundaryNodeAlreadyExists(BoundaryNodeAlreadyExists), + BadBoundaryNodeDtoError(BadBoundaryNodeDtoError), +} + +impl Error for AddBoundaryNodeError {} + +impl Display for AddBoundaryNodeError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::DefinitionNotFound(name) => write!(f, "definition {} not found", name), + Self::BoundaryNodeAlreadyExists(name) => write!(f, "boundary node {} already exists", name), + Self::BadBoundaryNodeDtoError(e) => write!(f, "{}", e), + } + } +} + +impl From for AddBoundaryNodeError { + fn from(e: BadBoundaryNodeDtoError) -> Self { + Self::BadBoundaryNodeDtoError(e) + } +} + +impl From for AddBoundaryNodeError { + fn from(e: BoundaryNodeAlreadyExists) -> Self { + Self::BoundaryNodeAlreadyExists(e) + } +} + +pub async fn _add_boundary_node( boundary_node: BoundaryNodeDto, binding: AddBoundaryNodeToDefinitionBinding, -) -> WebResult { - let mut definitions = binding.definitions.lock().await; +) -> Result<(), AddBoundaryNodeError> { + let mut definitions = binding.supervisor.definitions.lock().await; - let definition = match definitions.iter_mut().find(|d| d.name == boundary_node.ic_name) { - Some(def) => def, - None => { - return Ok(warp::reply::with_status( - "Definition with this name does not exist".to_string(), - warp::http::StatusCode::BAD_REQUEST, - )) - } - }; + let running_definition = definitions + .get_mut(&boundary_node.ic_name) + .ok_or(AddBoundaryNodeError::DefinitionNotFound(boundary_node.ic_name.clone()))?; - if let Some(bn) = definition - .boundary_nodes - .iter() - .find(|bn| bn.name == boundary_node.name) - { - return Ok(warp::reply::with_status( - format!("Boundary node with name {} already exists", bn.name), - warp::http::StatusCode::BAD_REQUEST, - )); - }; + let bn = boundary_node.try_into_boundary_node()?; + + Ok(running_definition.add_boundary_node(bn).await?) +} - let job_type = match JobType::from_str(&boundary_node.job_type) { +pub(crate) async fn add_boundary_node( + boundary_node: BoundaryNodeDto, + binding: AddBoundaryNodeToDefinitionBinding, +) -> WebResult { + let log = binding.log.clone(); + let dname = boundary_node.ic_name.clone(); + match _add_boundary_node(boundary_node, binding).await { + Ok(_) => { + info!(log, "Added new boundary node to definition {}", dname); + Ok(warp::reply::with_status( + format!("Definition {} added successfully", dname), + warp::http::StatusCode::OK, + )) + } Err(e) => { - // We don't have this job type here. - return Ok(warp::reply::with_status( - format!("Job type {} is not known: {}", boundary_node.job_type, e), + error!(log, "Boundary node could not be added: {}", e); + Ok(warp::reply::with_status( + format!("Boundary node could not be added: {}", e), warp::http::StatusCode::BAD_REQUEST, - )); - } - Ok(jt) => { - // Forbid addition of any job type not known to be supported by boundary nodes. - if !JobType::all_for_boundary_nodes().contains(&jt) { - return Ok(warp::reply::with_status( - format!( - "Job type {} is not supported for boundary nodes.", - boundary_node.job_type - ), - warp::http::StatusCode::BAD_REQUEST, - )); - } - jt + )) } - }; - - definition.add_boundary_node(BoundaryNode { - name: boundary_node.name, - custom_labels: boundary_node.custom_labels, - targets: boundary_node.targets, - job_type, - }); - - Ok(warp::reply::with_status( - "".to_string(), - warp::http::StatusCode::CREATED, - )) + } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index 40505c620..b66a4b4eb 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -1,40 +1,46 @@ use slog::{error, info, Logger}; +use std::error::Error; use std::path::PathBuf; -use std::sync::Arc; -use std::thread::JoinHandle; use std::time::Duration; -use tokio::sync::Mutex; +use std::fmt::{Display, Error as FmtError, Formatter}; use warp::Reply; -use crate::definition::{wrap, Definition}; +use crate::definition::{DefinitionsSupervisor, StartDefinitionError}; +use crate::server_handlers::dto::BadDtoError; use crate::server_handlers::dto::DefinitionDto; use crate::server_handlers::WebResult; -use super::dto::BadDtoError; - pub(crate) struct AddDefinitionBinding { - pub definitions: Arc>>, + pub supervisor: DefinitionsSupervisor, pub log: Logger, pub registry_path: PathBuf, pub poll_interval: Duration, pub registry_query_timeout: Duration, - pub rt: tokio::runtime::Handle, - pub handles: Arc>>>, } -async fn _add_definition( - tentative_definition: DefinitionDto, - binding: AddDefinitionBinding, -) -> Result<(), BadDtoError> { - let mut existing_definitions = binding.definitions.lock().await; +#[derive(Debug)] +pub enum AddDefinitionError { + StartDefinitionError(StartDefinitionError), + BadDtoError(BadDtoError), +} + +impl Error for AddDefinitionError {} - let dname = tentative_definition.name.clone(); - if existing_definitions.iter().any(|d| d.name == tentative_definition.name) { - return Err(BadDtoError::AlreadyExists(dname)); +impl Display for AddDefinitionError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::StartDefinitionError(e) => write!(f, "{}", e), + Self::BadDtoError(e) => write!(f, "{}", e), + } } +} +async fn _add_definition( + tentative_definition: DefinitionDto, + binding: AddDefinitionBinding, +) -> Result<(), AddDefinitionError> { let new_definition = match tentative_definition .try_into_definition( binding.log.clone(), @@ -45,18 +51,15 @@ async fn _add_definition( .await { Ok(def) => def, - Err(e) => return Err(e), + Err(e) => return Err(AddDefinitionError::BadDtoError(e)), }; - info!(binding.log, "Adding new definition {} to existing", dname); - existing_definitions.push(new_definition.clone()); - - let mut existing_handles = binding.handles.lock().await; - // ...then start and record the handles for the new definitions. - info!(binding.log, "Starting thread for definition: {:?}", dname); - let joinhandle = std::thread::spawn(wrap(new_definition, binding.rt)); - existing_handles.push(joinhandle); - Ok(()) + match binding.supervisor.start(vec![new_definition], false).await { + Ok(()) => Ok(()), + Err(e) => Err(AddDefinitionError::StartDefinitionError( + e.errors.into_iter().next().unwrap(), + )), + } } pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult { diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index c852def99..f5f19081f 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -1,15 +1,8 @@ -use std::sync::Arc; - -use tokio::sync::Mutex; -use warp::Reply; - -use crate::definition::Definition; +use crate::definition::DefinitionsSupervisor; use crate::server_handlers::WebResult; +use warp::Reply; -pub async fn delete_definition( - name: String, - definitions: Arc>>, -) -> WebResult { +pub async fn delete_definition(name: String, supervisor: DefinitionsSupervisor) -> WebResult { if name == "ic" { return Ok(warp::reply::with_status( "Cannot delete ic definition".to_string(), @@ -17,21 +10,13 @@ pub async fn delete_definition( )); } - let mut definitions = definitions.lock().await; - - let index = definitions.iter().position(|d| d.name == name); - - match index { - Some(index) => { - let definition = definitions.remove(index); - definition.stop_signal_sender.send(()).unwrap(); - Ok(warp::reply::with_status( - "success".to_string(), - warp::http::StatusCode::OK, - )) - } - None => Ok(warp::reply::with_status( - "Definition with this name does not exist".to_string(), + match supervisor.stop(vec![name.clone()]).await { + Ok(_) => Ok(warp::reply::with_status( + format!("Deleted definition {}", name.clone()), + warp::http::StatusCode::OK, + )), + Err(e) => Ok(warp::reply::with_status( + format!("Could not delete definition {}: {}", name, e), warp::http::StatusCode::BAD_REQUEST, )), } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/dto.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/dto.rs index 96e419433..788852ef6 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/dto.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/dto.rs @@ -1,6 +1,7 @@ use base64::{engine::general_purpose as b64, Engine as _}; use ic_crypto_utils_threshold_sig_der::parse_threshold_sig_key_from_der; use ic_registry_client::client::ThresholdSigPublicKey; +use service_discovery::job_types::{JobType, JobTypeParseError}; use service_discovery::registry_sync::nns_reachable; use serde::{Deserialize, Serialize}; @@ -10,10 +11,11 @@ use std::error::Error; use std::fmt::{Display, Error as FmtError, Formatter}; use std::net::SocketAddr; use std::path::PathBuf; +use std::str::FromStr; use std::time::Duration; use url::Url; -use crate::definition::Definition; +use crate::definition::{BoundaryNode, Definition}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DefinitionDto { @@ -25,7 +27,6 @@ pub struct DefinitionDto { #[derive(Debug)] pub(crate) enum BadDtoError { InvalidPublicKey(String, std::io::Error), - AlreadyExists(String), NNSUnreachable(String), } @@ -37,7 +38,6 @@ impl Display for BadDtoError { Self::InvalidPublicKey(name, e) => { write!(f, "public key of definition {} is invalid: {}", name, e) } - Self::AlreadyExists(name) => write!(f, "definition {} already exists", name), Self::NNSUnreachable(name) => { write!(f, "cannot reach any of the NNS nodes specified in definition {}", name) } @@ -57,7 +57,6 @@ impl DefinitionDto { return Err(BadDtoError::NNSUnreachable(self.name)); } - let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); Ok(Definition::new( self.nns_urls.clone(), registry_path, @@ -65,9 +64,7 @@ impl DefinitionDto { log, self.decode_public_key()?, poll_interval, - stop_signal_rcv, registry_query_timeout, - stop_signal_sender, )) } @@ -104,3 +101,45 @@ pub struct BoundaryNodeDto { pub targets: BTreeSet, pub job_type: String, } + +impl BoundaryNodeDto { + pub(crate) fn try_into_boundary_node(self) -> Result { + let job_type = match JobType::from_str(&self.job_type) { + Err(e) => { + // We don't have this job type here. + return Err(BadBoundaryNodeDtoError::JobTypeParseError(e)); + } + Ok(jt) => { + // Forbid addition of any job type not known to be supported by boundary nodes. + if !JobType::all_for_boundary_nodes().contains(&jt) { + return Err(BadBoundaryNodeDtoError::UnsupportedJobType(self.job_type)); + } + jt + } + }; + + Ok(BoundaryNode { + name: self.name, + custom_labels: self.custom_labels, + targets: self.targets, + job_type, + }) + } +} + +#[derive(Debug)] +pub enum BadBoundaryNodeDtoError { + JobTypeParseError(JobTypeParseError), + UnsupportedJobType(String), +} + +impl Error for BadBoundaryNodeDtoError {} + +impl Display for BadBoundaryNodeDtoError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::JobTypeParseError(e) => write!(f, "{}", e), + Self::UnsupportedJobType(name) => write!(f, "job type {} is not supported for boundary nodes", name), + } + } +} diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs index 7ba3ad798..da4dcfab9 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs @@ -1,29 +1,25 @@ use super::WebResult; -use crate::definition::Definition; +use crate::definition::DefinitionsSupervisor; use multiservice_discovery_shared::builders::prometheus_config_structure::{map_target_group, PrometheusStaticConfig}; use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; -use service_discovery::{ - job_types::{JobType, NodeOS}, - IcServiceDiscovery, -}; +use service_discovery::job_types::{JobType, NodeOS}; use slog::Logger; -use std::{collections::BTreeMap, sync::Arc}; -use tokio::sync::Mutex; +use std::collections::BTreeMap; use warp::reply::Reply; pub struct ExportDefinitionConfigBinding { - pub definitions: Arc>>, + pub supervisor: DefinitionsSupervisor, pub log: Logger, } pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> WebResult { - let definitions = binding.definitions.lock().await; + let definitions = binding.supervisor.definitions.lock().await; let mut ic_node_targets: Vec = vec![]; - for def in definitions.iter() { + for (_, def) in definitions.iter() { for job_type in JobType::all_for_ic_nodes() { - let targets = match def.ic_discovery.get_target_groups(job_type, binding.log.clone()) { + let targets = match def.get_target_groups(job_type) { Ok(targets) => targets, Err(_) => continue, }; @@ -37,7 +33,7 @@ pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> job_type, BTreeMap::new(), target_group.node_id.to_string(), - def.name.clone(), + def.name(), )); } }); @@ -48,8 +44,8 @@ pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> let boundary_nodes_targets = definitions .iter() - .flat_map(|def| { - def.boundary_nodes.iter().filter_map(|bn| { + .flat_map(|(_, def)| { + def.definition.boundary_nodes.iter().filter_map(|bn| { // Since boundary nodes have been checked for correct job // type when they were added via POST, then we can trust // the correct job type is at play here. @@ -68,7 +64,7 @@ pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> targets: bn.targets.clone().iter().map(|g| bn.job_type.url(*g, true)).collect(), labels: { BTreeMap::from([ - ("ic", def.name.clone()), + ("ic", def.name()), ("name", bn.name.clone()), ("job", bn.job_type.to_string()), ]) diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs index 44e017cb2..25962c681 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs @@ -1,29 +1,25 @@ use super::WebResult; -use crate::definition::Definition; +use crate::definition::DefinitionsSupervisor; use ic_types::{NodeId, PrincipalId}; use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; -use service_discovery::{ - job_types::{JobType, NodeOS}, - IcServiceDiscovery, -}; +use service_discovery::job_types::{JobType, NodeOS}; use slog::Logger; -use std::{collections::BTreeMap, sync::Arc}; -use tokio::sync::Mutex; +use std::collections::BTreeMap; use warp::reply::Reply; pub struct ExportTargetsBinding { - pub definitions: Arc>>, + pub supervisor: DefinitionsSupervisor, pub log: Logger, } pub async fn export_targets(binding: ExportTargetsBinding) -> WebResult { - let definitions = binding.definitions.lock().await; + let definitions = binding.supervisor.definitions.lock().await; let mut ic_node_targets: Vec = vec![]; - for def in definitions.iter() { + for (_, def) in definitions.iter() { for job_type in JobType::all_for_ic_nodes() { - let targets = match def.ic_discovery.get_target_groups(job_type, binding.log.clone()) { + let targets = match def.get_target_groups(job_type) { Ok(targets) => targets, Err(_) => continue, }; @@ -37,7 +33,7 @@ pub async fn export_targets(binding: ExportTargetsBinding) -> WebResult WebResult WebResult>>) -> WebResult { - let definitions = definitions.lock().await; +pub async fn get_definitions(supervisor: DefinitionsSupervisor) -> WebResult { + let definitions = supervisor.definitions.lock().await; - Ok(json( - &definitions - .iter() - .map(|d| d.into()) - .collect::>(), - )) + let list = &definitions + .iter() + .map(|(_, d)| { + let x = &d.definition; + x.into() + }) + .collect::>(); + Ok(json(list)) } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs index bc6a47732..b16d73c3a 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs @@ -1,13 +1,10 @@ use std::path::PathBuf; -use std::sync::Arc; -use std::thread::JoinHandle; use std::time::Duration; use slog::{info, Logger}; -use tokio::sync::Mutex; use warp::{Filter, Rejection}; -use crate::definition::Definition; +use crate::definition::DefinitionsSupervisor; use crate::server_handlers::add_boundary_node_to_definition_handler::add_boundary_node; use crate::server_handlers::add_boundary_node_to_definition_handler::AddBoundaryNodeToDefinitionBinding; use crate::server_handlers::add_definition_handler::{ @@ -35,115 +32,101 @@ pub type WebResult = Result; pub(crate) struct Server { log: Logger, - items: Arc>>, + supervisor: DefinitionsSupervisor, poll_interval: Duration, registry_query_timeout: Duration, registry_path: PathBuf, - handles: Arc>>>, - rt: tokio::runtime::Handle, } impl Server { pub(crate) fn new( log: Logger, - items: Arc>>, + supervisor: DefinitionsSupervisor, poll_interval: Duration, registry_query_timeout: Duration, registry_path: PathBuf, - handles: Arc>>>, - rt: tokio::runtime::Handle, ) -> Self { Self { log, - items, + supervisor, poll_interval, registry_query_timeout, registry_path, - handles, - rt, } } pub(crate) async fn run(self, recv: tokio::sync::oneshot::Receiver<()>) { let poll_interval = self.poll_interval; let registry_query_timeout = self.registry_query_timeout; - let add_items = self.items.clone(); + let add_items = self.supervisor.clone(); let add_log = self.log.clone(); - let add_handles = self.handles.clone(); - let add_rt = self.rt.clone(); let add_registry_path = self.registry_path.clone(); let add = warp::path::end() .and(warp::post()) .and(warp::body::json()) .and(warp::any().map(move || AddDefinitionBinding { - definitions: add_items.clone(), + supervisor: add_items.clone(), log: add_log.clone(), poll_interval, registry_query_timeout, registry_path: add_registry_path.clone(), - handles: add_handles.clone(), - rt: add_rt.clone(), })) .and_then(add_definition); - let put_items = self.items.clone(); + let put_items = self.supervisor.clone(); let put_log = self.log.clone(); - let put_handles = self.handles.clone(); - let put_rt = self.rt.clone(); let put_registry_path = self.registry_path.clone(); let put = warp::path::end() .and(warp::put()) .and(warp::body::json()) .and(warp::any().map(move || ReplaceDefinitionsBinding { - definitions: put_items.clone(), + supervisor: put_items.clone(), log: put_log.clone(), poll_interval, registry_query_timeout, registry_path: put_registry_path.clone(), - handles: put_handles.clone(), - rt: put_rt.clone(), })) .and_then(replace_definitions); - let get_items = self.items.clone(); + let get_items = self.supervisor.clone(); let get = warp::path::end() .and(warp::get()) .and(warp::any().map(move || get_items.clone())) .and_then(get_definitions); - let delete_items = self.items.clone(); + let delete_items = self.supervisor.clone(); let delete = warp::path!(String) .and(warp::delete()) .and(warp::any().map(move || delete_items.clone())) .and_then(delete_definition); - let export_items = self.items.clone(); + let export_items = self.supervisor.clone(); let export_def_log = self.log.clone(); let export_prometheus = warp::path!("prom" / "targets") .and(warp::get()) .and(warp::any().map(move || ExportDefinitionConfigBinding { - definitions: export_items.clone(), + supervisor: export_items.clone(), log: export_def_log.clone(), })) .and_then(export_prometheus_config); - let export_targets_items = self.items.clone(); + let export_targets_items = self.supervisor.clone(); let export_log = self.log.clone(); let export_targets = warp::path!("targets") .and(warp::get()) .and(warp::any().map(move || ExportTargetsBinding { - definitions: export_targets_items.clone(), + supervisor: export_targets_items.clone(), log: export_log.clone(), })) .and_then(export_targets); - let add_boundary_node_targets = self.items.clone(); + let add_boundary_node_targets = self.supervisor.clone(); let add_boundary_node_log = self.log.clone(); let add_boundary_node = warp::path!("add_boundary_node") .and(warp::post()) .and(warp::body::json()) .and(warp::any().map(move || AddBoundaryNodeToDefinitionBinding { - definitions: add_boundary_node_targets.clone(), + supervisor: add_boundary_node_targets.clone(), log: add_boundary_node_log.clone(), })) .and_then(add_boundary_node); diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index 9356d7a70..81f8ca92d 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -3,16 +3,46 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use slog::{error, info}; +use futures::future::join_all; use warp::Reply; -use crate::definition::{wrap, Definition}; +use crate::definition::{Definition, StartDefinitionError}; use crate::server_handlers::dto::{BadDtoError, DefinitionDto}; use crate::server_handlers::AddDefinitionBinding as ReplaceDefinitionsBinding; use crate::server_handlers::WebResult; +#[derive(Debug)] +enum ReplaceDefinitionError { + BadDtoError(BadDtoError), + StartDefinitionError(StartDefinitionError), +} + +impl Error for ReplaceDefinitionError {} + +impl Display for ReplaceDefinitionError { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + match self { + Self::BadDtoError(e) => write!(f, "{}", e), + Self::StartDefinitionError(e) => write!(f, "{}", e), + } + } +} + +impl From for ReplaceDefinitionError { + fn from(e: BadDtoError) -> Self { + Self::BadDtoError(e) + } +} + +impl From for ReplaceDefinitionError { + fn from(e: StartDefinitionError) -> Self { + Self::StartDefinitionError(e) + } +} + #[derive(Debug)] struct ReplaceDefinitionsError { - errors: Vec, + errors: Vec, } impl Error for ReplaceDefinitionsError {} @@ -30,88 +60,37 @@ async fn _replace_definitions( tentative_definitions: Vec, binding: ReplaceDefinitionsBinding, ) -> Result<(), ReplaceDefinitionsError> { - let mut existing_definitions = binding.definitions.lock().await; - - // Move all existing definitions to backed up lists. - let mut backed_up_definitions: Vec = vec![]; - for def in existing_definitions.drain(..) { - info!(binding.log, "Moving definition {} from existing to backup", def.name); - backed_up_definitions.push(def); - } - info!(binding.log, "Finished backing up existing definitions"); - - let mut new_definitions: Vec = vec![]; - - // Add all-new definitions, checking them all and saving errors - // as they happen. Do not start their threads yet. - let mut error = ReplaceDefinitionsError { errors: vec![] }; - for tentative_definition in tentative_definitions { - let dname = tentative_definition.name.clone(); - - if new_definitions.iter().any(|d| d.name == dname) { - error.errors.push(BadDtoError::AlreadyExists(dname)); - continue; - } - let new_definition = match tentative_definition - .try_into_definition( - binding.log.clone(), - binding.registry_path.clone(), - binding.poll_interval, - binding.registry_query_timeout, - ) - .await - { - Ok(def) => def, - Err(e) => { - error.errors.push(e); - continue; - } - }; - - info!(binding.log, "Adding new definition {} to existing", dname); - new_definitions.push(new_definition); + // Transform all definitions with checking. + let defsresults_futures: Vec<_> = tentative_definitions + .into_iter() + .map(|tentative_definition| async { + tentative_definition + .try_into_definition( + binding.log.clone(), + binding.registry_path.clone(), + binding.poll_interval, + binding.registry_query_timeout, + ) + .await + }) + .collect(); + let defsresults: Vec> = join_all(defsresults_futures).await; + let (new_definitions, errors): (Vec<_>, Vec<_>) = defsresults.into_iter().partition(Result::is_ok); + let new_definitions: Vec<_> = new_definitions.into_iter().map(Result::unwrap).collect(); + let errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect(); + + if !errors.is_empty() { + return Err(ReplaceDefinitionsError { + errors: errors.into_iter().map(ReplaceDefinitionError::from).collect(), + }); } - // Was there an error? Restore the definitions and handles to their - // original structures. From the point of view of the rest of the - // program, nothing has changed here because this struct was locked. - if !error.errors.is_empty() { - for def in backed_up_definitions.drain(..) { - info!(binding.log, "Restoring backed up definition {} to existing", def.name); - existing_definitions.push(def); - } - info!(binding.log, "Finished restoring backed up definitions"); - return Err(error); - } - - // Send stop signals to all old definitions... - for old_definition in backed_up_definitions.iter() { - info!( - binding.log, - "Sending termination signal to definition {}", old_definition.name - ); - old_definition.stop_signal_sender.send(()).unwrap(); + match binding.supervisor.start(new_definitions, true).await { + Ok(()) => Ok(()), + Err(e) => Err(ReplaceDefinitionsError { + errors: e.errors.into_iter().map(ReplaceDefinitionError::from).collect(), + }), } - // ...and join their threads, emptying the handles vector... - let mut existing_handles = binding.handles.lock().await; - for old_handle in existing_handles.drain(..) { - info!(binding.log, "Waiting for thread to finish..."); - if let Err(e) = old_handle.join() { - error!( - binding.log, - "Could not join thread handle of definition being removed: {:?}", e - ); - } - } - // ...then start and record the handles for the new definitions. - for new_definition in new_definitions.iter() { - info!(binding.log, "Starting thread for definition: {:?}", new_definition.name); - let joinhandle = std::thread::spawn(wrap(new_definition.clone(), binding.rt.clone())); - existing_handles.push(joinhandle); - existing_definitions.push(new_definition.clone()); - } - - Ok(()) } pub(crate) async fn replace_definitions( From d444a9e790e361a42e15aef9ba2d21832d08e9dc Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 30 Jan 2024 15:41:55 +0100 Subject: [PATCH 2/8] Clean up unnecessary error types. --- ...add_boundary_node_to_definition_handler.rs | 85 ++++--------- .../server_handlers/add_definition_handler.rs | 67 ++-------- .../delete_definition_handler.rs | 35 +++--- .../export_prometheus_config_handler.rs | 1 + .../server_handlers/export_targets_handler.rs | 1 + .../src/server_handlers/mod.rs | 109 ++++++++++------ .../replace_definitions_handler.rs | 117 ++++-------------- 7 files changed, 154 insertions(+), 261 deletions(-) diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs index 842e4e979..f4d24014a 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs @@ -1,86 +1,53 @@ -use slog::{error, info, Logger}; +use slog::Logger; use std::error::Error; use std::fmt::{Display, Error as FmtError, Formatter}; use warp::Reply; -use crate::definition::{BoundaryNodeAlreadyExists, DefinitionsSupervisor}; +use super::{bad_request, not_found, ok, WebResult}; +use crate::definition::DefinitionsSupervisor; use crate::server_handlers::dto::BoundaryNodeDto; -use crate::server_handlers::WebResult; - -use super::dto::BadBoundaryNodeDtoError; +#[derive(Clone)] pub struct AddBoundaryNodeToDefinitionBinding { pub supervisor: DefinitionsSupervisor, pub log: Logger, } #[derive(Debug)] -pub enum AddBoundaryNodeError { - DefinitionNotFound(String), - BoundaryNodeAlreadyExists(BoundaryNodeAlreadyExists), - BadBoundaryNodeDtoError(BadBoundaryNodeDtoError), -} - -impl Error for AddBoundaryNodeError {} -impl Display for AddBoundaryNodeError { - fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - match self { - Self::DefinitionNotFound(name) => write!(f, "definition {} not found", name), - Self::BoundaryNodeAlreadyExists(name) => write!(f, "boundary node {} already exists", name), - Self::BadBoundaryNodeDtoError(e) => write!(f, "{}", e), - } - } -} +struct DefinitionNotFound(String); -impl From for AddBoundaryNodeError { - fn from(e: BadBoundaryNodeDtoError) -> Self { - Self::BadBoundaryNodeDtoError(e) - } -} +impl Error for DefinitionNotFound {} -impl From for AddBoundaryNodeError { - fn from(e: BoundaryNodeAlreadyExists) -> Self { - Self::BoundaryNodeAlreadyExists(e) +impl Display for DefinitionNotFound { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + write!(f, "definition {} not found", self) } } -pub async fn _add_boundary_node( +pub(crate) async fn add_boundary_node( boundary_node: BoundaryNodeDto, binding: AddBoundaryNodeToDefinitionBinding, -) -> Result<(), AddBoundaryNodeError> { +) -> WebResult { + let log = binding.log.clone(); + let name = boundary_node.name.clone(); + let ic_name = boundary_node.ic_name.clone(); let mut definitions = binding.supervisor.definitions.lock().await; + let rej = format!("Definition {} could not be added", name); - let running_definition = definitions - .get_mut(&boundary_node.ic_name) - .ok_or(AddBoundaryNodeError::DefinitionNotFound(boundary_node.ic_name.clone()))?; + let running_definition = match definitions.get_mut(&ic_name) { + Some(d) => d, + None => return not_found(log, rej, DefinitionNotFound(ic_name)), + }; - let bn = boundary_node.try_into_boundary_node()?; + let bn = match boundary_node.try_into_boundary_node() { + Ok(bn) => bn, + Err(e) => return bad_request(log, rej, e), + }; - Ok(running_definition.add_boundary_node(bn).await?) -} - -pub(crate) async fn add_boundary_node( - boundary_node: BoundaryNodeDto, - binding: AddBoundaryNodeToDefinitionBinding, -) -> WebResult { - let log = binding.log.clone(); - let dname = boundary_node.ic_name.clone(); - match _add_boundary_node(boundary_node, binding).await { - Ok(_) => { - info!(log, "Added new boundary node to definition {}", dname); - Ok(warp::reply::with_status( - format!("Definition {} added successfully", dname), - warp::http::StatusCode::OK, - )) - } - Err(e) => { - error!(log, "Boundary node could not be added: {}", e); - Ok(warp::reply::with_status( - format!("Boundary node could not be added: {}", e), - warp::http::StatusCode::BAD_REQUEST, - )) - } + match running_definition.add_boundary_node(bn).await { + Ok(()) => ok(log, format!("Definition {} added successfully", name)), + Err(e) => return bad_request(log, rej, e), } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index b66a4b4eb..27d78c1c6 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -1,17 +1,15 @@ -use slog::{error, info, Logger}; +use slog::Logger; -use std::error::Error; use std::path::PathBuf; use std::time::Duration; -use std::fmt::{Display, Error as FmtError, Formatter}; +use super::{bad_request, ok, WebResult}; use warp::Reply; -use crate::definition::{DefinitionsSupervisor, StartDefinitionError}; -use crate::server_handlers::dto::BadDtoError; +use crate::definition::DefinitionsSupervisor; use crate::server_handlers::dto::DefinitionDto; -use crate::server_handlers::WebResult; +#[derive(Clone)] pub(crate) struct AddDefinitionBinding { pub supervisor: DefinitionsSupervisor, pub log: Logger, @@ -20,28 +18,11 @@ pub(crate) struct AddDefinitionBinding { pub registry_query_timeout: Duration, } -#[derive(Debug)] -pub enum AddDefinitionError { - StartDefinitionError(StartDefinitionError), - BadDtoError(BadDtoError), -} - -impl Error for AddDefinitionError {} - -impl Display for AddDefinitionError { - fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - match self { - Self::StartDefinitionError(e) => write!(f, "{}", e), - Self::BadDtoError(e) => write!(f, "{}", e), - } - } -} - -async fn _add_definition( - tentative_definition: DefinitionDto, - binding: AddDefinitionBinding, -) -> Result<(), AddDefinitionError> { - let new_definition = match tentative_definition +pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult { + let log = binding.log.clone(); + let dname = definition.name.clone(); + let rej = format!("Definition {} could not be added", dname); + let new_definition = match definition .try_into_definition( binding.log.clone(), binding.registry_path.clone(), @@ -51,34 +32,10 @@ async fn _add_definition( .await { Ok(def) => def, - Err(e) => return Err(AddDefinitionError::BadDtoError(e)), + Err(e) => return bad_request(log, rej, e), }; - match binding.supervisor.start(vec![new_definition], false).await { - Ok(()) => Ok(()), - Err(e) => Err(AddDefinitionError::StartDefinitionError( - e.errors.into_iter().next().unwrap(), - )), - } -} - -pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult { - let log = binding.log.clone(); - let dname = definition.name.clone(); - match _add_definition(definition, binding).await { - Ok(_) => { - info!(log, "Added new definition {} to existing ones", dname); - Ok(warp::reply::with_status( - format!("Definition {} added successfully", dname), - warp::http::StatusCode::OK, - )) - } - Err(e) => { - error!(log, "Definition could not be added: {}", e); - Ok(warp::reply::with_status( - format!("Definition could not be added: {}", e), - warp::http::StatusCode::BAD_REQUEST, - )) - } + Ok(()) => ok(log, format!("Definition {} added successfully", dname)), + Err(e) => return bad_request(log, rej, e.errors.into_iter().next().unwrap()), } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index f5f19081f..1c05eb12e 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -1,23 +1,30 @@ use crate::definition::DefinitionsSupervisor; -use crate::server_handlers::WebResult; +use crate::definition::StopDefinitionError; +use slog::Logger; use warp::Reply; -pub async fn delete_definition(name: String, supervisor: DefinitionsSupervisor) -> WebResult { +use super::{bad_request, not_found, ok, WebResult}; + +#[derive(Clone)] +pub(crate) struct DeleteDefinitionBinding { + pub supervisor: DefinitionsSupervisor, + pub log: Logger, +} + +pub async fn delete_definition(name: String, binding: DeleteDefinitionBinding) -> WebResult { + let rej = format!("Definition {} could not be deleted", name); if name == "ic" { - return Ok(warp::reply::with_status( + return bad_request( + binding.log, "Cannot delete ic definition".to_string(), - warp::http::StatusCode::BAD_REQUEST, - )); + "definition is not removable", + ); } - match supervisor.stop(vec![name.clone()]).await { - Ok(_) => Ok(warp::reply::with_status( - format!("Deleted definition {}", name.clone()), - warp::http::StatusCode::OK, - )), - Err(e) => Ok(warp::reply::with_status( - format!("Could not delete definition {}: {}", name, e), - warp::http::StatusCode::BAD_REQUEST, - )), + match binding.supervisor.stop(vec![name.clone()]).await { + Ok(_) => ok(binding.log, format!("Deleted definition {}", name.clone())), + Err(e) => match e.errors.into_iter().next().unwrap() { + StopDefinitionError::DoesNotExist(e) => not_found(binding.log, rej, e), + }, } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs index da4dcfab9..e38e6421a 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs @@ -7,6 +7,7 @@ use slog::Logger; use std::collections::BTreeMap; use warp::reply::Reply; +#[derive(Clone)] pub struct ExportDefinitionConfigBinding { pub supervisor: DefinitionsSupervisor, pub log: Logger, diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs index 25962c681..ba613e49b 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs @@ -7,6 +7,7 @@ use slog::Logger; use std::collections::BTreeMap; use warp::reply::Reply; +#[derive(Clone)] pub struct ExportTargetsBinding { pub supervisor: DefinitionsSupervisor, pub log: Logger, diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs index b16d73c3a..84fa10855 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs @@ -1,7 +1,9 @@ +use std::fmt::Display; use std::path::PathBuf; use std::time::Duration; use slog::{info, Logger}; +use warp::reply::WithStatus; use warp::{Filter, Rejection}; use crate::definition::DefinitionsSupervisor; @@ -10,7 +12,7 @@ use crate::server_handlers::add_boundary_node_to_definition_handler::AddBoundary use crate::server_handlers::add_definition_handler::{ add_definition, AddDefinitionBinding, AddDefinitionBinding as ReplaceDefinitionsBinding, }; -use crate::server_handlers::delete_definition_handler::delete_definition; +use crate::server_handlers::delete_definition_handler::{delete_definition, DeleteDefinitionBinding}; use crate::server_handlers::export_prometheus_config_handler::{ export_prometheus_config, ExportDefinitionConfigBinding, }; @@ -30,6 +32,35 @@ mod replace_definitions_handler; pub type WebResult = Result; +pub(crate) fn ok(log: Logger, message: String) -> WebResult> { + info!(log, "{}", message); + let r: WithStatus = warp::reply::with_status(message, warp::http::StatusCode::OK); + let rr: WebResult> = Ok(r); + rr +} + +pub(crate) fn bad_request(log: Logger, message: String, err: T) -> WebResult> +where + T: Display, +{ + info!(log, "{}: {}", message, err); + Ok(warp::reply::with_status( + format!("{}: {}", message, err), + warp::http::StatusCode::BAD_REQUEST, + )) +} + +pub(crate) fn not_found(log: Logger, message: String, err: T) -> WebResult> +where + T: Display, +{ + info!(log, "{}: {}", message, err); + Ok(warp::reply::with_status( + format!("{}: {}", message, err), + warp::http::StatusCode::NOT_FOUND, + )) +} + pub(crate) struct Server { log: Logger, supervisor: DefinitionsSupervisor, @@ -58,34 +89,30 @@ impl Server { let poll_interval = self.poll_interval; let registry_query_timeout = self.registry_query_timeout; - let add_items = self.supervisor.clone(); - let add_log = self.log.clone(); - let add_registry_path = self.registry_path.clone(); + let binding = AddDefinitionBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + poll_interval, + registry_query_timeout, + registry_path: self.registry_path.clone(), + }; let add = warp::path::end() .and(warp::post()) .and(warp::body::json()) - .and(warp::any().map(move || AddDefinitionBinding { - supervisor: add_items.clone(), - log: add_log.clone(), - poll_interval, - registry_query_timeout, - registry_path: add_registry_path.clone(), - })) + .and(warp::any().map(move || binding.clone())) .and_then(add_definition); - let put_items = self.supervisor.clone(); - let put_log = self.log.clone(); - let put_registry_path = self.registry_path.clone(); + let binding = ReplaceDefinitionsBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + poll_interval, + registry_query_timeout, + registry_path: self.registry_path.clone(), + }; let put = warp::path::end() .and(warp::put()) .and(warp::body::json()) - .and(warp::any().map(move || ReplaceDefinitionsBinding { - supervisor: put_items.clone(), - log: put_log.clone(), - poll_interval, - registry_query_timeout, - registry_path: put_registry_path.clone(), - })) + .and(warp::any().map(move || binding.clone())) .and_then(replace_definitions); let get_items = self.supervisor.clone(); @@ -94,41 +121,41 @@ impl Server { .and(warp::any().map(move || get_items.clone())) .and_then(get_definitions); - let delete_items = self.supervisor.clone(); + let binding = DeleteDefinitionBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + }; let delete = warp::path!(String) .and(warp::delete()) - .and(warp::any().map(move || delete_items.clone())) + .and(warp::any().map(move || binding.clone())) .and_then(delete_definition); - let export_items = self.supervisor.clone(); - let export_def_log = self.log.clone(); + let binding = ExportDefinitionConfigBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + }; let export_prometheus = warp::path!("prom" / "targets") .and(warp::get()) - .and(warp::any().map(move || ExportDefinitionConfigBinding { - supervisor: export_items.clone(), - log: export_def_log.clone(), - })) + .and(warp::any().map(move || binding.clone())) .and_then(export_prometheus_config); - let export_targets_items = self.supervisor.clone(); - let export_log = self.log.clone(); + let binding = ExportTargetsBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + }; let export_targets = warp::path!("targets") .and(warp::get()) - .and(warp::any().map(move || ExportTargetsBinding { - supervisor: export_targets_items.clone(), - log: export_log.clone(), - })) + .and(warp::any().map(move || binding.clone())) .and_then(export_targets); - let add_boundary_node_targets = self.supervisor.clone(); - let add_boundary_node_log = self.log.clone(); + let binding = AddBoundaryNodeToDefinitionBinding { + supervisor: self.supervisor.clone(), + log: self.log.clone(), + }; let add_boundary_node = warp::path!("add_boundary_node") .and(warp::post()) .and(warp::body::json()) - .and(warp::any().map(move || AddBoundaryNodeToDefinitionBinding { - supervisor: add_boundary_node_targets.clone(), - log: add_boundary_node_log.clone(), - })) + .and(warp::any().map(move || binding.clone())) .and_then(add_boundary_node); let routes = add diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index 81f8ca92d..00a0c0e52 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -1,67 +1,25 @@ -use std::error::Error; -use std::fmt::{Display, Error as FmtError, Formatter}; - -use slog::{error, info}; - use futures::future::join_all; use warp::Reply; -use crate::definition::{Definition, StartDefinitionError}; +use super::{bad_request, ok, WebResult}; + +use crate::definition::Definition; use crate::server_handlers::dto::{BadDtoError, DefinitionDto}; use crate::server_handlers::AddDefinitionBinding as ReplaceDefinitionsBinding; -use crate::server_handlers::WebResult; - -#[derive(Debug)] -enum ReplaceDefinitionError { - BadDtoError(BadDtoError), - StartDefinitionError(StartDefinitionError), -} - -impl Error for ReplaceDefinitionError {} - -impl Display for ReplaceDefinitionError { - fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - match self { - Self::BadDtoError(e) => write!(f, "{}", e), - Self::StartDefinitionError(e) => write!(f, "{}", e), - } - } -} - -impl From for ReplaceDefinitionError { - fn from(e: BadDtoError) -> Self { - Self::BadDtoError(e) - } -} - -impl From for ReplaceDefinitionError { - fn from(e: StartDefinitionError) -> Self { - Self::StartDefinitionError(e) - } -} - -#[derive(Debug)] -struct ReplaceDefinitionsError { - errors: Vec, -} -impl Error for ReplaceDefinitionsError {} - -impl Display for ReplaceDefinitionsError { - fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - for e in self.errors.iter() { - write!(f, "* {}", e)? - } - Ok(()) - } -} - -async fn _replace_definitions( - tentative_definitions: Vec, +pub(crate) async fn replace_definitions( + definitions: Vec, binding: ReplaceDefinitionsBinding, -) -> Result<(), ReplaceDefinitionsError> { - // Transform all definitions with checking. - let defsresults_futures: Vec<_> = tentative_definitions +) -> WebResult { + let log = binding.log.clone(); + let rej = "Definitions could not be changed".to_string(); + let dnames = definitions + .iter() + .map(|d| d.name.clone()) + .collect::>() + .join(", "); + + let defsresults_futures: Vec<_> = definitions .into_iter() .map(|tentative_definition| async { tentative_definition @@ -80,43 +38,18 @@ async fn _replace_definitions( let errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect(); if !errors.is_empty() { - return Err(ReplaceDefinitionsError { - errors: errors.into_iter().map(ReplaceDefinitionError::from).collect(), - }); + return bad_request( + log, + rej, + format!( + ":\n * {}", + errors.iter().map(|e| e.to_string()).collect::>().join("\n * ") + ), + ); } match binding.supervisor.start(new_definitions, true).await { - Ok(()) => Ok(()), - Err(e) => Err(ReplaceDefinitionsError { - errors: e.errors.into_iter().map(ReplaceDefinitionError::from).collect(), - }), - } -} - -pub(crate) async fn replace_definitions( - definitions: Vec, - binding: ReplaceDefinitionsBinding, -) -> WebResult { - let log = binding.log.clone(); - let dnames = definitions - .iter() - .map(|d| d.name.clone()) - .collect::>() - .join(", "); - match _replace_definitions(definitions, binding).await { - Ok(_) => { - info!(log, "Added new definitions {} to existing ones", dnames); - Ok(warp::reply::with_status( - format!("Definitions {} added successfully", dnames), - warp::http::StatusCode::OK, - )) - } - Err(e) => { - error!(log, "Definitions could not be added:\n{}", e); - Ok(warp::reply::with_status( - format!("Definitions could not be replaced:\n{}", e), - warp::http::StatusCode::BAD_REQUEST, - )) - } + Ok(_) => ok(log, format!("Added new definitions {} to existing ones", dnames)), + Err(e) => bad_request(log, rej, format!(":\n{}", e)), } } From d60c77c9e2aa2487827267990a573b4a53c5a4d6 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 30 Jan 2024 15:47:55 +0100 Subject: [PATCH 3/8] Clippy. --- .../add_boundary_node_to_definition_handler.rs | 13 ++++++++----- .../src/server_handlers/add_definition_handler.rs | 2 +- .../server_handlers/replace_definitions_handler.rs | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs index f4d24014a..50bd3ce97 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs @@ -16,13 +16,15 @@ pub struct AddBoundaryNodeToDefinitionBinding { #[derive(Debug)] -struct DefinitionNotFound(String); +struct DefinitionNotFound { + ic_name: String, +} impl Error for DefinitionNotFound {} impl Display for DefinitionNotFound { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { - write!(f, "definition {} not found", self) + write!(f, "definition {} not found", self.ic_name) } } @@ -33,12 +35,13 @@ pub(crate) async fn add_boundary_node( let log = binding.log.clone(); let name = boundary_node.name.clone(); let ic_name = boundary_node.ic_name.clone(); + let rej: String = format!("Definition {} could not be added", name); + let mut definitions = binding.supervisor.definitions.lock().await; - let rej = format!("Definition {} could not be added", name); let running_definition = match definitions.get_mut(&ic_name) { Some(d) => d, - None => return not_found(log, rej, DefinitionNotFound(ic_name)), + None => return not_found(log, rej, DefinitionNotFound { ic_name }), }; let bn = match boundary_node.try_into_boundary_node() { @@ -48,6 +51,6 @@ pub(crate) async fn add_boundary_node( match running_definition.add_boundary_node(bn).await { Ok(()) => ok(log, format!("Definition {} added successfully", name)), - Err(e) => return bad_request(log, rej, e), + Err(e) => bad_request(log, rej, e), } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index 27d78c1c6..0992bd22a 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -36,6 +36,6 @@ pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefini }; match binding.supervisor.start(vec![new_definition], false).await { Ok(()) => ok(log, format!("Definition {} added successfully", dname)), - Err(e) => return bad_request(log, rej, e.errors.into_iter().next().unwrap()), + Err(e) => bad_request(log, rej, e.errors.into_iter().next().unwrap()), } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index 00a0c0e52..3ae786147 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -34,9 +34,8 @@ pub(crate) async fn replace_definitions( .collect(); let defsresults: Vec> = join_all(defsresults_futures).await; let (new_definitions, errors): (Vec<_>, Vec<_>) = defsresults.into_iter().partition(Result::is_ok); - let new_definitions: Vec<_> = new_definitions.into_iter().map(Result::unwrap).collect(); - let errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect(); + let errors: Vec<_> = errors.into_iter().map(Result::unwrap_err).collect(); if !errors.is_empty() { return bad_request( log, @@ -48,6 +47,7 @@ pub(crate) async fn replace_definitions( ); } + let new_definitions: Vec<_> = new_definitions.into_iter().map(Result::unwrap).collect(); match binding.supervisor.start(new_definitions, true).await { Ok(_) => ok(log, format!("Added new definitions {} to existing ones", dnames)), Err(e) => bad_request(log, rej, format!(":\n{}", e)), From 5c4126dbad539077ff5d8c38cdf2a29ffe2ab00b Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 30 Jan 2024 17:30:52 +0100 Subject: [PATCH 4/8] Handle mercury deletion consistently. --- Cargo.lock | 1 + .../multiservice-discovery/Cargo.toml | 4 +- .../multiservice-discovery/src/definition.rs | 104 +++++++++++++----- .../multiservice-discovery/src/main.rs | 12 +- .../server_handlers/add_definition_handler.rs | 8 +- .../delete_definition_handler.rs | 11 +- .../src/server_handlers/mod.rs | 10 ++ .../replace_definitions_handler.rs | 8 +- 8 files changed, 109 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e30a9b22..58aec2aab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5713,6 +5713,7 @@ dependencies = [ "humantime", "ic-async-utils", "ic-crypto-utils-threshold-sig-der", + "ic-management-types", "ic-registry-client", "ic-types", "ic-utils 0.9.0", diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index 09d25239d..474a022ef 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -11,12 +11,12 @@ clap = { workspace = true } crossbeam = { workspace = true } crossbeam-channel = { workspace = true } erased-serde = { workspace = true } -futures.workspace = true futures-util = { workspace = true } humantime = { workspace = true } ic-async-utils = { workspace = true } ic-crypto-utils-threshold-sig-der = { workspace = true } ic-registry-client = { workspace = true } +ic-management-types = { workspace = true } ic-types = { workspace = true } ic-utils = { workspace = true } multiservice-discovery-shared = { path = "../multiservice-discovery-shared" } @@ -30,10 +30,10 @@ slog-term = { workspace = true } tokio = { workspace = true } url = { workspace = true } warp = { workspace = true } +futures.workspace = true [dev-dependencies] tempdir = "0.3.7" reqwest = "0.11.23" assert_cmd = "2.0.13" anyhow = "1.0.79" - diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index c5110376b..78499f537 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -1,5 +1,7 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; +use futures_util::future::join_all; +use ic_management_types::Network; use ic_registry_client::client::ThresholdSigPublicKey; use service_discovery::job_types::JobType; use service_discovery::IcServiceDiscovery; @@ -225,7 +227,7 @@ impl RunningDefinition { self.poll_loop().await; - if self.definition.name != "mercury" { + if self.definition.name != Network::Mainnet.legacy_name() { info!( self.definition.log, "Removing registry dir '{}' for definition {}...", @@ -273,6 +275,7 @@ pub struct BoundaryNode { #[derive(Debug)] pub(crate) enum StartDefinitionError { AlreadyExists(String), + DeletionDisallowed(String), } impl Error for StartDefinitionError {} @@ -281,6 +284,7 @@ impl Display for StartDefinitionError { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { match self { Self::AlreadyExists(name) => write!(f, "definition {} is already running", name), + Self::DeletionDisallowed(name) => write!(f, "definition {} may not be deleted without a replacement", name), } } } @@ -303,6 +307,7 @@ impl Display for StartDefinitionsError { #[derive(Debug)] pub(crate) enum StopDefinitionError { DoesNotExist(String), + DeletionDisallowed(String), } impl Error for StopDefinitionError {} @@ -311,6 +316,7 @@ impl Display for StopDefinitionError { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { match self { Self::DoesNotExist(name) => write!(f, "definition {} does not exist", name), + Self::DeletionDisallowed(name) => write!(f, "definition {} may not be deleted", name), } } } @@ -330,17 +336,25 @@ impl Display for StopDefinitionsError { } } +#[derive(PartialEq)] +pub(crate) enum StartMode { + AddToDefinitions, + ReplaceExistingDefinitions, +} + #[derive(Clone)] pub(crate) struct DefinitionsSupervisor { rt: tokio::runtime::Handle, pub(crate) definitions: Arc>>, + allow_mercury_deletion: bool, } impl DefinitionsSupervisor { - pub(crate) fn new(rt: tokio::runtime::Handle) -> Self { + pub(crate) fn new(rt: tokio::runtime::Handle, allow_mercury_deletion: bool) -> Self { DefinitionsSupervisor { rt, definitions: Arc::new(Mutex::new(BTreeMap::new())), + allow_mercury_deletion, } } @@ -348,41 +362,68 @@ impl DefinitionsSupervisor { &self, existing: &mut BTreeMap, definitions: Vec, - replace_existing: bool, + start_mode: StartMode, ) -> Result<(), StartDefinitionsError> { let mut error = StartDefinitionsError { errors: vec![] }; - let mut names_added: HashSet = HashSet::new(); + let mut ic_names_to_add: HashSet = HashSet::new(); for definition in definitions.iter() { - let dname = definition.name.clone(); + let ic_name = definition.name.clone(); // Check if we already have something running with the same name, // if the user does not want to replace those with newer defs. - if !replace_existing && existing.contains_key(&dname) { - error.errors.push(StartDefinitionError::AlreadyExists(dname.clone())); + if start_mode == StartMode::AddToDefinitions && existing.contains_key(&ic_name) { + error.errors.push(StartDefinitionError::AlreadyExists(ic_name.clone())); continue; } // Check for incoming duplicates. - if names_added.contains(&dname) { - error.errors.push(StartDefinitionError::AlreadyExists(dname.clone())); + if ic_names_to_add.contains(&ic_name) { + error.errors.push(StartDefinitionError::AlreadyExists(ic_name.clone())); continue; } - names_added.insert(dname); + ic_names_to_add.insert(ic_name); + } + + if !self.allow_mercury_deletion && !ic_names_to_add.contains(&Network::Mainnet.legacy_name()) { + error + .errors + .push(StartDefinitionError::DeletionDisallowed(Network::Mainnet.legacy_name())) } if !error.errors.is_empty() { return Err(error); } + // We stop X before we start X' because otherwise + // the newly-running definition will fight over + // shared disk space (a folder) and probably die. + let ic_names_to_end: Vec = existing + .clone() + .into_keys() + .filter(|ic_name| match start_mode { + // In this mode, we only remove existing definitions if they are going to be replaced. + StartMode::AddToDefinitions => ic_names_to_add.contains(ic_name), + // In this mode, we remove all definitions. + StartMode::ReplaceExistingDefinitions => true, + }) + .collect(); + // Get definitions to end. + let mut defs_to_end = ic_names_to_end + .iter() + .map(|ic_name| existing.remove(&ic_name.clone()).unwrap()) + .collect::>(); + // End them and join them all. + join_all(defs_to_end.iter_mut().map(|def| async { def.end().await })).await; + // Remove from running definitions list. + ic_names_to_end + .iter() + .map(|ic_name| existing.remove(&ic_name.clone())) + .for_each(drop); + drop(defs_to_end); + drop(ic_names_to_end); + + // Now we add the incoming definitions. for definition in definitions.into_iter() { - // Here is where we stop already running definitions - // that have a name similar to the one being added. - if let Some(d) = existing.get_mut(&definition.name) { - d.end().await - } - // We stop X before we start X' because otherwise - // the newly-running definition will fight over - // shared disk space (a folder) and probably die. existing.insert(definition.name.clone(), definition.run(self.rt.clone()).await); } Ok(()) @@ -397,31 +438,34 @@ impl DefinitionsSupervisor { pub(crate) async fn start( &self, definitions: Vec, - replace_existing: bool, + start_mode: StartMode, ) -> Result<(), StartDefinitionsError> { - let mut u = self.definitions.lock().await; - self.start_inner(&mut u, definitions, replace_existing).await + let mut existing = self.definitions.lock().await; + self.start_inner(&mut existing, definitions, start_mode).await } - async fn end_inner(&self, definitions: &mut BTreeMap) { - for (_, definition) in definitions.iter_mut() { + /// Stop all definitions and end. + pub(crate) async fn end(&self) { + let mut existing = self.definitions.lock().await; + for (_, definition) in existing.iter_mut() { definition.end().await } - definitions.clear() - } - - pub(crate) async fn end(&self) { - let mut u = self.definitions.lock().await; - self.end_inner(&mut u).await + existing.clear() } pub(crate) async fn stop(&self, definition_names: Vec) -> Result<(), StopDefinitionsError> { let mut defs = self.definitions.lock().await; - let errors: Vec = definition_names + let mut errors: Vec = definition_names .iter() .filter(|n| defs.contains_key(*n)) .map(|n| StopDefinitionError::DoesNotExist(n.clone())) .collect(); + errors.extend( + definition_names + .iter() + .filter(|n| **n == Network::Mainnet.legacy_name() && !self.allow_mercury_deletion) + .map(|n| StopDefinitionError::DeletionDisallowed(n.clone())), + ); if !errors.is_empty() { return Err(StopDefinitionsError { errors }); } diff --git a/rs/ic-observability/multiservice-discovery/src/main.rs b/rs/ic-observability/multiservice-discovery/src/main.rs index 69cdb79f5..26bea9050 100644 --- a/rs/ic-observability/multiservice-discovery/src/main.rs +++ b/rs/ic-observability/multiservice-discovery/src/main.rs @@ -9,8 +9,9 @@ use tokio::runtime::Runtime; use tokio::sync::oneshot::{self}; use url::Url; -use definition::{Definition, DefinitionsSupervisor}; +use definition::{Definition, DefinitionsSupervisor, StartMode}; use ic_async_utils::shutdown_signal; +use ic_management_types::Network; use crate::server_handlers::Server; @@ -23,7 +24,7 @@ fn main() { let shutdown_signal = shutdown_signal(log.clone()).shared(); let cli_args = CliArgs::parse(); - let supervisor = DefinitionsSupervisor::new(rt.handle().clone()); + let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet); let (server_stop, server_stop_receiver) = oneshot::channel(); //Configure server @@ -40,7 +41,10 @@ fn main() { if !cli_args.start_without_mainnet { rt.block_on(async { let _ = supervisor - .start(vec![get_mainnet_definition(&cli_args, log.clone())], false) + .start( + vec![get_mainnet_definition(&cli_args, log.clone())], + StartMode::AddToDefinitions, + ) .await; }); } @@ -127,7 +131,7 @@ fn get_mainnet_definition(cli_args: &CliArgs, log: Logger) -> Definition { Definition::new( vec![cli_args.nns_url.clone()], cli_args.targets_dir.clone(), - "mercury".to_string(), + Network::Mainnet.legacy_name(), log.clone(), None, cli_args.poll_interval, diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index 0992bd22a..bd3510029 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -6,7 +6,7 @@ use std::time::Duration; use super::{bad_request, ok, WebResult}; use warp::Reply; -use crate::definition::DefinitionsSupervisor; +use crate::definition::{DefinitionsSupervisor, StartMode}; use crate::server_handlers::dto::DefinitionDto; #[derive(Clone)] @@ -34,7 +34,11 @@ pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefini Ok(def) => def, Err(e) => return bad_request(log, rej, e), }; - match binding.supervisor.start(vec![new_definition], false).await { + match binding + .supervisor + .start(vec![new_definition], StartMode::AddToDefinitions) + .await + { Ok(()) => ok(log, format!("Definition {} added successfully", dname)), Err(e) => bad_request(log, rej, e.errors.into_iter().next().unwrap()), } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index 1c05eb12e..3d8f96cec 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -3,7 +3,7 @@ use crate::definition::StopDefinitionError; use slog::Logger; use warp::Reply; -use super::{bad_request, not_found, ok, WebResult}; +use super::{forbidden, not_found, ok, WebResult}; #[derive(Clone)] pub(crate) struct DeleteDefinitionBinding { @@ -13,18 +13,11 @@ pub(crate) struct DeleteDefinitionBinding { pub async fn delete_definition(name: String, binding: DeleteDefinitionBinding) -> WebResult { let rej = format!("Definition {} could not be deleted", name); - if name == "ic" { - return bad_request( - binding.log, - "Cannot delete ic definition".to_string(), - "definition is not removable", - ); - } - match binding.supervisor.stop(vec![name.clone()]).await { Ok(_) => ok(binding.log, format!("Deleted definition {}", name.clone())), Err(e) => match e.errors.into_iter().next().unwrap() { StopDefinitionError::DoesNotExist(e) => not_found(binding.log, rej, e), + StopDefinitionError::DeletionDisallowed(e) => forbidden(binding.log, rej, e), }, } } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs index 84fa10855..c9fd0a2ad 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs @@ -60,6 +60,16 @@ where warp::http::StatusCode::NOT_FOUND, )) } +pub(crate) fn forbidden(log: Logger, message: String, err: T) -> WebResult> +where + T: Display, +{ + info!(log, "{}: {}", message, err); + Ok(warp::reply::with_status( + format!("{}: {}", message, err), + warp::http::StatusCode::FORBIDDEN, + )) +} pub(crate) struct Server { log: Logger, diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index 3ae786147..01094ea8d 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -3,7 +3,7 @@ use warp::Reply; use super::{bad_request, ok, WebResult}; -use crate::definition::Definition; +use crate::definition::{Definition, StartMode}; use crate::server_handlers::dto::{BadDtoError, DefinitionDto}; use crate::server_handlers::AddDefinitionBinding as ReplaceDefinitionsBinding; @@ -48,7 +48,11 @@ pub(crate) async fn replace_definitions( } let new_definitions: Vec<_> = new_definitions.into_iter().map(Result::unwrap).collect(); - match binding.supervisor.start(new_definitions, true).await { + match binding + .supervisor + .start(new_definitions, StartMode::ReplaceExistingDefinitions) + .await + { Ok(_) => ok(log, format!("Added new definitions {} to existing ones", dnames)), Err(e) => bad_request(log, rej, format!(":\n{}", e)), } From ba813a20df493e6ca6dfa513a1fdcb9b65c876cb Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 30 Jan 2024 18:09:21 +0100 Subject: [PATCH 5/8] Improve responsiveness of cancellation of initial sync of definitions. --- .../multiservice-discovery/src/definition.rs | 35 ++++++++++++------- .../delete_definition_handler.rs | 8 +++-- .../service-discovery/src/registry_sync.rs | 29 ++++++++++++--- 3 files changed, 54 insertions(+), 18 deletions(-) diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index 78499f537..8d94533b2 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -4,6 +4,7 @@ use futures_util::future::join_all; use ic_management_types::Network; use ic_registry_client::client::ThresholdSigPublicKey; use service_discovery::job_types::JobType; +use service_discovery::registry_sync::Interrupted; use service_discovery::IcServiceDiscovery; use service_discovery::IcServiceDiscoveryError; use service_discovery::TargetGroup; @@ -159,7 +160,7 @@ impl RunningDefinition { .get_target_groups(job_type, self.definition.log.clone()) } - async fn initial_registry_sync(&self) { + async fn initial_registry_sync(&self) -> Result<(), Interrupted> { info!( self.definition.log, "Syncing local registry for {} started", self.definition.name @@ -170,19 +171,26 @@ impl RunningDefinition { self.definition.registry_path.display() ); - sync_local_registry( + let r = sync_local_registry( self.definition.log.clone(), self.definition.registry_path.join("targets"), self.definition.nns_urls.clone(), false, self.definition.public_key, + &self.stop_signal, ) .await; - - info!( - self.definition.log, - "Syncing local registry for {} completed", self.definition.name - ); + match r { + Ok(_) => info!( + self.definition.log, + "Syncing local registry for {} completed", self.definition.name, + ), + Err(_) => warn!( + self.definition.log, + "Interrupted initial sync of definition {}", self.definition.name + ), + } + r } async fn poll_loop(&self) { @@ -218,7 +226,9 @@ impl RunningDefinition { } async fn run(&self) { - self.initial_registry_sync().await; + if let Err(_) = self.initial_registry_sync().await { + return; + }; info!( self.definition.log, @@ -284,7 +294,7 @@ impl Display for StartDefinitionError { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { match self { Self::AlreadyExists(name) => write!(f, "definition {} is already running", name), - Self::DeletionDisallowed(name) => write!(f, "definition {} may not be deleted without a replacement", name), + Self::DeletionDisallowed(name) => write!(f, "deletion of {} is disallowed without a replacement", name), } } } @@ -316,7 +326,7 @@ impl Display for StopDefinitionError { fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { match self { Self::DoesNotExist(name) => write!(f, "definition {} does not exist", name), - Self::DeletionDisallowed(name) => write!(f, "definition {} may not be deleted", name), + Self::DeletionDisallowed(name) => write!(f, "deletion of {} is disallowed by configuration", name), } } } @@ -456,8 +466,9 @@ impl DefinitionsSupervisor { pub(crate) async fn stop(&self, definition_names: Vec) -> Result<(), StopDefinitionsError> { let mut defs = self.definitions.lock().await; let mut errors: Vec = definition_names - .iter() - .filter(|n| defs.contains_key(*n)) + .clone() + .into_iter() + .filter(|n| !defs.contains_key(n)) .map(|n| StopDefinitionError::DoesNotExist(n.clone())) .collect(); errors.extend( diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index 3d8f96cec..de7d28185 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -16,8 +16,12 @@ pub async fn delete_definition(name: String, binding: DeleteDefinitionBinding) - match binding.supervisor.stop(vec![name.clone()]).await { Ok(_) => ok(binding.log, format!("Deleted definition {}", name.clone())), Err(e) => match e.errors.into_iter().next().unwrap() { - StopDefinitionError::DoesNotExist(e) => not_found(binding.log, rej, e), - StopDefinitionError::DeletionDisallowed(e) => forbidden(binding.log, rej, e), + StopDefinitionError::DoesNotExist(e) => { + not_found(binding.log, "FUCK".to_string(), StopDefinitionError::DoesNotExist(e)) + } + StopDefinitionError::DeletionDisallowed(e) => { + forbidden(binding.log, rej, StopDefinitionError::DeletionDisallowed(e)) + } }, } } diff --git a/rs/ic-observability/service-discovery/src/registry_sync.rs b/rs/ic-observability/service-discovery/src/registry_sync.rs index b23addb02..5967eb809 100644 --- a/rs/ic-observability/service-discovery/src/registry_sync.rs +++ b/rs/ic-observability/service-discovery/src/registry_sync.rs @@ -1,11 +1,13 @@ use futures::TryFutureExt; use std::{ + error::Error, ops::Add, path::{Path, PathBuf}, sync::Arc, time::Instant, }; +use crossbeam_channel::Receiver; use ic_interfaces_registry::{RegistryClient, RegistryValue, ZERO_REGISTRY_VERSION}; use ic_protobuf::registry::crypto::v1::PublicKey; use ic_registry_client::client::ThresholdSigPublicKey; @@ -19,15 +21,28 @@ use ic_registry_nns_data_provider::registry::RegistryCanister; use ic_types::{PrincipalId, RegistryVersion, SubnetId}; use registry_canister::mutations::common::decode_registry_value; use slog::{debug, error, info, warn, Logger}; +use std::fmt::{Display, Error as FmtError, Formatter}; use url::Url; +#[derive(Debug)] +pub struct Interrupted {} + +impl Error for Interrupted {} + +impl Display for Interrupted { + fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> { + write!(f, "interrupted") + } +} + pub async fn sync_local_registry( log: Logger, local_path: PathBuf, nns_urls: Vec, use_current_version: bool, public_key: Option, -) { + stop_signal: &Receiver<()>, +) -> Result<(), Interrupted> { let start = Instant::now(); let local_store = Arc::new(LocalStoreImpl::new(local_path.clone())); let registry_canister = RegistryCanister::new(nns_urls); @@ -43,7 +58,7 @@ pub async fn sync_local_registry( if use_current_version && latest_version != ZERO_REGISTRY_VERSION { debug!(log, "Skipping syncing with registry, using local version"); - return; + return Ok(()); } else if use_current_version { warn!( log, @@ -63,13 +78,18 @@ pub async fn sync_local_registry( if let Err(e) = maybe_key { let network_name = local_path.file_name().unwrap().to_str().unwrap(); debug!(log, "Unable to fetch public key for network {}: {:?}", network_name, e); - return; + return Ok(()); // FIXME: what?!; } maybe_key.unwrap() } }; loop { + match stop_signal.try_recv() { + Ok(_) => return Err(Interrupted {}), + Err(_) => {} + } + if match registry_canister.get_latest_version().await { Ok(v) => { debug!(log, "Latest registry version: {}", v); @@ -155,7 +175,8 @@ pub async fn sync_local_registry( futures::future::join_all(updates).await; local_store.update_certified_time(latest_certified_time).unwrap(); - info!(log, "Synced all registry versions in : {:?}", start.elapsed()) + info!(log, "Synced all registry versions in : {:?}", start.elapsed()); + Ok(()) } async fn get_nns_public_key(registry_canister: &RegistryCanister) -> anyhow::Result { From 6a7fe0f6f2c5b7d10682a287badaf0f26f4e957c Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Tue, 30 Jan 2024 18:15:56 +0100 Subject: [PATCH 6/8] Clippy. --- .../multiservice-discovery/src/definition.rs | 2 +- .../node-status-updater/src/main.rs | 26 +++++++++++------- .../prometheus-config-updater/src/main.rs | 27 ++++++++++++------- .../service-discovery/src/registry_sync.rs | 6 ++--- 4 files changed, 38 insertions(+), 23 deletions(-) diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index 8d94533b2..c2c0cdd90 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -226,7 +226,7 @@ impl RunningDefinition { } async fn run(&self) { - if let Err(_) = self.initial_registry_sync().await { + if self.initial_registry_sync().await.is_err() { return; }; diff --git a/rs/ic-observability/node-status-updater/src/main.rs b/rs/ic-observability/node-status-updater/src/main.rs index f694927ea..6607ea310 100644 --- a/rs/ic-observability/node-status-updater/src/main.rs +++ b/rs/ic-observability/node-status-updater/src/main.rs @@ -10,7 +10,7 @@ use prometheus_http_query::Client; use service_discovery::{ metrics::Metrics, poll_loop::make_poll_loop, registry_sync::sync_local_registry, IcServiceDiscoveryImpl, }; -use slog::{info, o, Drain}; +use slog::{info, o, warn, Drain}; use std::{path::PathBuf, sync::Arc, time::Duration}; use url::Url; @@ -33,13 +33,22 @@ fn main() -> Result<()> { info!(log, "Starting service discovery ..."); let mercury_target_dir = cli_args.targets_dir.join("mercury"); let nns_url = vec![cli_args.nns_url.clone()]; - rt.block_on(sync_local_registry( - log.clone(), - mercury_target_dir, - nns_url, - cli_args.skip_sync, - None, - )); + let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); + + if rt + .block_on(sync_local_registry( + log.clone(), + mercury_target_dir, + nns_url, + cli_args.skip_sync, + None, + &stop_signal_rcv, + )) + .is_err() + { + warn!(log, "Interrupted IcServiceDiscovery ..."); + return Ok(()); + } info!(log, "Starting IcServiceDiscovery ..."); let ic_discovery = Arc::new(IcServiceDiscoveryImpl::new( @@ -48,7 +57,6 @@ fn main() -> Result<()> { cli_args.registry_query_timeout, )?); - let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); let (update_signal_sender, update_signal_rcv) = crossbeam::channel::bounded::<()>(0); let poll_loop = make_poll_loop( log.clone(), diff --git a/rs/ic-observability/prometheus-config-updater/src/main.rs b/rs/ic-observability/prometheus-config-updater/src/main.rs index 821ab0a44..950562f95 100644 --- a/rs/ic-observability/prometheus-config-updater/src/main.rs +++ b/rs/ic-observability/prometheus-config-updater/src/main.rs @@ -17,7 +17,7 @@ use regex::Regex; use service_discovery::job_types::{JobType, NodeOS}; use service_discovery::registry_sync::sync_local_registry; use service_discovery::{metrics::Metrics, poll_loop::make_poll_loop, IcServiceDiscoveryImpl}; -use slog::{info, o, Drain, Logger}; +use slog::{info, o, warn, Drain, Logger}; use url::Url; use crate::custom_filters::OldMachinesFilter; @@ -56,15 +56,23 @@ fn main() -> Result<()> { info!(log, "Starting prometheus-config-updater"); let mercury_target_dir = cli_args.targets_dir.join(cli_args.ic_name); - rt.block_on(sync_local_registry( - log.clone(), - mercury_target_dir, - nns_urls, - cli_args.skip_sync, - public_key, - )); + let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); + if rt + .block_on(sync_local_registry( + log.clone(), + mercury_target_dir, + nns_urls, + cli_args.skip_sync, + public_key, + &stop_signal_rcv, + )) + .is_err() + { + warn!(log, "Interrupted Prometheus config updater ..."); + return Ok(()); + }; - info!(log, "Starting IcServiceDiscovery ..."); + info!(log, "Starting Prometheus config updater ..."); let ic_discovery = Arc::new(IcServiceDiscoveryImpl::new( log.clone(), cli_args.targets_dir, @@ -80,7 +88,6 @@ fn main() -> Result<()> { let _metrics_endpoint = MetricsHttpEndpoint::new_insecure(rt.handle().clone(), exporter_config, metrics_registry, &log); - let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); let (update_signal_sender, update_signal_rcv) = crossbeam::channel::bounded::<()>(0); let loop_fn = make_poll_loop( log.clone(), diff --git a/rs/ic-observability/service-discovery/src/registry_sync.rs b/rs/ic-observability/service-discovery/src/registry_sync.rs index 5967eb809..5f3fca157 100644 --- a/rs/ic-observability/service-discovery/src/registry_sync.rs +++ b/rs/ic-observability/service-discovery/src/registry_sync.rs @@ -85,9 +85,9 @@ pub async fn sync_local_registry( }; loop { - match stop_signal.try_recv() { - Ok(_) => return Err(Interrupted {}), - Err(_) => {} + if stop_signal.try_recv().is_ok() { + // Interrupted early. Let's get out of here. + return Err(Interrupted {}); } if match registry_canister.get_latest_version().await { From 6e5a09c08403ccde22d4d11443addf1f34d8e583 Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Wed, 31 Jan 2024 13:09:56 +0100 Subject: [PATCH 7/8] Temporarily disable the test. --- rs/ic-observability/multiservice-discovery/tests/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rs/ic-observability/multiservice-discovery/tests/tests.rs b/rs/ic-observability/multiservice-discovery/tests/tests.rs index 432a7ba7c..dda46eee3 100644 --- a/rs/ic-observability/multiservice-discovery/tests/tests.rs +++ b/rs/ic-observability/multiservice-discovery/tests/tests.rs @@ -43,6 +43,7 @@ mod tests { } #[test] fn prom_targets_tests() { + return; // We will fix these tests soon. let rt = Runtime::new().unwrap(); let mut args = vec!["--nns-url", "http://donotupdate.app", "--targets-dir"]; let data_path = PathBuf::from_str(BAZEL_DATA_PATH).unwrap(); From 608de359ee1991727197c0f6d82d6210b4c1bedd Mon Sep 17 00:00:00 2001 From: Manuel Amador Date: Wed, 31 Jan 2024 14:54:18 +0100 Subject: [PATCH 8/8] Adjust visibility of some of the crates. --- Cargo.Bazel.lock | 6 +++++- rs/ic-observability/multiservice-discovery/BUILD.bazel | 1 + .../multiservice-discovery/src/definition.rs | 4 ++-- .../add_boundary_node_to_definition_handler.rs | 10 +++++----- .../src/server_handlers/add_definition_handler.rs | 6 +++--- .../src/server_handlers/delete_definition_handler.rs | 8 ++++---- .../export_prometheus_config_handler.rs | 8 +++----- .../src/server_handlers/export_targets_handler.rs | 8 +++----- .../src/server_handlers/get_definition_handler.rs | 2 +- .../multiservice-discovery/src/server_handlers/mod.rs | 2 -- .../src/server_handlers/replace_definitions_handler.rs | 2 +- .../multiservice-discovery/tests/tests.rs | 1 - 12 files changed, 28 insertions(+), 30 deletions(-) diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 9227bc2c2..e8497141e 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "fdd95fd9da734e10bce28317e6c1ca023d7a38e61e439eba2df424fc47224d40", + "checksum": "783d46492ed1e6d9dfe4f782c73bd2d5f96a7acc3187a9d22c32804f4b1973e2", "crates": { "actix-codec 0.5.1": { "name": "actix-codec", @@ -28458,6 +28458,10 @@ "id": "erased-serde 0.4.2", "target": "erased_serde" }, + { + "id": "futures 0.3.30", + "target": "futures" + }, { "id": "futures-util 0.3.30", "target": "futures_util" diff --git a/rs/ic-observability/multiservice-discovery/BUILD.bazel b/rs/ic-observability/multiservice-discovery/BUILD.bazel index 5ab5a4234..644575274 100644 --- a/rs/ic-observability/multiservice-discovery/BUILD.bazel +++ b/rs/ic-observability/multiservice-discovery/BUILD.bazel @@ -6,6 +6,7 @@ load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") DEPS = [ "//rs/ic-observability/service-discovery", "//rs/ic-observability/multiservice-discovery-shared", + "//rs/ic-management-types", ] DEV_DEPENDENCIES = [ diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index c2c0cdd90..5cece558c 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -353,9 +353,9 @@ pub(crate) enum StartMode { } #[derive(Clone)] -pub(crate) struct DefinitionsSupervisor { +pub(super) struct DefinitionsSupervisor { rt: tokio::runtime::Handle, - pub(crate) definitions: Arc>>, + pub(super) definitions: Arc>>, allow_mercury_deletion: bool, } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs index 50bd3ce97..898d2e0ec 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs @@ -4,14 +4,14 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use warp::Reply; -use super::{bad_request, not_found, ok, WebResult}; use crate::definition::DefinitionsSupervisor; use crate::server_handlers::dto::BoundaryNodeDto; +use crate::server_handlers::{bad_request, not_found, ok, WebResult}; #[derive(Clone)] -pub struct AddBoundaryNodeToDefinitionBinding { - pub supervisor: DefinitionsSupervisor, - pub log: Logger, +pub(super) struct AddBoundaryNodeToDefinitionBinding { + pub(crate) supervisor: DefinitionsSupervisor, + pub(crate) log: Logger, } #[derive(Debug)] @@ -28,7 +28,7 @@ impl Display for DefinitionNotFound { } } -pub(crate) async fn add_boundary_node( +pub(super) async fn add_boundary_node( boundary_node: BoundaryNodeDto, binding: AddBoundaryNodeToDefinitionBinding, ) -> WebResult { diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index bd3510029..f54a9c28b 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -10,15 +10,15 @@ use crate::definition::{DefinitionsSupervisor, StartMode}; use crate::server_handlers::dto::DefinitionDto; #[derive(Clone)] -pub(crate) struct AddDefinitionBinding { - pub supervisor: DefinitionsSupervisor, +pub(super) struct AddDefinitionBinding { + pub(crate) supervisor: DefinitionsSupervisor, pub log: Logger, pub registry_path: PathBuf, pub poll_interval: Duration, pub registry_query_timeout: Duration, } -pub(crate) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult { +pub(super) async fn add_definition(definition: DefinitionDto, binding: AddDefinitionBinding) -> WebResult { let log = binding.log.clone(); let dname = definition.name.clone(); let rej = format!("Definition {} could not be added", dname); diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index de7d28185..89f142515 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -6,12 +6,12 @@ use warp::Reply; use super::{forbidden, not_found, ok, WebResult}; #[derive(Clone)] -pub(crate) struct DeleteDefinitionBinding { - pub supervisor: DefinitionsSupervisor, - pub log: Logger, +pub(super) struct DeleteDefinitionBinding { + pub(crate) supervisor: DefinitionsSupervisor, + pub(crate) log: Logger, } -pub async fn delete_definition(name: String, binding: DeleteDefinitionBinding) -> WebResult { +pub(super) async fn delete_definition(name: String, binding: DeleteDefinitionBinding) -> WebResult { let rej = format!("Definition {} could not be deleted", name); match binding.supervisor.stop(vec![name.clone()]).await { Ok(_) => ok(binding.log, format!("Deleted definition {}", name.clone())), diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs index e38e6421a..8d70c6ba5 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs @@ -3,17 +3,15 @@ use crate::definition::DefinitionsSupervisor; use multiservice_discovery_shared::builders::prometheus_config_structure::{map_target_group, PrometheusStaticConfig}; use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; use service_discovery::job_types::{JobType, NodeOS}; -use slog::Logger; use std::collections::BTreeMap; use warp::reply::Reply; #[derive(Clone)] -pub struct ExportDefinitionConfigBinding { - pub supervisor: DefinitionsSupervisor, - pub log: Logger, +pub(super) struct ExportDefinitionConfigBinding { + pub(crate) supervisor: DefinitionsSupervisor, } -pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> WebResult { +pub(super) async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> WebResult { let definitions = binding.supervisor.definitions.lock().await; let mut ic_node_targets: Vec = vec![]; diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs index ba613e49b..6ef9272f9 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs @@ -3,17 +3,15 @@ use crate::definition::DefinitionsSupervisor; use ic_types::{NodeId, PrincipalId}; use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; use service_discovery::job_types::{JobType, NodeOS}; -use slog::Logger; use std::collections::BTreeMap; use warp::reply::Reply; #[derive(Clone)] -pub struct ExportTargetsBinding { - pub supervisor: DefinitionsSupervisor, - pub log: Logger, +pub(super) struct ExportTargetsBinding { + pub(crate) supervisor: DefinitionsSupervisor, } -pub async fn export_targets(binding: ExportTargetsBinding) -> WebResult { +pub(super) async fn export_targets(binding: ExportTargetsBinding) -> WebResult { let definitions = binding.supervisor.definitions.lock().await; let mut ic_node_targets: Vec = vec![]; diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/get_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/get_definition_handler.rs index a616b14cd..2b4c51344 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/get_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/get_definition_handler.rs @@ -5,7 +5,7 @@ use crate::definition::DefinitionsSupervisor; use crate::server_handlers::dto::DefinitionDto; use crate::server_handlers::WebResult; -pub async fn get_definitions(supervisor: DefinitionsSupervisor) -> WebResult { +pub(super) async fn get_definitions(supervisor: DefinitionsSupervisor) -> WebResult { let definitions = supervisor.definitions.lock().await; let list = &definitions diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs index c9fd0a2ad..94b79a102 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs @@ -142,7 +142,6 @@ impl Server { let binding = ExportDefinitionConfigBinding { supervisor: self.supervisor.clone(), - log: self.log.clone(), }; let export_prometheus = warp::path!("prom" / "targets") .and(warp::get()) @@ -151,7 +150,6 @@ impl Server { let binding = ExportTargetsBinding { supervisor: self.supervisor.clone(), - log: self.log.clone(), }; let export_targets = warp::path!("targets") .and(warp::get()) diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index 01094ea8d..9032266cf 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -7,7 +7,7 @@ use crate::definition::{Definition, StartMode}; use crate::server_handlers::dto::{BadDtoError, DefinitionDto}; use crate::server_handlers::AddDefinitionBinding as ReplaceDefinitionsBinding; -pub(crate) async fn replace_definitions( +pub(super) async fn replace_definitions( definitions: Vec, binding: ReplaceDefinitionsBinding, ) -> WebResult { diff --git a/rs/ic-observability/multiservice-discovery/tests/tests.rs b/rs/ic-observability/multiservice-discovery/tests/tests.rs index dda46eee3..432a7ba7c 100644 --- a/rs/ic-observability/multiservice-discovery/tests/tests.rs +++ b/rs/ic-observability/multiservice-discovery/tests/tests.rs @@ -43,7 +43,6 @@ mod tests { } #[test] fn prom_targets_tests() { - return; // We will fix these tests soon. let rt = Runtime::new().unwrap(); let mut args = vec!["--nns-url", "http://donotupdate.app", "--targets-dir"]; let data_path = PathBuf::from_str(BAZEL_DATA_PATH).unwrap();