Skip to content

Commit

Permalink
Refactored logic into server and service. Server working (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
andynog committed May 22, 2021
1 parent 6954493 commit 1781413
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 47 deletions.
4 changes: 3 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[global]
strategy = 'naive'
log_level = 'error'
log_level = 'debug'
telemetry_enabled = true
telemetry_port = 3002

[[chains]]
id = 'ibc-0'
Expand Down
2 changes: 2 additions & 0 deletions relayer-cli/src/commands/start_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use ibc_relayer::supervisor::Supervisor;

use crate::conclude::Output;
use crate::prelude::*;
use ibc_relayer::telemetry::server::TelemetryServer;

#[derive(Clone, Command, Debug, Options)]
pub struct StartMultiCmd {}

impl Runnable for StartMultiCmd {
fn run(&self) {
let config = app_config();
let telemetry_server = TelemetryServer::spawn(config.global.telemetry_port);
let supervisor = Supervisor::spawn(config.clone()).expect("failed to spawn supervisor");
match supervisor.run() {
Ok(()) => Output::success_msg("done").exit(),
Expand Down
13 changes: 0 additions & 13 deletions relayer/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ use crate::{
},
object::{Client, Object, UnidirectionalChannelPath},
registry::Registry,
telemetry::service::TelemetryService,
util::try_recv_multiple,
worker::{WorkerMap, WorkerMsg},
};

mod error;
use crate::telemetry::state::TelemetryState;
pub use error::Error;

/// The supervisor listens for events on multiple pairs of chains,
Expand All @@ -42,28 +40,18 @@ pub struct Supervisor {
registry: Registry,
workers: WorkerMap,
worker_msg_rx: Receiver<WorkerMsg>,
telemetry_state: Arc<TelemetryState>,
}

impl Supervisor {
/// Spawns a [`Supervisor`] which will listen for events on all the chains in the [`Config`].
pub fn spawn(config: Config) -> Result<Self, BoxError> {
let registry = Registry::new(config.clone());
let (worker_msg_tx, worker_msg_rx) = crossbeam_channel::unbounded();

// Start the telemetry service
let telemetry_state = TelemetryState::init();
match config.global.telemetry_enabled {
true => TelemetryService::run(telemetry_state.clone(), config.global.telemetry_port),
false => println!("Telemetry not enabled"),
};

Ok(Self {
config,
registry,
workers: WorkerMap::new(worker_msg_tx),
worker_msg_rx,
telemetry_state,
})
}

Expand Down Expand Up @@ -109,7 +97,6 @@ impl Supervisor {
if let Ok(object) = Object::for_send_packet(packet, src_chain) {
collected.per_object.entry(object).or_default().push(event);
// Increase counter
self.telemetry_state.tx_counter.add(1);
}
}
IbcEvent::TimeoutPacket(ref packet) => {
Expand Down
3 changes: 2 additions & 1 deletion relayer/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod state;
pub mod service;
pub mod server;
pub mod state;
54 changes: 54 additions & 0 deletions relayer/src/telemetry/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use prometheus::{Encoder, TextEncoder};

use crate::telemetry::service::MetricUpdate;
use crossbeam_channel::Sender;
use tracing::info;
use crate::telemetry::state::TelemetryState;

pub struct TelemetryServer {
pub state: TelemetryState,
}

impl TelemetryServer {
fn new(state: TelemetryState) -> TelemetryServer {
TelemetryServer { state }
}
}

impl TelemetryServer {
fn run(listen_port: u16) -> () {
let telemetry_state = TelemetryState::new();
rouille::start_server(format!("localhost:{}", listen_port), move |request| {
router!(request,
// The prometheus endpoint
(GET) (/metrics) => {
telemetry_state.packets_relayed.add(1);
info!("metrics called on telemetry server");
let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = telemetry_state.exporter.registry().gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
dbg!(metric_families);
rouille::Response::from_data(encoder.format_type().to_string(), buffer)
},

// Any route other than /metrics
// return an empty response with a 404 status code.
_ => {
rouille::Response::empty_404()
}
)
});
}

pub fn spawn(port: u16) -> Sender<MetricUpdate> {

let (tx, _rx) = crossbeam_channel::unbounded();
//let (service, tx) = TelemetryService::new(app_state.clone());
//let server = TelemetryServer::new(app_state.clone());
std::thread::spawn(move || TelemetryServer::run( port));
//std::thread::spawn(|| service.run());

tx
}
}
44 changes: 19 additions & 25 deletions relayer/src/telemetry/service.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,27 @@
use prometheus::{Encoder, TextEncoder};

use crate::telemetry::state::TelemetryState;
use std::sync::Arc;
use crossbeam_channel::Receiver;
use crate::telemetry::state::TelemetryState;

pub struct TelemetryService {}

impl TelemetryService {
pub fn run(state: Arc<TelemetryState>, listen_port: u16) -> () {
rouille::start_server(format!("localhost:{}", listen_port), move |request| {
router!(request,
// The prometheus endpoint
(GET) (/metrics) => {

state.tx_counter.add(1);
pub enum MetricUpdate {
PacketsRelayed(u64)
}

let mut buffer = vec![];
let encoder = TextEncoder::new();
let metric_families = state.exporter.registry().gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
pub struct TelemetryService {
pub state: Arc<TelemetryState>,
pub rx: Receiver<MetricUpdate>
}

rouille::Response::from_data(encoder.format_type().to_string(), buffer)
},
impl TelemetryService {
fn run(self) {
while let Ok(update) = self.rx.recv() {
self.apply_update(update);
}
}

// Any route other than /metrics
// return an empty response with a 404 status code.
_ => {
rouille::Response::empty_404()
}
)
});
fn apply_update(&self, update: MetricUpdate) {
match update {
MetricUpdate::PacketsRelayed(n) => self.state.packets_relayed.add(n ),
}
}
}
14 changes: 7 additions & 7 deletions relayer/src/telemetry/state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use opentelemetry::metrics::BoundCounter;
use opentelemetry_prometheus::PrometheusExporter;
use std::sync::Arc;
use opentelemetry::global;
use opentelemetry::KeyValue;

Expand All @@ -12,20 +11,21 @@ pub struct TelemetryState {
pub exporter: PrometheusExporter,

// Count the number of trans
pub tx_counter: BoundCounter<'static, u64>,
pub packets_relayed: BoundCounter<'static, u64>,
}

impl TelemetryState {
pub fn init() -> Arc<TelemetryState> {
pub fn new() -> TelemetryState {
let exporter = opentelemetry_prometheus::exporter().init();
let meter = global::meter("hermes");
return Arc::new(TelemetryState {
let telemetry_state = TelemetryState {
exporter,
tx_counter: meter
packets_relayed: meter
.u64_counter("hermes.tx_count")
.with_description("Total number of transactions processed via the relayer.")
.init()
.bind(HANDLER_ALL.as_ref()),
});
};
telemetry_state
}
}
}

0 comments on commit 1781413

Please sign in to comment.