Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get metrics #3

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 10 additions & 12 deletions kaspad/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,17 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let consensus_manager = Arc::new(ConsensusManager::new(consensus_factory));
let consensus_monitor = Arc::new(ConsensusMonitor::new(processing_counters.clone(), tick_service.clone()));

let perf_monitor = args.perf_metrics.then(|| {
let perf_monitor_builder = PerfMonitorBuilder::new()
.with_fetch_interval(Duration::from_secs(args.perf_metrics_interval_sec))
.with_tick_service(tick_service.clone());
let perf_monitor = if args.perf_metrics {
let cb = move |counters| {
trace!("[{}] metrics: {:?}", kaspa_perf_monitor::SERVICE_NAME, counters);
};
Arc::new(
PerfMonitorBuilder::new()
.with_fetch_interval(Duration::from_secs(args.perf_metrics_interval_sec))
.with_fetch_cb(cb)
.with_tick_service(tick_service.clone())
.build(),
)
});
Arc::new(perf_monitor_builder.with_fetch_cb(cb).build())
} else {
Arc::new(perf_monitor_builder.build())
};

let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv));
let index_service: Option<Arc<IndexService>> = if args.utxoindex {
Expand Down Expand Up @@ -291,6 +290,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
processing_counters,
wrpc_borsh_counters.clone(),
wrpc_json_counters.clone(),
perf_monitor.clone(),
));
let grpc_service = Arc::new(GrpcService::new(grpc_server_addr, rpc_core_service.clone(), args.rpc_max_clients));

