Skip to content

Commit

Permalink
Initial logic to include the telemetry in the Supervisor (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
andynog committed May 19, 2021
1 parent 415dead commit 6954493
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
22 changes: 10 additions & 12 deletions relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ use crate::{
},
object::{Client, Object, UnidirectionalChannelPath},
registry::Registry,
telemetry::service::TelemetryService,
util::try_recv_multiple,
worker::{WorkerMap, WorkerMsg},
};

mod error;
use crate::telemetry::service::TelemetryService;
use crate::telemetry::state::TelemetryState;
pub use error::Error;

/// The supervisor listens for events on multiple pairs of chains,
Expand All @@ -41,6 +42,7 @@ pub struct Supervisor {
registry: Registry,
workers: WorkerMap,
worker_msg_rx: Receiver<WorkerMsg>,
telemetry_state: Arc<TelemetryState>,
}

impl Supervisor {
Expand All @@ -50,24 +52,18 @@ impl Supervisor {
let (worker_msg_tx, worker_msg_rx) = crossbeam_channel::unbounded();

// Start the telemetry service
let telemetry = match config.global.telemetry_enabled {
true => {
println!(
"TELEMETRY ENABLED ON PORT: {:?}",
config.global.telemetry_port
);
Some(TelemetryService {
listen_port: config.global.telemetry_port,
})
}
false => None,
let telemetry_state = TelemetryState::init();
match config.global.telemetry_enabled {
true => TelemetryService::run(telemetry_state.clone(), config.global.telemetry_port),
false => println!("Telemetry not enabled"),
};

Ok(Self {
config,
registry,
workers: WorkerMap::new(worker_msg_tx),
worker_msg_rx,
telemetry_state,
})
}

Expand Down Expand Up @@ -112,6 +108,8 @@ impl Supervisor {
IbcEvent::SendPacket(ref packet) => {
if let Ok(object) = Object::for_send_packet(packet, src_chain) {
collected.per_object.entry(object).or_default().push(event);
// Increase counter
self.telemetry_state.tx_counter.add(1);
}
}
IbcEvent::TimeoutPacket(ref packet) => {
Expand Down
13 changes: 5 additions & 8 deletions relayer/src/telemetry/service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use prometheus::{Encoder, TextEncoder};

use crate::telemetry::state::TelemetryState;
use std::sync::Arc;

pub struct TelemetryService {
pub(crate) listen_port: u16,
}
pub struct TelemetryService {}

impl TelemetryService {
pub async fn run(self) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let state = TelemetryState::init();

rouille::start_server(format!("localhost:{}", self.listen_port), move |request| {
pub fn run(state: Arc<TelemetryState>, listen_port: u16) -> () {
rouille::start_server(format!("localhost:{}", listen_port), move |request| {
router!(request,
// The prometheus endpoint
(GET) (/metrics) => {
Expand All @@ -31,6 +28,6 @@ impl TelemetryService {
rouille::Response::empty_404()
}
)
})
});
}
}

0 comments on commit 6954493

Please sign in to comment.