diff --git a/Cargo.lock b/Cargo.lock index f2a0f1adcc4..a7c33a672aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,6 +651,7 @@ dependencies = [ "hyper 0.14.7", "lighthouse_version", "logging", + "monitoring_api", "node_test_rig", "rand 0.7.3", "sensitive_url", @@ -1100,6 +1101,7 @@ dependencies = [ "http_metrics", "lazy_static", "lighthouse_metrics", + "monitoring_api", "network", "parking_lot", "prometheus", @@ -4036,6 +4038,27 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "monitoring_api" +version = "0.1.0" +dependencies = [ + "eth2", + "futures 0.3.14", + "lazy_static", + "lighthouse_metrics", + "lighthouse_version", + "regex", + "reqwest", + "sensitive_url", + "serde", + "serde_derive", + "serde_json", + "slog", + "store", + "task_executor", + "tokio 1.5.0", +] + [[package]] name = "multihash" version = "0.13.2" @@ -7135,6 +7158,7 @@ dependencies = [ "lighthouse_version", "lockfile", "logging", + "monitoring_api", "parking_lot", "rand 0.7.3", "rayon", diff --git a/Cargo.toml b/Cargo.toml index c1c4cfd497b..f36c7bac833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ members = [ "common/validator_dir", "common/warp_utils", "common/fallback", + "common/monitoring_api", "consensus/cached_tree_hash", "consensus/int_to_bytes", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index f4a901dc269..4cfede51001 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -44,4 +44,5 @@ hyper = "0.14.4" lighthouse_version = { path = "../common/lighthouse_version" } hex = "0.4.2" slasher = { path = "../slasher" } +monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 28dea4055d6..3e67624461d 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -44,3 +44,4 @@ http_api = { path = "../http_api" } http_metrics = { path = "../http_metrics" } slasher = { path = "../../slasher" } slasher_service = { path = "../../slasher/service" } +monitoring_api = {path = "../../common/monitoring_api"} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 8ba7a0c3431..8b45ea61ef4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -14,6 +14,7 @@ use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2_libp2p::NetworkGlobals; use genesis::{interop_genesis_state, Eth1GenesisService}; +use monitoring_api::{MonitoringHttpClient, ProcessType}; use network::{NetworkConfig, NetworkMessage, NetworkService}; use slasher::Slasher; use slasher_service::SlasherService; @@ -374,6 +375,22 @@ where SlasherService::new(beacon_chain, network_send).run(&context.executor) } + /// Start the explorer client which periodically sends beacon + /// and system metrics to the configured endpoint. + pub fn monitoring_client(self, config: &monitoring_api::Config) -> Result { + let context = self + .runtime_context + .as_ref() + .ok_or("monitoring_client requires a runtime_context")? + .service_context("monitoring_client".into()); + let monitoring_client = MonitoringHttpClient::new(config, context.log().clone())?; + monitoring_client.auto_update( + context.executor, + vec![ProcessType::BeaconNode, ProcessType::System], + ); + Ok(self) + } + /// Immediately starts the service that periodically logs information each slot. pub fn notifier(self) -> Result { let context = self diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index c2d78dd5398..043d7d6fae7 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -66,6 +66,7 @@ pub struct Config { pub eth1: eth1::Config, pub http_api: http_api::Config, pub http_metrics: http_metrics::Config, + pub monitoring_api: Option, pub slasher: Option, } @@ -87,6 +88,7 @@ impl Default for Config { graffiti: Graffiti::default(), http_api: <_>::default(), http_metrics: <_>::default(), + monitoring_api: None, slasher: None, validator_monitor_auto: false, validator_monitor_pubkeys: vec![], diff --git a/beacon_node/client/src/metrics.rs b/beacon_node/client/src/metrics.rs index 5598fde2208..f1027bb8215 100644 --- a/beacon_node/client/src/metrics.rs +++ b/beacon_node/client/src/metrics.rs @@ -6,4 +6,14 @@ lazy_static! { "sync_slots_per_second", "The number of blocks being imported per second" ); + + pub static ref IS_SYNCED: Result = try_create_int_gauge( + "sync_eth2_synced", + "Metric to check if the beacon chain is synced to head. 0 if not synced and non-zero if synced" + ); + + pub static ref NOTIFIER_HEAD_SLOT: Result = try_create_int_gauge( + "notifier_head_slot", + "The head slot sourced from the beacon chain notifier" + ); } diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index e38f5199fc9..aaa8e619ce7 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -77,6 +77,9 @@ pub fn spawn_notifier( }; let head_slot = head_info.slot; + + metrics::set_gauge(&metrics::NOTIFIER_HEAD_SLOT, head_slot.as_u64() as i64); + let current_slot = match beacon_chain.slot() { Ok(slot) => slot, Err(e) => { @@ -123,6 +126,7 @@ pub fn spawn_notifier( // Log if we are syncing if sync_state.is_syncing() { + metrics::set_gauge(&metrics::IS_SYNCED, 0); let distance = format!( "{} slots ({})", head_distance.as_u64(), @@ -151,6 +155,7 @@ pub fn spawn_notifier( ); } } else if sync_state.is_synced() { + metrics::set_gauge(&metrics::IS_SYNCED, 1); let block_info = if current_slot > head_slot { " … empty".to_string() } else { @@ -167,6 +172,7 @@ pub fn spawn_notifier( "slot" => current_slot, ); } else { + metrics::set_gauge(&metrics::IS_SYNCED, 0); info!( log, "Searching for peers"; diff --git a/beacon_node/eth1/src/metrics.rs b/beacon_node/eth1/src/metrics.rs index bbf2f6d83c5..f3d9483b2b2 100644 --- a/beacon_node/eth1/src/metrics.rs +++ b/beacon_node/eth1/src/metrics.rs @@ -26,4 +26,23 @@ lazy_static! { pub static ref ENDPOINT_REQUESTS: Result = try_create_int_counter_vec( "eth1_endpoint_requests", "The number of eth1 requests for each endpoint", &["endpoint"] ); + + /* + * Eth1 rpc connection + */ + + pub static ref ETH1_CONNECTED: Result = try_create_int_gauge( + "sync_eth1_connected", "Set to 1 if connected to an eth1 node, otherwise set to 0" + ); + + pub static ref ETH1_FALLBACK_CONFIGURED: Result = try_create_int_gauge( + "sync_eth1_fallback_configured", "Number of configured eth1 fallbacks" + ); + + // Note: This metric only checks if an eth1 fallback is configured, not if it is connected and synced. + // Checking for liveness of the fallback would require moving away from lazy checking of fallbacks. + pub static ref ETH1_FALLBACK_CONNECTED: Result = try_create_int_gauge( + "eth1_sync_fallback_connected", "Set to 1 if an eth1 fallback is connected, otherwise set to 0" + ); + } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 0584a4b71be..8a28881b120 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -94,6 +94,9 @@ impl EndpointsCache { &crate::metrics::ENDPOINT_ERRORS, &[&endpoint.0.to_string()], ); + crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 0); + } else { + crate::metrics::set_gauge(&metrics::ETH1_CONNECTED, 1); } state } @@ -730,6 +733,7 @@ impl Service { let mut interval = interval_at(Instant::now(), update_interval); + let num_fallbacks = self.config().endpoints.len() - 1; let update_future = async move { loop { interval.tick().await; @@ -737,6 +741,15 @@ impl Service { } }; + // Set the number of configured eth1 servers + metrics::set_gauge(&metrics::ETH1_FALLBACK_CONFIGURED, num_fallbacks as i64); + // Since we lazily update eth1 fallbacks, it's not possible to know connection status of fallback. + // Hence, we set it to 1 if we have atleast one configured fallback. + if num_fallbacks > 0 { + metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 1); + } else { + metrics::set_gauge(&metrics::ETH1_FALLBACK_CONNECTED, 0); + } handle.spawn(update_future, "eth1"); } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index fb871c2e2cd..30d4245a4d6 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -232,6 +232,23 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) + /* + * Monitoring metrics + */ + + .arg( + Arg::with_name("monitoring-endpoint") + .long("monitoring-endpoint") + .value_name("ADDRESS") + .help("Enables the monitoring service for sending system metrics to a remote endpoint. \ + This can be used to monitor your setup on certain services (e.g. beaconcha.in). \ + This flag sets the endpoint where the beacon node metrics will be sent. \ + Note: This will send information to a remote sever which may identify and associate your \ + validators, IP address and other personal information. Always use a HTTPS connection \ + and never provide an untrusted URL.") + .takes_value(true), + ) + /* * Standard staking flags */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 0550e8e0850..6e4c9aa7d07 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -136,6 +136,17 @@ pub fn get_config( client_config.http_metrics.allow_origin = Some(allow_origin.to_string()); } + /* + * Explorer metrics + */ + if let Some(monitoring_endpoint) = cli_args.value_of("monitoring-endpoint") { + client_config.monitoring_api = Some(monitoring_api::Config { + db_path: None, + freezer_db_path: None, + monitoring_endpoint: monitoring_endpoint.to_string(), + }); + } + // Log a warning indicating an open HTTP server if it wasn't specified explicitly // (e.g. using the --staking flag). if cli_args.is_present("staking") { diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 4c8610b6dad..ad0e6f6a8ce 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -63,14 +63,14 @@ impl ProductionBeaconNode { let log = context.log().clone(); let datadir = client_config.create_data_dir()?; let db_path = client_config.create_db_path()?; - let freezer_db_path_res = client_config.create_freezer_db_path(); + let freezer_db_path = client_config.create_freezer_db_path()?; let executor = context.executor.clone(); let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) .http_api_config(client_config.http_api.clone()) - .disk_store(&datadir, &db_path, &freezer_db_path_res?, store_config)?; + .disk_store(&datadir, &db_path, &freezer_db_path, store_config)?; let builder = if let Some(slasher_config) = client_config.slasher.clone() { let slasher = Arc::new( @@ -82,6 +82,14 @@ impl ProductionBeaconNode { builder }; + let builder = if let Some(monitoring_config) = &mut client_config.monitoring_api { + monitoring_config.db_path = Some(db_path); + monitoring_config.freezer_db_path = Some(freezer_db_path); + builder.monitoring_client(monitoring_config)? + } else { + builder + }; + let builder = builder .beacon_chain_builder(client_genesis, client_config.clone()) .await?; diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a5657ac05fc..a9e3fc69d59 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -21,7 +21,7 @@ mod impls; mod leveldb_store; mod memory_store; pub mod metadata; -mod metrics; +pub mod metrics; mod partial_beacon_state; pub mod iter; diff --git a/book/src/api-lighthouse.md b/book/src/api-lighthouse.md index e647c476ca8..dab55fb4701 100644 --- a/book/src/api-lighthouse.md +++ b/book/src/api-lighthouse.md @@ -27,20 +27,39 @@ curl -X GET "http://localhost:5052/lighthouse/health" -H "accept: application/j ```json { "data": { - "pid": 1728254, - "pid_num_threads": 47, - "pid_mem_resident_set_size": 510054400, - "pid_mem_virtual_memory_size": 3963158528, - "sys_virt_mem_total": 16715530240, - "sys_virt_mem_available": 4065374208, - "sys_virt_mem_used": 11383402496, - "sys_virt_mem_free": 1368662016, - "sys_virt_mem_percent": 75.67906, - "sys_loadavg_1": 4.92, - "sys_loadavg_5": 5.53, - "sys_loadavg_15": 5.58 + "sys_virt_mem_total": 16671133696, + "sys_virt_mem_available": 8273715200, + "sys_virt_mem_used": 7304818688, + "sys_virt_mem_free": 2998190080, + "sys_virt_mem_percent": 50.37101, + "sys_virt_mem_cached": 5013975040, + "sys_virt_mem_buffers": 1354149888, + "sys_loadavg_1": 2.29, + "sys_loadavg_5": 3.48, + "sys_loadavg_15": 3.72, + "cpu_cores": 4, + "cpu_threads": 8, + "system_seconds_total": 5728, + "user_seconds_total": 33680, + "iowait_seconds_total": 873, + "idle_seconds_total": 177530, + "cpu_time_total": 217447, + "disk_node_bytes_total": 358443397120, + "disk_node_bytes_free": 70025089024, + "disk_node_reads_total": 1141863, + "disk_node_writes_total": 1377993, + "network_node_bytes_total_received": 2405639308, + "network_node_bytes_total_transmit": 328304685, + "misc_node_boot_ts_seconds": 1620629638, + "misc_os": "linux", + "pid": 4698, + "pid_num_threads": 25, + "pid_mem_resident_set_size": 783757312, + "pid_mem_virtual_memory_size": 2564665344, + "pid_process_seconds_total": 22 } } + ``` ### `/lighthouse/syncing` diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 7ea051e2ec4..f81fc607fcb 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -76,38 +76,82 @@ pub struct ValidatorInclusionData { } #[cfg(target_os = "linux")] -use {procinfo::pid, psutil::process::Process}; +use { + procinfo::pid, psutil::cpu::os::linux::CpuTimesExt, + psutil::memory::os::linux::VirtualMemoryExt, psutil::process::Process, +}; /// Reports on the health of the Lighthouse instance. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Health { - /// The pid of this process. - pub pid: u32, - /// The number of threads used by this pid. - pub pid_num_threads: i32, - /// The total resident memory used by this pid. - pub pid_mem_resident_set_size: u64, - /// The total virtual memory used by this pid. - pub pid_mem_virtual_memory_size: u64, + #[serde(flatten)] + pub system: SystemHealth, + #[serde(flatten)] + pub process: ProcessHealth, +} + +/// System related health. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct SystemHealth { /// Total virtual memory on the system pub sys_virt_mem_total: u64, /// Total virtual memory available for new processes. pub sys_virt_mem_available: u64, - /// Total virtual memory used on the system + /// Total virtual memory used on the system. pub sys_virt_mem_used: u64, - /// Total virtual memory not used on the system + /// Total virtual memory not used on the system. pub sys_virt_mem_free: u64, - /// Percentage of virtual memory used on the system + /// Percentage of virtual memory used on the system. pub sys_virt_mem_percent: f32, + /// Total cached virtual memory on the system. + pub sys_virt_mem_cached: u64, + /// Total buffered virtual memory on the system. + pub sys_virt_mem_buffers: u64, + /// System load average over 1 minute. pub sys_loadavg_1: f64, /// System load average over 5 minutes. pub sys_loadavg_5: f64, /// System load average over 15 minutes. pub sys_loadavg_15: f64, + + /// Total cpu cores. + pub cpu_cores: u64, + /// Total cpu threads. + pub cpu_threads: u64, + + /// Total time spent in kernel mode. + pub system_seconds_total: u64, + /// Total time spent in user mode. + pub user_seconds_total: u64, + /// Total time spent in waiting for io. + pub iowait_seconds_total: u64, + /// Total idle cpu time. + pub idle_seconds_total: u64, + /// Total cpu time. + pub cpu_time_total: u64, + + /// Total capacity of disk. + pub disk_node_bytes_total: u64, + /// Free space in disk. + pub disk_node_bytes_free: u64, + /// Number of disk reads. + pub disk_node_reads_total: u64, + /// Number of disk writes. + pub disk_node_writes_total: u64, + + /// Total bytes received over all network interfaces. + pub network_node_bytes_total_received: u64, + /// Total bytes sent over all network interfaces. + pub network_node_bytes_total_transmit: u64, + + /// Boot time + pub misc_node_boot_ts_seconds: u64, + /// OS + pub misc_os: String, } -impl Health { +impl SystemHealth { #[cfg(not(target_os = "linux"))] pub fn observe() -> Result { Err("Health is only available on Linux".into()) @@ -115,33 +159,119 @@ impl Health { #[cfg(target_os = "linux")] pub fn observe() -> Result { - let process = - Process::current().map_err(|e| format!("Unable to get current process: {:?}", e))?; - - let process_mem = process - .memory_info() - .map_err(|e| format!("Unable to get process memory info: {:?}", e))?; - - let stat = pid::stat_self().map_err(|e| format!("Unable to get stat: {:?}", e))?; - let vm = psutil::memory::virtual_memory() .map_err(|e| format!("Unable to get virtual memory: {:?}", e))?; let loadavg = psutil::host::loadavg().map_err(|e| format!("Unable to get loadavg: {:?}", e))?; + let cpu = + psutil::cpu::cpu_times().map_err(|e| format!("Unable to get cpu times: {:?}", e))?; + + let disk_usage = psutil::disk::disk_usage("/") + .map_err(|e| format!("Unable to disk usage info: {:?}", e))?; + + let disk = psutil::disk::DiskIoCountersCollector::default() + .disk_io_counters() + .map_err(|e| format!("Unable to get disk counters: {:?}", e))?; + + let net = psutil::network::NetIoCountersCollector::default() + .net_io_counters() + .map_err(|e| format!("Unable to get network io counters: {:?}", e))?; + + let boot_time = psutil::host::boot_time() + .map_err(|e| format!("Unable to get system boot time: {:?}", e))? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|e| format!("Boot time is lower than unix epoch: {}", e))? + .as_secs(); + Ok(Self { - pid: process.pid(), - pid_num_threads: stat.num_threads, - pid_mem_resident_set_size: process_mem.rss(), - pid_mem_virtual_memory_size: process_mem.vms(), sys_virt_mem_total: vm.total(), sys_virt_mem_available: vm.available(), sys_virt_mem_used: vm.used(), sys_virt_mem_free: vm.free(), + sys_virt_mem_cached: vm.cached(), + sys_virt_mem_buffers: vm.buffers(), sys_virt_mem_percent: vm.percent(), sys_loadavg_1: loadavg.one, sys_loadavg_5: loadavg.five, sys_loadavg_15: loadavg.fifteen, + cpu_cores: psutil::cpu::cpu_count_physical(), + cpu_threads: psutil::cpu::cpu_count(), + system_seconds_total: cpu.system().as_secs(), + cpu_time_total: cpu.total().as_secs(), + user_seconds_total: cpu.user().as_secs(), + iowait_seconds_total: cpu.iowait().as_secs(), + idle_seconds_total: cpu.idle().as_secs(), + disk_node_bytes_total: disk_usage.total(), + disk_node_bytes_free: disk_usage.free(), + disk_node_reads_total: disk.read_count(), + disk_node_writes_total: disk.write_count(), + network_node_bytes_total_received: net.bytes_recv(), + network_node_bytes_total_transmit: net.bytes_sent(), + misc_node_boot_ts_seconds: boot_time, + misc_os: std::env::consts::OS.to_string(), + }) + } +} + +/// Process specific health +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct ProcessHealth { + /// The pid of this process. + pub pid: u32, + /// The number of threads used by this pid. + pub pid_num_threads: i32, + /// The total resident memory used by this pid. + pub pid_mem_resident_set_size: u64, + /// The total virtual memory used by this pid. + pub pid_mem_virtual_memory_size: u64, + /// Number of cpu seconds consumed by this pid. + pub pid_process_seconds_total: u64, +} + +impl ProcessHealth { + #[cfg(not(target_os = "linux"))] + pub fn observe() -> Result { + Err("Health is only available on Linux".into()) + } + + #[cfg(target_os = "linux")] + pub fn observe() -> Result { + let process = + Process::current().map_err(|e| format!("Unable to get current process: {:?}", e))?; + + let process_mem = process + .memory_info() + .map_err(|e| format!("Unable to get process memory info: {:?}", e))?; + + let stat = pid::stat_self().map_err(|e| format!("Unable to get stat: {:?}", e))?; + let process_times = process + .cpu_times() + .map_err(|e| format!("Unable to get process cpu times : {:?}", e))?; + + Ok(Self { + pid: process.pid(), + pid_num_threads: stat.num_threads, + pid_mem_resident_set_size: process_mem.rss(), + pid_mem_virtual_memory_size: process_mem.vms(), + pid_process_seconds_total: process_times.busy().as_secs() + + process_times.children_system().as_secs() + + process_times.children_system().as_secs(), + }) + } +} + +impl Health { + #[cfg(not(target_os = "linux"))] + pub fn observe() -> Result { + Err("Health is only available on Linux".into()) + } + + #[cfg(target_os = "linux")] + pub fn observe() -> Result { + Ok(Self { + process: ProcessHealth::observe()?, + system: SystemHealth::observe()?, }) } } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index b8f8601716e..0695cf07ac4 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -59,6 +59,7 @@ use std::time::Duration; use prometheus::core::{Atomic, GenericGauge, GenericGaugeVec}; pub use prometheus::{ + proto::{Metric, MetricFamily, MetricType}, Encoder, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Result, TextEncoder, }; diff --git a/common/monitoring_api/Cargo.toml b/common/monitoring_api/Cargo.toml new file mode 100644 index 00000000000..79284475bce --- /dev/null +++ b/common/monitoring_api/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "monitoring_api" +version = "0.1.0" +authors = ["pawan "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +reqwest = { version = "0.11.0", features = ["json","stream"] } +futures = "0.3.7" +task_executor = { path = "../task_executor" } +tokio = "1.1.0" +eth2 = {path = "../eth2"} +serde_json = "1.0.58" +serde = "1.0.116" +serde_derive = "1.0.116" +lighthouse_version = { path = "../lighthouse_version"} +lighthouse_metrics = { path = "../lighthouse_metrics" } +slog = "2.5.2" +store = { path = "../../beacon_node/store" } +lazy_static = "1.4.0" +regex = "1" +sensitive_url = { path = "../sensitive_url" } \ No newline at end of file diff --git a/common/monitoring_api/src/gather.rs b/common/monitoring_api/src/gather.rs new file mode 100644 index 00000000000..b761e5544b3 --- /dev/null +++ b/common/monitoring_api/src/gather.rs @@ -0,0 +1,193 @@ +use super::types::{BeaconProcessMetrics, ValidatorProcessMetrics}; +use lazy_static::lazy_static; +use lighthouse_metrics::{MetricFamily, MetricType}; +use serde_json::json; +use std::collections::HashMap; +use std::path::Path; + +/// Represents a metric that needs to be fetched from lighthouse metrics registry +/// and sent to the remote monitoring service. +#[derive(Debug, Clone)] +pub struct JsonMetric { + /// Name of the metric as used in Lighthouse metrics. + lighthouse_metric_name: &'static str, + /// Json key for the metric that we send to the remote monitoring endpoint. + json_output_key: &'static str, + /// Type of the json value to be sent to the remote monitoring endpoint + ty: JsonType, +} + +impl JsonMetric { + const fn new( + lighthouse_metric_name: &'static str, + json_output_key: &'static str, + ty: JsonType, + ) -> Self { + Self { + lighthouse_metric_name, + json_output_key, + ty, + } + } + + /// Return a json value given given the metric type. + fn get_typed_value(&self, value: i64) -> serde_json::Value { + match self.ty { + JsonType::Integer => json!(value), + JsonType::Boolean => { + if value > 0 { + json!(true) + } else { + json!(false) + } + } + } + } +} + +/// The required metrics for the beacon and validator processes. +const BEACON_PROCESS_METRICS: &[JsonMetric] = &[ + JsonMetric::new( + "sync_eth1_fallback_configured", + "sync_eth1_fallback_configured", + JsonType::Boolean, + ), + JsonMetric::new( + "sync_eth1_fallback_connected", + "sync_eth1_fallback_connected", + JsonType::Boolean, + ), + JsonMetric::new( + "sync_eth1_connected", + "sync_eth1_connected", + JsonType::Boolean, + ), + JsonMetric::new( + "store_disk_db_size", + "disk_beaconchain_bytes_total", + JsonType::Integer, + ), + JsonMetric::new( + "libp2p_peer_connected_peers_total", + "network_peers_connected", + JsonType::Integer, + ), + JsonMetric::new( + "libp2p_outbound_bytes", + "network_libp2p_bytes_total_transmit", + JsonType::Integer, + ), + JsonMetric::new( + "libp2p_inbound_bytes", + "network_libp2p_bytes_total_receive", + JsonType::Integer, + ), + JsonMetric::new( + "notifier_head_slot", + "sync_beacon_head_slot", + JsonType::Integer, + ), + JsonMetric::new("sync_eth2_synced", "sync_eth2_synced", JsonType::Boolean), +]; + +const VALIDATOR_PROCESS_METRICS: &[JsonMetric] = &[ + JsonMetric::new( + "vc_validators_enabled_count", + "validator_active", + JsonType::Integer, + ), + JsonMetric::new( + "vc_validators_total_count", + "validator_total", + JsonType::Integer, + ), + JsonMetric::new( + "sync_eth2_fallback_configured", + "sync_eth2_fallback_configured", + JsonType::Boolean, + ), + JsonMetric::new( + "sync_eth2_fallback_connected", + "sync_eth2_fallback_connected", + JsonType::Boolean, + ), +]; + +/// Represents the type for the JSON output. +#[derive(Debug, Clone)] +pub enum JsonType { + Integer, + Boolean, +} + +lazy_static! { + /// HashMap representing the `BEACON_PROCESS_METRICS`. + pub static ref BEACON_METRICS_MAP: HashMap = BEACON_PROCESS_METRICS + .iter() + .map(|metric| (metric.lighthouse_metric_name.to_string(), metric.clone())) + .collect(); + /// HashMap representing the `VALIDATOR_PROCESS_METRICS`. + pub static ref VALIDATOR_METRICS_MAP: HashMap = + VALIDATOR_PROCESS_METRICS + .iter() + .map(|metric| (metric.lighthouse_metric_name.to_string(), metric.clone())) + .collect(); +} + +/// Returns the value from a Counter/Gauge `MetricType` assuming that it has no associated labels +/// else it returns `None`. +fn get_value(mf: &MetricFamily) -> Option { + let metric = mf.get_metric().first()?; + match mf.get_field_type() { + MetricType::COUNTER => Some(metric.get_counter().get_value() as i64), + MetricType::GAUGE => Some(metric.get_gauge().get_value() as i64), + _ => None, + } +} + +/// Collects all metrics and returns a `serde_json::Value` object with the required metrics +/// from the metrics hashmap. +pub fn gather_metrics(metrics_map: &HashMap) -> Option { + let metric_families = lighthouse_metrics::gather(); + let mut res = serde_json::Map::with_capacity(metrics_map.len()); + for mf in metric_families.iter() { + let metric_name = mf.get_name(); + if metrics_map.contains_key(metric_name) { + let value = get_value(&mf).unwrap_or_default(); + let metric = metrics_map.get(metric_name)?; + let value = metric.get_typed_value(value); + let _ = res.insert(metric.json_output_key.to_string(), value); + }; + } + Some(serde_json::Value::Object(res)) +} + +/// Gathers and returns the lighthouse beacon metrics. +pub fn gather_beacon_metrics( + db_path: &Path, + freezer_db_path: &Path, +) -> Result { + // Update db size metrics + store::metrics::scrape_for_metrics(db_path, freezer_db_path); + + let beacon_metrics = gather_metrics(&BEACON_METRICS_MAP) + .ok_or_else(|| "Failed to gather beacon metrics".to_string())?; + let process = eth2::lighthouse::ProcessHealth::observe()?.into(); + + Ok(BeaconProcessMetrics { + beacon: beacon_metrics, + common: process, + }) +} + +/// Gathers and returns the lighthouse validator metrics. +pub fn gather_validator_metrics() -> Result { + let validator_metrics = gather_metrics(&VALIDATOR_METRICS_MAP) + .ok_or_else(|| "Failed to gather validator metrics".to_string())?; + + let process = eth2::lighthouse::ProcessHealth::observe()?.into(); + Ok(ValidatorProcessMetrics { + validator: validator_metrics, + common: process, + }) +} diff --git a/common/monitoring_api/src/lib.rs b/common/monitoring_api/src/lib.rs new file mode 100644 index 00000000000..3c28bf33301 --- /dev/null +++ b/common/monitoring_api/src/lib.rs @@ -0,0 +1,208 @@ +mod gather; +mod types; +use std::{path::PathBuf, time::Duration}; + +use eth2::lighthouse::SystemHealth; +use gather::{gather_beacon_metrics, gather_validator_metrics}; +use reqwest::{IntoUrl, Response}; +pub use reqwest::{StatusCode, Url}; +use sensitive_url::SensitiveUrl; +use serde::{Deserialize, Serialize}; +use slog::{debug, error, info}; +use task_executor::TaskExecutor; +use tokio::time::{interval_at, Instant}; +use types::*; + +pub use types::ProcessType; + +/// Duration after which we collect and send metrics to remote endpoint. +pub const UPDATE_DURATION: u64 = 60; +/// Timeout for HTTP requests. +pub const TIMEOUT_DURATION: u64 = 5; + +#[derive(Debug)] +pub enum Error { + /// The `reqwest` client raised an error. + Reqwest(reqwest::Error), + /// The supplied URL is badly formatted. It should look something like `http://127.0.0.1:5052`. + InvalidUrl(SensitiveUrl), + SystemMetricsFailed(String), + BeaconMetricsFailed(String), + ValidatorMetricsFailed(String), + /// The server returned an error message where the body was able to be parsed. + ServerMessage(ErrorMessage), + /// The server returned an error message where the body was unable to be parsed. + StatusCode(StatusCode), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + Error::Reqwest(e) => write!(f, "Reqwest error: {}", e), + // Print the debug value + e => write!(f, "{:?}", e), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct Config { + /// Endpoint + pub monitoring_endpoint: String, + /// Path for the hot database required for fetching beacon db size metrics. + /// Note: not relevant for validator and system metrics. + pub db_path: Option, + /// Path for the cold database required for fetching beacon db size metrics. + /// Note: not relevant for validator and system metrics. + pub freezer_db_path: Option, +} + +#[derive(Clone)] +pub struct MonitoringHttpClient { + client: reqwest::Client, + /// Path to the hot database. Required for getting db size metrics + db_path: Option, + /// Path to the freezer database. + freezer_db_path: Option, + monitoring_endpoint: SensitiveUrl, + log: slog::Logger, +} + +impl MonitoringHttpClient { + pub fn new(config: &Config, log: slog::Logger) -> Result { + Ok(Self { + client: reqwest::Client::new(), + db_path: config.db_path.clone(), + freezer_db_path: config.freezer_db_path.clone(), + monitoring_endpoint: SensitiveUrl::parse(&config.monitoring_endpoint) + .map_err(|e| format!("Invalid monitoring endpoint: {:?}", e))?, + log, + }) + } + + /// Perform a HTTP POST request. + async fn post(&self, url: U, body: &T) -> Result<(), Error> { + let response = self + .client + .post(url) + .json(body) + .timeout(Duration::from_secs(TIMEOUT_DURATION)) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_error(response).await?; + Ok(()) + } + + /// Creates a task which periodically sends the provided process metrics + /// to the configured remote endpoint. + pub fn auto_update(self, executor: TaskExecutor, processes: Vec) { + let mut interval = interval_at( + // Have some initial delay for the metrics to get initialized + Instant::now() + Duration::from_secs(25), + Duration::from_secs(UPDATE_DURATION), + ); + + info!(self.log, "Starting monitoring api"; "endpoint" => %self.monitoring_endpoint); + + let update_future = async move { + loop { + interval.tick().await; + match self.send_metrics(&processes).await { + Ok(()) => { + debug!(self.log, "Metrics sent to remote server"; "endpoint" => %self.monitoring_endpoint); + } + Err(e) => { + error!(self.log, "Failed to send metrics to remote endpoint"; "error" => %e) + } + } + } + }; + + executor.spawn(update_future, "monitoring_api"); + } + + /// Gets beacon metrics and updates the metrics struct + pub fn get_beacon_metrics(&self) -> Result { + let db_path = self.db_path.as_ref().ok_or_else(|| { + Error::BeaconMetricsFailed("Beacon metrics require db path".to_string()) + })?; + + let freezer_db_path = self.db_path.as_ref().ok_or_else(|| { + Error::BeaconMetricsFailed("Beacon metrics require freezer db path".to_string()) + })?; + let metrics = gather_beacon_metrics(&db_path, &freezer_db_path) + .map_err(Error::BeaconMetricsFailed)?; + Ok(MonitoringMetrics { + metadata: Metadata::new(ProcessType::BeaconNode), + process_metrics: Process::Beacon(metrics), + }) + } + + /// Gets validator process metrics by querying the validator metrics endpoint + pub fn get_validator_metrics(&self) -> Result { + let metrics = gather_validator_metrics().map_err(Error::BeaconMetricsFailed)?; + Ok(MonitoringMetrics { + metadata: Metadata::new(ProcessType::Validator), + process_metrics: Process::Validator(metrics), + }) + } + + /// Gets system metrics by observing capturing the SystemHealth metrics. + pub fn get_system_metrics(&self) -> Result { + let system_health = SystemHealth::observe().map_err(Error::SystemMetricsFailed)?; + Ok(MonitoringMetrics { + metadata: Metadata::new(ProcessType::System), + process_metrics: Process::System(system_health.into()), + }) + } + + /// Return metric based on process type. + pub async fn get_metrics( + &self, + process_type: &ProcessType, + ) -> Result { + match process_type { + ProcessType::BeaconNode => self.get_beacon_metrics(), + ProcessType::System => self.get_system_metrics(), + ProcessType::Validator => self.get_validator_metrics(), + } + } + + /// Send metrics to the remote endpoint + pub async fn send_metrics(&self, processes: &[ProcessType]) -> Result<(), Error> { + let mut metrics = Vec::new(); + for process in processes { + match self.get_metrics(process).await { + Err(e) => error!( + self.log, + "Failed to get metrics"; + "process_type" => ?process, + "error" => %e + ), + Ok(metric) => metrics.push(metric), + } + } + info!( + self.log, + "Sending metrics to remote endpoint"; + "endpoint" => %self.monitoring_endpoint + ); + self.post(self.monitoring_endpoint.full.clone(), &metrics) + .await + } +} + +/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an +/// appropriate error message. +async fn ok_or_error(response: Response) -> Result { + let status = response.status(); + + if status == StatusCode::OK { + Ok(response) + } else if let Ok(message) = response.json().await { + Err(Error::ServerMessage(message)) + } else { + Err(Error::StatusCode(status)) + } +} diff --git a/common/monitoring_api/src/types.rs b/common/monitoring_api/src/types.rs new file mode 100644 index 00000000000..9765e34613f --- /dev/null +++ b/common/monitoring_api/src/types.rs @@ -0,0 +1,177 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use eth2::lighthouse::{ProcessHealth, SystemHealth}; +use serde_derive::{Deserialize, Serialize}; + +pub const VERSION: u64 = 1; +pub const CLIENT_NAME: &str = "lighthouse"; + +/// An API error serializable to JSON. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ErrorMessage { + pub code: u16, + pub message: String, + #[serde(default)] + pub stacktraces: Vec, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MonitoringMetrics { + #[serde(flatten)] + pub metadata: Metadata, + #[serde(flatten)] + pub process_metrics: Process, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ProcessType { + BeaconNode, + Validator, + System, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct Metadata { + version: u64, + timestamp: u128, + process: ProcessType, +} + +impl Metadata { + pub fn new(process: ProcessType) -> Self { + Self { + version: VERSION, + timestamp: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time should be greater than unix epoch") + .as_millis(), + process, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum Process { + Beacon(BeaconProcessMetrics), + System(SystemMetrics), + Validator(ValidatorProcessMetrics), +} + +/// Common metrics for all processes. +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct ProcessMetrics { + cpu_process_seconds_total: u64, + memory_process_bytes: u64, + + client_name: String, + client_version: String, + client_build: u64, +} + +impl From for ProcessMetrics { + fn from(health: ProcessHealth) -> Self { + Self { + cpu_process_seconds_total: health.pid_process_seconds_total, + memory_process_bytes: health.pid_mem_resident_set_size, + client_name: CLIENT_NAME.to_string(), + client_version: client_version().unwrap_or_default(), + client_build: client_build(), + } + } +} + +/// Metrics related to the system. +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct SystemMetrics { + cpu_cores: u64, + cpu_threads: u64, + cpu_node_system_seconds_total: u64, + cpu_node_user_seconds_total: u64, + cpu_node_iowait_seconds_total: u64, + cpu_node_idle_seconds_total: u64, + + memory_node_bytes_total: u64, + memory_node_bytes_free: u64, + memory_node_bytes_cached: u64, + memory_node_bytes_buffers: u64, + + disk_node_bytes_total: u64, + disk_node_bytes_free: u64, + + disk_node_io_seconds: u64, + disk_node_reads_total: u64, + disk_node_writes_total: u64, + + network_node_bytes_total_receive: u64, + network_node_bytes_total_transmit: u64, + + misc_node_boot_ts_seconds: u64, + misc_os: String, +} + +impl From for SystemMetrics { + fn from(health: SystemHealth) -> Self { + // Export format uses 3 letter os names + let misc_os = health.misc_os.get(0..3).unwrap_or("unk").to_string(); + Self { + cpu_cores: health.cpu_cores, + cpu_threads: health.cpu_threads, + cpu_node_system_seconds_total: health.cpu_time_total, + cpu_node_user_seconds_total: health.user_seconds_total, + cpu_node_iowait_seconds_total: health.iowait_seconds_total, + cpu_node_idle_seconds_total: health.idle_seconds_total, + + memory_node_bytes_total: health.sys_virt_mem_total, + memory_node_bytes_free: health.sys_virt_mem_free, + memory_node_bytes_cached: health.sys_virt_mem_cached, + memory_node_bytes_buffers: health.sys_virt_mem_buffers, + + disk_node_bytes_total: health.disk_node_bytes_total, + disk_node_bytes_free: health.disk_node_bytes_free, + + // Unavaliable for now + disk_node_io_seconds: 0, + disk_node_reads_total: health.disk_node_reads_total, + disk_node_writes_total: health.disk_node_writes_total, + + network_node_bytes_total_receive: health.network_node_bytes_total_received, + network_node_bytes_total_transmit: health.network_node_bytes_total_transmit, + + misc_node_boot_ts_seconds: health.misc_node_boot_ts_seconds, + misc_os, + } + } +} + +/// All beacon process metrics. +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct BeaconProcessMetrics { + #[serde(flatten)] + pub common: ProcessMetrics, + #[serde(flatten)] + pub beacon: serde_json::Value, +} + +/// All validator process metrics +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +pub struct ValidatorProcessMetrics { + #[serde(flatten)] + pub common: ProcessMetrics, + #[serde(flatten)] + pub validator: serde_json::Value, +} + +/// Returns the client version +fn client_version() -> Option { + let re = regex::Regex::new(r"\d+\.\d+\.\d+").expect("Regex is valid"); + re.find(lighthouse_version::VERSION) + .map(|m| m.as_str().to_string()) +} + +/// Returns the client build +/// Note: Lighthouse does not support build numbers, this is effectively a null-value. +fn client_build() -> u64 { + 0 +} diff --git a/common/warp_utils/src/metrics.rs b/common/warp_utils/src/metrics.rs index dc42aa6b357..1b9d89db91a 100644 --- a/common/warp_utils/src/metrics.rs +++ b/common/warp_utils/src/metrics.rs @@ -1,4 +1,4 @@ -use eth2::lighthouse::Health; +use eth2::lighthouse::{ProcessHealth, SystemHealth}; use lighthouse_metrics::*; lazy_static::lazy_static! { @@ -14,6 +14,10 @@ lazy_static::lazy_static! { "process_virtual_memory_bytes", "Virtual memory used by the current process" ); + pub static ref PROCESS_SECONDS: Result = try_create_int_gauge( + "process_cpu_seconds_total", + "Total cpu time taken by the current process" + ); pub static ref SYSTEM_VIRT_MEM_TOTAL: Result = try_create_int_gauge("system_virt_mem_total_bytes", "Total system virtual memory"); pub static ref SYSTEM_VIRT_MEM_AVAILABLE: Result = try_create_int_gauge( @@ -24,6 +28,10 @@ lazy_static::lazy_static! { try_create_int_gauge("system_virt_mem_used_bytes", "Used system virtual memory"); pub static ref SYSTEM_VIRT_MEM_FREE: Result = try_create_int_gauge("system_virt_mem_free_bytes", "Free system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_CACHED: Result = + try_create_int_gauge("system_virt_mem_cached_bytes", "Used system virtual memory"); + pub static ref SYSTEM_VIRT_MEM_BUFFERS: Result = + try_create_int_gauge("system_virt_mem_buffer_bytes", "Free system virtual memory"); pub static ref SYSTEM_VIRT_MEM_PERCENTAGE: Result = try_create_float_gauge( "system_virt_mem_percentage", "Percentage of used virtual memory" @@ -34,15 +42,62 @@ lazy_static::lazy_static! { try_create_float_gauge("system_loadavg_5", "Loadavg over 5 minutes"); pub static ref SYSTEM_LOADAVG_15: Result = try_create_float_gauge("system_loadavg_15", "Loadavg over 15 minutes"); + + pub static ref CPU_CORES: Result = + try_create_int_gauge("cpu_cores", "Number of physical cpu cores"); + pub static ref CPU_THREADS: Result = + try_create_int_gauge("cpu_threads", "Number of logical cpu cores"); + + pub static ref CPU_SYSTEM_SECONDS_TOTAL: Result = + try_create_int_gauge("cpu_system_seconds_total", "Total time spent in kernel mode"); + pub static ref CPU_USER_SECONDS_TOTAL: Result = + try_create_int_gauge("cpu_user_seconds_total", "Total time spent in user mode"); + pub static ref CPU_IOWAIT_SECONDS_TOTAL: Result = + try_create_int_gauge("cpu_iowait_seconds_total", "Total time spent waiting for io"); + pub static ref CPU_IDLE_SECONDS_TOTAL: Result = + try_create_int_gauge("cpu_idle_seconds_total", "Total time spent idle"); + + pub static ref DISK_BYTES_TOTAL: Result = + try_create_int_gauge("disk_node_bytes_total", "Total capacity of disk"); + + pub static ref DISK_BYTES_FREE: Result = + try_create_int_gauge("disk_node_bytes_free", "Free space in disk"); + + pub static ref DISK_READS: Result = + try_create_int_gauge("disk_node_reads_total", "Number of disk reads"); + + pub static ref DISK_WRITES: Result = + try_create_int_gauge("disk_node_writes_total", "Number of disk writes"); + + pub static ref NETWORK_BYTES_RECEIVED: Result = + try_create_int_gauge("network_node_bytes_total_received", "Total bytes received over all network interfaces"); + pub static ref NETWORK_BYTES_SENT: Result = + try_create_int_gauge("network_node_bytes_total_transmit", "Total bytes sent over all network interfaces"); + + pub static ref BOOT_TIME: Result = + try_create_int_gauge("misc_node_boot_ts_seconds", "Boot time as unix epoch timestamp"); } pub fn scrape_health_metrics() { + scrape_process_health_metrics(); + scrape_system_health_metrics(); +} + +pub fn scrape_process_health_metrics() { // This will silently fail if we are unable to observe the health. This is desired behaviour // since we don't support `Health` for all platforms. - if let Ok(health) = Health::observe() { + if let Ok(health) = ProcessHealth::observe() { set_gauge(&PROCESS_NUM_THREADS, health.pid_num_threads as i64); set_gauge(&PROCESS_RES_MEM, health.pid_mem_resident_set_size as i64); set_gauge(&PROCESS_VIRT_MEM, health.pid_mem_virtual_memory_size as i64); + set_gauge(&PROCESS_SECONDS, health.pid_process_seconds_total as i64); + } +} + +pub fn scrape_system_health_metrics() { + // This will silently fail if we are unable to observe the health. This is desired behaviour + // since we don't support `Health` for all platforms. + if let Ok(health) = SystemHealth::observe() { set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64); set_gauge( &SYSTEM_VIRT_MEM_AVAILABLE, @@ -57,5 +112,34 @@ pub fn scrape_health_metrics() { set_float_gauge(&SYSTEM_LOADAVG_1, health.sys_loadavg_1); set_float_gauge(&SYSTEM_LOADAVG_5, health.sys_loadavg_5); set_float_gauge(&SYSTEM_LOADAVG_15, health.sys_loadavg_15); + + set_gauge(&CPU_CORES, health.cpu_cores as i64); + set_gauge(&CPU_THREADS, health.cpu_threads as i64); + + set_gauge( + &CPU_SYSTEM_SECONDS_TOTAL, + health.system_seconds_total as i64, + ); + set_gauge(&CPU_USER_SECONDS_TOTAL, health.user_seconds_total as i64); + set_gauge( + &CPU_IOWAIT_SECONDS_TOTAL, + health.iowait_seconds_total as i64, + ); + set_gauge(&CPU_IDLE_SECONDS_TOTAL, health.idle_seconds_total as i64); + + set_gauge(&DISK_BYTES_TOTAL, health.disk_node_bytes_total as i64); + + set_gauge(&DISK_BYTES_FREE, health.disk_node_bytes_free as i64); + set_gauge(&DISK_READS, health.disk_node_reads_total as i64); + set_gauge(&DISK_WRITES, health.disk_node_writes_total as i64); + + set_gauge( + &NETWORK_BYTES_RECEIVED, + health.network_node_bytes_total_received as i64, + ); + set_gauge( + &NETWORK_BYTES_SENT, + health.network_node_bytes_total_transmit as i64, + ); } } diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 7fd3cb3b139..df74a203a45 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -64,4 +64,5 @@ scrypt = { version = "0.5.0", default-features = false } lighthouse_metrics = { path = "../common/lighthouse_metrics" } lazy_static = "1.4.0" fallback = { path = "../common/fallback" } +monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 78def569e8b..ead7adbc9bb 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -302,7 +302,7 @@ impl BeaconNodeFallback { } /// The count of candidates, regardless of their state. - pub async fn num_total(&self) -> usize { + pub fn num_total(&self) -> usize { self.candidates.len() } @@ -317,6 +317,17 @@ impl BeaconNodeFallback { n } + /// The count of synced and ready fallbacks excluding the primary beacon node candidate. + pub async fn num_synced_fallback(&self) -> usize { + let mut n = 0; + for candidate in self.candidates.iter().skip(1) { + if candidate.status(RequireSynced::Yes).await.is_ok() { + n += 1 + } + } + n + } + /// The count of candidates that are online and compatible, but not necessarily synced. pub async fn num_available(&self) -> usize { let mut n = 0; diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 75be32d0744..13e4f4e022b 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -181,4 +181,19 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { address of this server (e.g., http://localhost:5064).") .takes_value(true), ) + /* + * Explorer metrics + */ + .arg( + Arg::with_name("monitoring-endpoint") + .long("monitoring-endpoint") + .value_name("ADDRESS") + .help("Enables the monitoring service for sending system metrics to a remote endpoint. \ + This can be used to monitor your setup on certain services (e.g. beaconcha.in). \ + This flag sets the endpoint where the beacon node metrics will be sent. \ + Note: This will send information to a remote sever which may identify and associate your \ + validators, IP address and other personal information. Always use a HTTPS connection \ + and never provide an untrusted URL.") + .takes_value(true), + ) } diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 1c01cc98361..cb5c862dae5 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -43,6 +43,8 @@ pub struct Config { pub http_api: http_api::Config, /// Configuration for the HTTP REST API. pub http_metrics: http_metrics::Config, + /// Configuration for sending metrics to a remote explorer endpoint. + pub monitoring_api: Option, } impl Default for Config { @@ -70,6 +72,7 @@ impl Default for Config { graffiti_file: None, http_api: <_>::default(), http_metrics: <_>::default(), + monitoring_api: None, } } } @@ -233,6 +236,16 @@ impl Config { config.http_metrics.allow_origin = Some(allow_origin.to_string()); } + /* + * Explorer metrics + */ + if let Some(monitoring_endpoint) = cli_args.value_of("monitoring-endpoint") { + config.monitoring_api = Some(monitoring_api::Config { + db_path: None, + freezer_db_path: None, + monitoring_endpoint: monitoring_endpoint.to_string(), + }); + } Ok(config) } diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 31a5efd3cb8..d5e5bf4814a 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -108,6 +108,16 @@ lazy_static::lazy_static! { "The number of beacon node requests for each endpoint", &["endpoint"] ); + + pub static ref ETH2_FALLBACK_CONFIGURED: Result = try_create_int_gauge( + "sync_eth2_fallback_configured", + "The number of configured eth2 fallbacks", + ); + + pub static ref ETH2_FALLBACK_CONNECTED: Result = try_create_int_gauge( + "sync_eth2_fallback_connected", + "Set to 1 if connected to atleast one synced eth2 fallback node, otherwise set to 0", + ); } pub fn gather_prometheus_metrics( @@ -126,20 +136,6 @@ pub fn gather_prometheus_metrics( } } - if let Some(validator_store) = &shared.validator_store { - let initialized_validators_lock = validator_store.initialized_validators(); - let initialized_validators = initialized_validators_lock.read(); - - set_gauge( - &ENABLED_VALIDATORS_COUNT, - initialized_validators.num_enabled() as i64, - ); - set_gauge( - &TOTAL_VALIDATORS_COUNT, - initialized_validators.num_total() as i64, - ); - } - if let Some(duties_service) = &shared.duties_service { if let Some(slot) = duties_service.slot_clock.now() { let current_epoch = slot.epoch(T::slots_per_epoch()); diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index f89a1096ecc..c471adcc8d8 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -14,6 +14,7 @@ use account_utils::{ ZeroizeString, }; use eth2_keystore::Keystore; +use lighthouse_metrics::set_gauge; use lockfile::{Lockfile, LockfileError}; use slog::{debug, error, info, warn, Logger}; use std::collections::{HashMap, HashSet}; @@ -609,6 +610,16 @@ impl InitializedValidators { } else { debug!(log, "Key cache not modified"); } + + // Update the enabled and total validator counts + set_gauge( + &crate::http_metrics::metrics::ENABLED_VALIDATORS_COUNT, + self.num_enabled() as i64, + ); + set_gauge( + &crate::http_metrics::metrics::TOTAL_VALIDATORS_COUNT, + self.num_total() as i64, + ); Ok(()) } } diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index a1673146e79..0f462aca7e2 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -17,6 +17,8 @@ pub mod http_api; pub use cli::cli_app; pub use config::Config; +use lighthouse_metrics::set_gauge; +use monitoring_api::{MonitoringHttpClient, ProcessType}; use crate::beacon_node_fallback::{ start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, @@ -125,6 +127,17 @@ impl ProductionValidatorClient { None }; + // Start the explorer client which periodically sends validator process + // and system metrics to the configured endpoint. + if let Some(monitoring_config) = &config.monitoring_api { + let monitoring_client = + MonitoringHttpClient::new(monitoring_config, context.log().clone())?; + monitoring_client.auto_update( + context.executor.clone(), + vec![ProcessType::Validator, ProcessType::System], + ); + }; + let mut validator_defs = ValidatorDefinitions::open_or_create(&config.validator_dir) .map_err(|e| format!("Unable to open or create validator definitions: {:?}", e))?; @@ -225,10 +238,19 @@ impl ProductionValidatorClient { }) .collect::, String>>()?; + let num_nodes = beacon_nodes.len(); let candidates = beacon_nodes .into_iter() .map(CandidateBeaconNode::new) .collect(); + + // Set the count for beacon node fallbacks excluding the primary beacon node + set_gauge( + &http_metrics::metrics::ETH2_FALLBACK_CONFIGURED, + num_nodes.saturating_sub(1) as i64, + ); + // Initialize the number of connected, synced fallbacks to 0. + set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 0); let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); @@ -409,7 +431,7 @@ async fn init_from_beacon_node( loop { beacon_nodes.update_unready_candidates().await; let num_available = beacon_nodes.num_available().await; - let num_total = beacon_nodes.num_total().await; + let num_total = beacon_nodes.num_total(); if num_available > 0 { info!( context.log(), diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 8b6d523cccb..9b99c1a7e40 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,4 +1,6 @@ +use crate::http_metrics; use crate::{DutiesService, ProductionValidatorClient}; +use lighthouse_metrics::set_gauge; use slog::{error, info, Logger}; use slot_clock::SlotClock; use tokio::time::{sleep, Duration}; @@ -39,7 +41,7 @@ async fn notify( ) { let num_available = duties_service.beacon_nodes.num_available().await; let num_synced = duties_service.beacon_nodes.num_synced().await; - let num_total = duties_service.beacon_nodes.num_total().await; + let num_total = duties_service.beacon_nodes.num_total(); if num_synced > 0 { info!( log, @@ -57,6 +59,12 @@ async fn notify( "synced" => num_synced, ) } + let num_synced_fallback = duties_service.beacon_nodes.num_synced_fallback().await; + if num_synced_fallback > 0 { + set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 1); + } else { + set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 0); + } if let Some(slot) = duties_service.slot_clock.now() { let epoch = slot.epoch(E::slots_per_epoch());