Expand All @@ -305,9 +305,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
async_runtime.register(grpc_service);
async_runtime.register(p2p_service);
async_runtime.register(consensus_monitor);
if let Some(perf_monitor) = perf_monitor {
async_runtime.register(perf_monitor);
}
async_runtime.register(perf_monitor);
let wrpc_service_tasks: usize = 2; // num_cpus::get() / 2;
// Register wRPC servers based on command line arguments
[
Expand Down
2 changes: 1 addition & 1 deletion metrics/perf_monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ log.workspace = true
thiserror.workspace = true

perf_monitor = {version = "0.2.0", git = "https://github.com/larksuite/perf-monitor-rs", rev = "3b2d52768f229c481186b991ae56559ab4cd087a"}

tokio = { workspace = true, features = ["rt", "macros", "time"] }
portable-atomic = {version = "1.4.1", features = ["float"]}
page_size = "0.5.0"
23 changes: 19 additions & 4 deletions metrics/perf_monitor/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Builder<TS, D, CB> {
tick_service: TS,
fetch_interval: D,
fetch_callback: CB,
page_size: usize,
}

impl Builder<Unspecified, Unspecified, Unspecified> {
Expand All @@ -18,19 +19,24 @@ impl Builder<Unspecified, Unspecified, Unspecified> {

impl Default for Builder<Unspecified, Unspecified, Unspecified> {
fn default() -> Self {
Self { tick_service: Unspecified {}, fetch_interval: Unspecified {}, fetch_callback: Unspecified {} }
Self {
tick_service: Unspecified {},
fetch_interval: Unspecified {},
fetch_callback: Unspecified {},
page_size: page_size::get(),
}
}
}

impl<D, CB> Builder<Unspecified, D, CB> {
pub fn with_tick_service<TS: AsRef<TickService>>(self, tick_service: TS) -> Builder<TS, D, CB> {
Builder { tick_service, fetch_interval: self.fetch_interval, fetch_callback: self.fetch_callback }
Builder { tick_service, fetch_interval: self.fetch_interval, fetch_callback: self.fetch_callback, page_size: self.page_size }
}
}

impl<TS, CB> Builder<TS, Unspecified, CB> {
pub fn with_fetch_interval(self, fetch_interval: Duration) -> Builder<TS, Duration, CB> {
Builder { tick_service: self.tick_service, fetch_interval, fetch_callback: self.fetch_callback }
Builder { tick_service: self.tick_service, fetch_interval, fetch_callback: self.fetch_callback, page_size: self.page_size }
}
}

Expand All @@ -39,7 +45,12 @@ impl<TS, D> Builder<TS, D, Unspecified> {
self,
fetch_callback: CB,
) -> Builder<TS, D, Box<dyn Fn(CountersSnapshot) + Sync + Send>> {
Builder { tick_service: self.tick_service, fetch_interval: self.fetch_interval, fetch_callback: Box::new(fetch_callback) as _ }
Builder {
tick_service: self.tick_service,
fetch_interval: self.fetch_interval,
fetch_callback: Box::new(fetch_callback) as _,
page_size: self.page_size,
}
}
}

Expand All @@ -50,6 +61,7 @@ impl<TS: AsRef<TickService>> Builder<TS, Unspecified, Unspecified> {
fetch_interval: Duration::from_secs(1),
counters: Default::default(),
fetch_callback: None,
page_size: self.page_size as u64,
}
}
}
Expand All @@ -61,6 +73,7 @@ impl<TS: AsRef<TickService>> Builder<TS, Duration, Unspecified> {
fetch_interval: self.fetch_interval,
counters: Default::default(),
fetch_callback: None,
page_size: self.page_size as u64,
}
}
}
Expand All @@ -72,6 +85,7 @@ impl<TS: AsRef<TickService>> Builder<TS, Unspecified, Box<dyn Fn(CountersSnapsho
fetch_interval: Duration::from_secs(1),
counters: Default::default(),
fetch_callback: Some(self.fetch_callback),
page_size: self.page_size as u64,
}
}
}
Expand All @@ -83,6 +97,7 @@ impl<TS: AsRef<TickService>> Builder<TS, Duration, Box<dyn Fn(CountersSnapshot)
fetch_interval: self.fetch_interval,
counters: Default::default(),
fetch_callback: Some(self.fetch_callback),
page_size: self.page_size as u64,
}
}
}
10 changes: 10 additions & 0 deletions metrics/perf_monitor/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub(crate) struct Counters {
/// Usage" "VM Size" column of taskmgr.exe.
pub virtual_memory_size: AtomicU64,

pub resident_set_size_bytes: AtomicU64,
pub virtual_memory_size_bytes: AtomicU64,

pub core_num: AtomicUsize,
pub cpu_usage: AtomicF64,

Expand All @@ -31,6 +34,8 @@ impl Counters {
pub(crate) fn update(&self, snapshot: CountersSnapshot) {
self.resident_set_size.store(snapshot.resident_set_size, Ordering::Release);
self.virtual_memory_size.store(snapshot.resident_set_size, Ordering::Release);
self.resident_set_size_bytes.store(snapshot.resident_set_size_bytes, Ordering::Release);
self.virtual_memory_size_bytes.store(snapshot.virtual_memory_size_bytes, Ordering::Release);
self.core_num.store(snapshot.core_num, Ordering::Release);
self.cpu_usage.store(snapshot.cpu_usage, Ordering::Release);
self.fd_num.store(snapshot.fd_num, Ordering::Release);
Expand All @@ -43,6 +48,8 @@ impl Counters {
CountersSnapshot {
resident_set_size: self.resident_set_size.load(Ordering::Acquire),
virtual_memory_size: self.virtual_memory_size.load(Ordering::Acquire),
resident_set_size_bytes: self.resident_set_size_bytes.load(Ordering::Acquire),
virtual_memory_size_bytes: self.virtual_memory_size_bytes.load(Ordering::Acquire),
core_num: self.core_num.load(Ordering::Acquire),
cpu_usage: self.cpu_usage.load(Ordering::Acquire),
fd_num: self.fd_num.load(Ordering::Acquire),
Expand All @@ -69,6 +76,9 @@ pub struct CountersSnapshot {
/// Usage" "VM Size" column of taskmgr.exe.
pub virtual_memory_size: u64,

pub resident_set_size_bytes: u64,
pub virtual_memory_size_bytes: u64,

pub core_num: usize,
pub cpu_usage: f64,

Expand Down
3 changes: 3 additions & 0 deletions metrics/perf_monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct Monitor<TS: AsRef<TickService>> {
fetch_interval: Duration,
counters: Counters,
fetch_callback: Option<Box<dyn Fn(CountersSnapshot) + Sync + Send>>,
page_size: u64,
}

impl<TS: AsRef<TickService>> Monitor<TS> {
Expand Down Expand Up @@ -59,6 +60,8 @@ impl<TS: AsRef<TickService>> Monitor<TS> {
let counters_snapshot = CountersSnapshot {
resident_set_size,
virtual_memory_size,
resident_set_size_bytes: resident_set_size * self.page_size,
virtual_memory_size_bytes: virtual_memory_size * self.page_size,
core_num,
cpu_usage,
fd_num,
Expand Down
1 change: 1 addition & 0 deletions rpc/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ kaspa-index-core.workspace = true
kaspa-txscript.workspace = true
kaspa-math.workspace = true
kaspa-mining-errors.workspace = true
kaspa-perf-monitor.workspace = true

faster-hex.workspace = true
serde.workspace = true
Expand Down
12 changes: 10 additions & 2 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,10 +689,18 @@ pub struct GetMetricsRequest {
#[serde(rename_all = "camelCase")]
pub struct ProcessMetrics {
// pub uptime: u64,
// pub memory_used: u64,
// pub storage_used: u64,
// pub grpc_connections: u32,
// pub wrpc_connections: u32,
pub resident_set_size_bytes: u64,
pub virtual_memory_size_bytes: u64,
pub core_num: u64,
pub cpu_usage: f64,
pub fd_num: u64,
pub disk_io_read_bytes: u64,
pub disk_io_write_bytes: u64,
pub disk_io_read_per_sec: f64,
pub disk_io_write_per_sec: f64,

pub borsh_live_connections: u64,
pub borsh_connection_attempts: u64,
pub borsh_handshake_failures: u64,
Expand Down
1 change: 1 addition & 0 deletions rpc/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ kaspa-p2p-flows.workspace = true
kaspa-math.workspace = true
kaspa-utxoindex.workspace = true
kaspa-wrpc-core.workspace = true
kaspa-perf-monitor.workspace = true

log.workspace = true
async-trait.workspace = true
Expand Down
33 changes: 32 additions & 1 deletion rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use kaspa_core::{
kaspad_env::version,
signals::Shutdown,
task::service::{AsyncService, AsyncServiceError, AsyncServiceFuture},
task::tick::TickService,
trace, warn,
};
use kaspa_index_core::{
Expand All @@ -40,6 +41,7 @@ use kaspa_notify::{
subscriber::{Subscriber, SubscriptionManager},
};
use kaspa_p2p_flows::flow_context::FlowContext;
use kaspa_perf_monitor::{counters::CountersSnapshot, Monitor as PerfMonitor};
use kaspa_rpc_core::{
api::rpc::{RpcApi, MAX_SAFE_WINDOW_SIZE},
model::*,
Expand Down Expand Up @@ -89,6 +91,7 @@ pub struct RpcCoreService {
wrpc_borsh_counters: Arc<WrpcServerCounters>,
wrpc_json_counters: Arc<WrpcServerCounters>,
shutdown: SingleTrigger,
perf_monitor: Arc<PerfMonitor<Arc<TickService>>>,
}

const RPC_CORE: &str = "rpc-core";
Expand All @@ -107,6 +110,7 @@ impl RpcCoreService {
processing_counters: Arc<ProcessingCounters>,
wrpc_borsh_counters: Arc<WrpcServerCounters>,
wrpc_json_counters: Arc<WrpcServerCounters>,
perf_monitor: Arc<PerfMonitor<Arc<TickService>>>,
) -> Self {
// Prepare consensus-notify objects
let consensus_notify_channel = Channel::<ConsensusNotification>::default();
Expand Down Expand Up @@ -168,6 +172,7 @@ impl RpcCoreService {
wrpc_borsh_counters,
wrpc_json_counters,
shutdown: SingleTrigger::default(),
perf_monitor,
}
}

Expand Down Expand Up @@ -352,7 +357,12 @@ impl RpcApi for RpcCoreService {
}

async fn get_mempool_entry_call(&self, request: GetMempoolEntryRequest) -> RpcResult<GetMempoolEntryResponse> {
let Some(transaction) = self.mining_manager.clone().get_transaction(request.transaction_id, !request.filter_transaction_pool, request.include_orphan_pool).await else {
let Some(transaction) = self
.mining_manager
.clone()
.get_transaction(request.transaction_id, !request.filter_transaction_pool, request.include_orphan_pool)
.await
else {
return Err(RpcError::TransactionNotFound(request.transaction_id));
};
let session = self.consensus_manager.consensus().session().await;
Expand Down Expand Up @@ -630,7 +640,28 @@ impl RpcApi for RpcCoreService {
// UNIMPLEMENTED METHODS

async fn get_metrics_call(&self, req: GetMetricsRequest) -> RpcResult<GetMetricsResponse> {
let CountersSnapshot {
resident_set_size_bytes,
virtual_memory_size_bytes,
core_num,
cpu_usage,
fd_num,
disk_io_read_bytes,
disk_io_write_bytes,
disk_io_read_per_sec,
disk_io_write_per_sec,
..
} = self.perf_monitor.snapshot();
let process_metrics = req.process_metrics.then_some(ProcessMetrics {
resident_set_size_bytes,
virtual_memory_size_bytes,
core_num: core_num as u64,
cpu_usage,
fd_num: fd_num as u64,
disk_io_read_bytes,
disk_io_write_bytes,
disk_io_read_per_sec,
disk_io_write_per_sec,
borsh_live_connections: self.wrpc_borsh_counters.live_connections.load(Ordering::Relaxed),
borsh_connection_attempts: self.wrpc_borsh_counters.connection_attempts.load(Ordering::Relaxed),
borsh_handshake_failures: self.wrpc_borsh_counters.handshake_failures.load(Ordering::Relaxed),
Expand Down
Loading