Skip to content

Commit

Permalink
Merge pull request #6 from sile/record-file
Browse files Browse the repository at this point in the history
Add `--record` and `--replay` options
  • Loading branch information
sile committed Oct 19, 2023
2 parents 79d4350 + 610d3a1 commit 0f09a72
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 31 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ erl_dist = "0.6"
erl_rpc = "0.2"
futures = "0.3"
log = "0.4"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
simplelog = "0.12"
smol = "1"
tui = "0.19"
3 changes: 2 additions & 1 deletion src/erlang.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use erl_dist::node::NodeName;
use erl_dist::term::{Atom, List, Map, Term, Tuple};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemVersion(String);

impl SystemVersion {
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A simple, terminal-based Erlang dashboard.
use std::path::PathBuf;
pub mod erlang;
pub mod metrics;
pub mod ui;
Expand All @@ -17,6 +18,14 @@ pub struct Options {
/// By default, the content of the `$HOME/.erlang.cookie` file is used.
#[clap(long, short = 'c')]
pub cookie: Option<String>,

/// If specified, the collected metrics will be recorded to the given file and can be replayed later.
#[clap(long, value_name = "FILE")]
pub record: Option<PathBuf>,

/// If specified, the recorded metrics will be replayed.
#[clap(long, requires = "record")]
pub replay: bool,
}

impl Options {
Expand Down
183 changes: 164 additions & 19 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
use crate::erlang::{MSAccThread, RpcClient, SystemVersion};
use crate::Options;
use anyhow::Context;
use serde::{Deserialize, Serialize};
use smol::fs::File;
use smol::io::AsyncWriteExt;
use std::collections::BTreeMap;
use std::io::BufRead;
use std::sync::mpsc;
use std::time::{Duration, Instant};

type MetricsReceiver = mpsc::Receiver<Metrics>;
type MetricsSender = mpsc::Sender<Metrics>;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metrics {
pub timestamp: Instant,
pub timestamp: Duration,
pub items: BTreeMap<String, MetricValue>,
}

impl Metrics {
fn new() -> Self {
fn new(start: Instant) -> Self {
Self {
timestamp: Instant::now(),
timestamp: start.elapsed(),
items: BTreeMap::new(),
}
}
Expand Down Expand Up @@ -65,7 +70,7 @@ impl Metrics {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricValue {
Gauge {
value: u64,
Expand Down Expand Up @@ -230,20 +235,121 @@ pub fn format_u64(mut n: u64, suffix: &str) -> String {
}

#[derive(Debug)]
pub struct MetricsPoller {
pub rx: MetricsReceiver,
pub system_version: SystemVersion,
rpc_client: RpcClient,
old_microstate_accounting_flag: bool,
pub enum MetricsPoller {
Realtime(RealtimeMetricsPoller),
Replay(ReplayMetricsPoller),
}

impl MetricsPoller {
pub fn start_thread(options: Options) -> anyhow::Result<Self> {
if options.replay {
ReplayMetricsPoller::new(options).map(Self::Replay)
} else {
RealtimeMetricsPoller::start_thread(options).map(Self::Realtime)
}
}

pub fn is_replay(&self) -> bool {
matches!(self, Self::Replay(_))
}

pub fn system_version(&self) -> &SystemVersion {
match self {
Self::Realtime(poller) => &poller.system_version,
Self::Replay(poller) => &poller.system_version,
}
}

pub fn poll_metrics(&self, timeout: Duration) -> Result<Metrics, mpsc::RecvTimeoutError> {
match self {
Self::Realtime(poller) => poller.rx.recv_timeout(timeout),
Self::Replay(_) => {
unreachable!()
}
}
}

pub fn replay_last_time(&self) -> Duration {
match self {
Self::Realtime(_) => Duration::from_secs(0),
Self::Replay(poller) => poller
.metrics_log
.last()
.map(|m| m.timestamp)
.unwrap_or_default(),
}
}

pub fn get_metrics_range(
&self,
start_time: Duration,
end_time: Duration,
) -> anyhow::Result<impl '_ + Iterator<Item = &Metrics>> {
let Self::Replay(poller) = self else {
anyhow::bail!("`get_metrics_range()` is only available in replay mode");
};
Ok(poller.metrics_log.iter().filter(move |metrics| {
let time = metrics.timestamp;
start_time <= time && time <= end_time
}))
}
}

#[derive(Debug)]
pub struct ReplayMetricsPoller {
system_version: SystemVersion,
metrics_log: Vec<Metrics>,
}

impl ReplayMetricsPoller {
fn new(options: Options) -> anyhow::Result<Self> {
let Some(record_file_path) = options.record else {
anyhow::bail!("`--record` is required for replay mode");
};
let file = std::fs::File::open(&record_file_path).with_context(|| {
format!("failed to open record file: {}", record_file_path.display())
})?;
let reader = std::io::BufReader::new(file);

let mut system_version = None;
let mut metrics_log = Vec::new();
for (i, line) in reader.lines().enumerate() {
let line = line?;
if i == 0 {
system_version = Some(
serde_json::from_str(&line)
.with_context(|| format!("failed to parse record file: line={}", i + 1))?,
);
continue;
}
let metrics = serde_json::from_str(&line)
.with_context(|| format!("failed to parse record file: line={}", i + 1))?;
metrics_log.push(metrics);
}
let system_version =
system_version.ok_or_else(|| anyhow::anyhow!("record file is empty"))?;
Ok(Self {
system_version,
metrics_log,
})
}
}

#[derive(Debug)]
pub struct RealtimeMetricsPoller {
rx: MetricsReceiver,
system_version: SystemVersion,
rpc_client: RpcClient,
old_microstate_accounting_flag: bool,
}

impl RealtimeMetricsPoller {
fn start_thread(options: Options) -> anyhow::Result<Self> {
MetricsPollerThread::start_thread(options)
}
}

impl Drop for MetricsPoller {
impl Drop for RealtimeMetricsPoller {
fn drop(&mut self) {
if !self.old_microstate_accounting_flag {
if let Err(e) = smol::block_on(
Expand All @@ -264,10 +370,13 @@ struct MetricsPollerThread {
rpc_client: RpcClient,
tx: MetricsSender,
prev_metrics: Metrics,
start: Instant,
system_version: SystemVersion,
record_file: Option<File>,
}

impl MetricsPollerThread {
fn start_thread(options: Options) -> anyhow::Result<MetricsPoller> {
fn start_thread(options: Options) -> anyhow::Result<RealtimeMetricsPoller> {
let (tx, rx) = mpsc::channel();

let rpc_client: RpcClient = smol::block_on(async {
Expand All @@ -282,41 +391,77 @@ impl MetricsPollerThread {
"enabled microstate accounting (old flag state is {old_microstate_accounting_flag})"
);

let poller = MetricsPoller {
let poller = RealtimeMetricsPoller {
rx,
system_version,
system_version: system_version.clone(),
rpc_client: rpc_client.clone(),
old_microstate_accounting_flag,
};

let record_file = if let Some(path) = &options.record {
Some(File::from(std::fs::File::create(path).with_context(
|| format!("failed to record file {}", path.display()),
)?))
} else {
None
};

std::thread::spawn(|| {
let start = Instant::now();
Self {
options,
rpc_client,
tx,
prev_metrics: Metrics::new(),
prev_metrics: Metrics::new(start),
start,
system_version,
record_file,
}
.run()
});
Ok(poller)
}

async fn write_json_line(&mut self, value: &impl serde::Serialize) -> anyhow::Result<()> {
if let Some(file) = &mut self.record_file {
let mut bytes = serde_json::to_vec(value)?;
bytes.push(b'\n');
file.write_all(&bytes).await?;
file.flush().await?;
}
Ok(())
}

fn run(mut self) {
let interval = Duration::from_secs(self.options.polling_interval.get() as u64);
let mut next_time = Duration::from_secs(0);
smol::block_on(async {
if let Err(e) = self.write_json_line(&self.system_version.clone()).await {
log::error!("faild to write record file: {e}");
return;
}

loop {
match self.poll_once().await {
Err(e) => {
log::error!("faild to poll metrics: {e}");
break;
}
Ok(metrics) => {
let elapsed = metrics.timestamp.elapsed();
let elapsed = metrics.timestamp;

if let Err(e) = self.write_json_line(&metrics).await {
log::error!("faild to write record file: {e}");
break;
}

if self.tx.send(metrics).is_err() {
log::debug!("the main thread has terminated");
break;
}
if let Some(sleep_duration) = interval.checked_sub(elapsed) {

next_time += interval;
if let Some(sleep_duration) = next_time.checked_sub(elapsed) {
std::thread::sleep(sleep_duration);
}
}
Expand Down Expand Up @@ -379,7 +524,7 @@ impl MetricsPollerThread {
}

async fn poll_once(&mut self) -> anyhow::Result<Metrics> {
let mut metrics = Metrics::new();
let mut metrics = Metrics::new(self.start);

let msacc = self
.rpc_client
Expand Down Expand Up @@ -476,7 +621,7 @@ impl MetricsPollerThread {

log::debug!(
"MetricsPoller::poll_once(): elapsed={:?}",
metrics.timestamp.elapsed()
metrics.timestamp
);
metrics.calc_delta(&self.prev_metrics);

Expand Down
Loading

0 comments on commit 0f09a72

Please sign in to comment.