Skip to content

Commit

Permalink
Merge pull request #91 from dfinity/metricsproxyguestscrape
Browse files Browse the repository at this point in the history
Scrape GuestOS metrics-proxy and clean up issues in multiservice-discovery.
  • Loading branch information
DFINITYManu committed Jan 22, 2024
2 parents 5842d80 + e649873 commit 2cf0927
Show file tree
Hide file tree
Showing 19 changed files with 404 additions and 663 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::fmt::Debug;

use service_discovery::{jobs::Job, TargetGroup};
use service_discovery::{job_types::JobType, TargetGroup};

pub trait Config: erased_serde::Serialize + Debug {
fn updated(&self) -> bool;
Expand All @@ -10,5 +10,5 @@ pub trait Config: erased_serde::Serialize + Debug {
erased_serde::serialize_trait_object!(Config);

pub trait ConfigBuilder {
fn build(&mut self, target_groups: BTreeSet<TargetGroup>, job: Job) -> Box<dyn Config>;
fn build(&mut self, target_groups: BTreeSet<TargetGroup>, job_type: JobType) -> Box<dyn Config>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,28 @@ use std::{collections::BTreeSet, sync::Arc};
use crossbeam::select;
use crossbeam_channel::Receiver;
use service_discovery::metrics::Metrics;
use service_discovery::{jobs::Job, IcServiceDiscovery, TargetGroup};
use service_discovery::{job_types::JobType, IcServiceDiscovery, TargetGroup};
use slog::{info, warn};

use crate::{
config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter,
};
use crate::{config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter};

pub fn config_updater_loop(
log: slog::Logger,
discovery: Arc<dyn IcServiceDiscovery>,
filters: Arc<dyn TargetGroupFilter>,
shutdown_signal: Receiver<()>,
jobs: Vec<Job>,
jobs: Vec<JobType>,
update_signal_recv: Receiver<()>,
mut config_builder: impl ConfigBuilder,
config_updater: impl ConfigUpdater,
metrics: Metrics,
) -> impl FnMut() {
move || loop {
for job in &jobs {
let target_groups = match discovery.get_target_groups(job._type, log.clone()) {
let target_groups = match discovery.get_target_groups(*job, log.clone()) {
Ok(t) => t,
Err(e) => {
warn!(
log,
"Failed to retrieve targets for job {}: {:?}", job._type, e
);
warn!(log, "Failed to retrieve targets for job {}: {:?}", job, e);
continue;
}
};
Expand All @@ -41,10 +36,10 @@ pub fn config_updater_loop(

metrics
.total_targets
.with_label_values(&[job._type.to_string().as_str()])
.with_label_values(&[job.to_string().as_str()])
.set(target_groups.len().try_into().unwrap());

let config = config_builder.build(filtered_target_groups, job.clone());
let config = config_builder.build(filtered_target_groups, *job);
let config_binding = config.as_ref();
if let Err(e) = config_updater.update(config_binding) {
warn!(log, "Failed to write config {}: {:?}", &config.name(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use multiservice_discovery_shared::filters::node_regex_id_filter::NodeIDRegexFil
use multiservice_discovery_shared::filters::{TargetGroupFilter, TargetGroupFilterList};
use multiservice_discovery_shared::{
builders::{
log_vector_config_structure::VectorConfigBuilderImpl,
prometheus_config_structure::PrometheusConfigBuilder, ConfigBuilder,
log_vector_config_structure::VectorConfigBuilderImpl, prometheus_config_structure::PrometheusConfigBuilder,
ConfigBuilder,
},
contracts::target::TargetDto,
};
use service_discovery::job_types::{JobType, NodeOS};
use service_discovery::job_types::JobType;
use slog::{debug, info, warn, Logger};
use std::{
collections::hash_map::DefaultHasher,
Expand Down Expand Up @@ -49,10 +49,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
},
recv(interval) -> msg => msg.expect("tick failed!")
};
info!(
logger,
"Downloading from {} @ interval {:?}", cli.sd_url, tick
);
info!(logger, "Downloading from {} @ interval {:?}", cli.sd_url, tick);

let response = match client.get(cli.sd_url.clone()).send().await {
Ok(res) => res,
Expand Down Expand Up @@ -91,10 +88,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece

let mut hasher = DefaultHasher::new();

let targets = targets
.into_iter()
.filter(|f| filters.filter(f))
.collect::<Vec<_>>();
let targets = targets.into_iter().filter(|f| filters.filter(f)).collect::<Vec<_>>();

for target in &targets {
target.hash(&mut hasher);
Expand All @@ -103,10 +97,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
let hash = hasher.finish();

if current_hash != hash {
info!(
logger,
"Received new targets from {} @ interval {:?}", cli.sd_url, tick
);
info!(logger, "Received new targets from {} @ interval {:?}", cli.sd_url, tick);
current_hash = hash;

generate_config(&cli, targets, logger.clone());
Expand All @@ -116,17 +107,8 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece

fn generate_config(cli: &CliArgs, targets: Vec<TargetDto>, logger: Logger) {
let jobs = match cli.generator {
crate::Generator::Log(_) => vec![
JobType::NodeExporter(NodeOS::Guest),
JobType::NodeExporter(NodeOS::Host),
],
crate::Generator::Metric => vec![
JobType::NodeExporter(NodeOS::Guest),
JobType::NodeExporter(NodeOS::Host),
JobType::Orchestrator,
JobType::Replica,
JobType::MetricsProxy,
],
crate::Generator::Log(_) => JobType::all_for_logs(),
crate::Generator::Metric => JobType::all_for_ic_nodes(),
};

if std::fs::metadata(&cli.output_dir).is_err() {
Expand All @@ -148,8 +130,7 @@ fn generate_config(cli: &CliArgs, targets: Vec<TargetDto>, logger: Logger) {
let config = match &cli.generator {
crate::Generator::Log(subtype) => match &subtype.subcommands {
Subtype::SystemdJournalGatewayd { batch_size } => {
VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port)
.build(targets_with_job)
VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port).build(targets_with_job)
}
Subtype::ExecAndJournald {
script_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Serialize;
use crate::contracts::target::TargetDto;

use super::{
log_vector_config_structure::{handle_ip, VectorRemapTransform},
log_vector_config_structure::VectorRemapTransform,
vector_config_enriched::{VectorConfigEnriched, VectorSource, VectorTransform},
ConfigBuilder,
};
Expand All @@ -22,10 +22,7 @@ pub struct ExecLogConfigBuilderImpl {
}

impl ConfigBuilder for ExecLogConfigBuilderImpl {
fn build(
&self,
target_groups: std::collections::BTreeSet<crate::contracts::target::TargetDto>,
) -> String {
fn build(&self, target_groups: std::collections::BTreeSet<crate::contracts::target::TargetDto>) -> String {
let mut config = VectorConfigEnriched::new();
let mut edited_records: Vec<TargetDto> = vec![];

Expand Down Expand Up @@ -60,7 +57,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl {
"--url",
format!(
"http://[{}]:{}/entries",
handle_ip(record.clone(), job, is_bn),
job.ip(*record.targets.first().unwrap(), is_bn),
match is_bn {
true => self.bn_port,
false => self.port,
Expand All @@ -82,8 +79,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl {
include_stderr: self.include_stderr,
};

let transform =
VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn);
let transform = VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn);

let mut source_map = HashMap::new();
source_map.insert(key.clone(), Box::new(source) as Box<dyn VectorSource>);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::collections::{BTreeSet, HashMap};
use ic_types::PrincipalId;
use serde::Serialize;

use service_discovery::guest_to_host_address;
use service_discovery::job_types::NodeOS;
use service_discovery::{job_types::JobType, TargetGroup};

use crate::builders::vector_config_enriched::VectorSource;
Expand Down Expand Up @@ -68,7 +66,7 @@ pub(crate) fn from_targets_into_vector_config(
let key = format!("{}-{}", key, job);
let source = VectorSystemdGatewayJournaldSource {
_type: "systemd_journal_gatewayd".into(),
endpoint: handle_ip(record.clone(), job, is_bn),
endpoint: job.ip(*record.targets.first().unwrap(), is_bn).to_string(),
data_dir: "logs".to_string(),
batch_size: builder.batch_size,
port: match is_bn {
Expand All @@ -77,8 +75,7 @@ pub(crate) fn from_targets_into_vector_config(
},
};
let source_key = format!("{}-source", key);
let transform =
VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn);
let transform = VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn);

let mut sources_map = HashMap::new();
sources_map.insert(source_key, Box::new(source) as Box<dyn VectorSource>);
Expand Down Expand Up @@ -134,28 +131,29 @@ const NODE_PROVIDER_ID: &str = "node_provider_id";
impl VectorRemapTransform {
pub fn from(target: TargetDto, job: JobType, input: String, is_bn: bool) -> Self {
let target_group = Into::<TargetGroup>::into(&target);
let mut labels: HashMap<String, String> = HashMap::new();

let anonymous = PrincipalId::new_anonymous().to_string();
let mut node_id = target_group.node_id.to_string();
if node_id == anonymous {
node_id = target.clone().name
}

let endpoint = handle_ip(target.clone(), &job, is_bn);

labels.insert(IC_NAME.into(), target_group.ic_name.to_string());
labels.insert(IC_NODE.into(), node_id.clone());
labels.insert(ADDRESS.into(), endpoint);
labels.insert(
NODE_PROVIDER_ID.into(),
target_group.node_provider_id.to_string(),
);
labels.insert(DC.into(), target_group.dc_id);
labels.extend(target.custom_labels);
if let Some(subnet_id) = target_group.subnet_id {
labels.insert(IC_SUBNET.into(), subnet_id.to_string());
}
let ip = job.ip(*target.targets.first().unwrap(), is_bn).to_string();
let labels = HashMap::from([
(IC_NAME.into(), target_group.ic_name.to_string()),
(IC_NODE.into(), node_id.clone()),
(ADDRESS.into(), ip),
(NODE_PROVIDER_ID.into(), target_group.node_provider_id.to_string()),
(DC.into(), target_group.dc_id),
])
.into_iter()
.chain(target.custom_labels.into_iter())
.chain(match target_group.subnet_id {
Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())],
None => vec![],
})
.collect::<HashMap<_, _>>();

Self {
_type: "remap".into(),
inputs: vec![input],
Expand All @@ -170,32 +168,6 @@ impl VectorRemapTransform {
}
}

/// Resolves the IP address (rendered as a `String`) at which the given
/// `job_type` should scrape the target described by `target_group`.
///
/// Only the first entry of `target_group.targets` is ever consulted.
/// GuestOS node-exporter jobs use that address directly. HostOS
/// node-exporter and metrics-proxy jobs translate it through
/// `guest_to_host_address` — presumably mapping a guest address to the
/// corresponding host address (TODO confirm in `service_discovery`) —
/// except when `is_bn` (boundary node) is set, in which case the target's
/// own address is used untranslated.
///
/// # Panics
/// Panics if `target_group.targets` is empty (`first().unwrap()`), if
/// `guest_to_host_address` returns `None` for a non-boundary host-side
/// job, or if called with `JobType::Replica` / `JobType::Orchestrator`,
/// which this function does not support.
pub fn handle_ip(target_group: TargetDto, job_type: &JobType, is_bn: bool) -> String {
    match job_type {
        // Guest node exporter: scrape the guest at its own address.
        JobType::NodeExporter(NodeOS::Guest) => {
            target_group.targets.first().unwrap().ip().to_string()
        }
        // Host node exporter: boundary nodes keep their address; regular
        // nodes are reached via the guest→host address translation.
        JobType::NodeExporter(NodeOS::Host) => match is_bn {
            true => target_group.targets.first().unwrap().ip().to_string(),
            false => guest_to_host_address(*target_group.targets.first().unwrap())
                .unwrap()
                .ip()
                .to_string(),
        },
        // Metrics proxy: same handling as the host node exporter.
        JobType::MetricsProxy => match is_bn {
            // It should not be possible for this to ever be true.
            // There is a structural typing problem somewhere here.
            true => target_group.targets.first().unwrap().ip().to_string(),
            false => guest_to_host_address(*target_group.targets.first().unwrap())
                .unwrap()
                .ip()
                .to_string(),
        },
        JobType::Replica => panic!("Unsupported job type for handle_ip"),
        JobType::Orchestrator => panic!("Unsupported job type for handle_ip"),
    }
}

#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
Expand All @@ -217,9 +189,7 @@ mod tests {
let mut parts = ipv6.split(':');

for item in &mut array {
*item = u16::from_str_radix(parts.next().unwrap(), 16)
.unwrap()
.to_be();
*item = u16::from_str_radix(parts.next().unwrap(), 16).unwrap().to_be();
}

array
Expand Down
Loading

0 comments on commit 2cf0927

Please sign in to comment.