Skip to content

Commit

Permalink
Event monitor: Bulk events from all transactions included in a block (i…
Browse files Browse the repository at this point in the history
…nformalsystems#958)

* event monitor: Bulk events from all transactions included in a block

* Update changelog

* Improve unit test to ensure items are not re-ordered

* Cleanup

* Ensure NewBlock event is always first in the event batch

* Re-enable events filtering in `listen` command

* Only print event batch header if there are matching events
  • Loading branch information
romac authored May 21, 2021
1 parent 2056c23 commit 4f2f2d1
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 54 deletions.
21 changes: 16 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,28 @@

## Unreleased

### FEATURES

- [release]
- Released official Hermes image on Docker Hub ([#894])

### IMPROVEMENTS

- [ibc-relayer]
- Bulk events from all transactions included in a block ([#957])

### BUG FIXES

- [ibc-relayer-cli]
- Prevent sending `ft-transfer` MsgTransfer on a non-Open channel. ([#960])

### FEATURES

- [release]
- Official hermes image on Docker Hub. ([#894])
### BREAKING CHANGES

> Nothing
[#960]: https://github.com/informalsystems/ibc-rs/issues/960
[#894]: https://github.com/informalsystems/ibc-rs/pull/894
[#957]: https://github.com/informalsystems/ibc-rs/issues/957
[#960]: https://github.com/informalsystems/ibc-rs/issues/960

## v0.3.1
*May 14h, 2021*
Expand Down Expand Up @@ -44,6 +53,8 @@ as well as support Protobuf-encoded keys.

### BREAKING CHANGES

> Nothing

[#875]: https://github.com/informalsystems/ibc-rs/issues/875
[#920]: https://github.com/informalsystems/ibc-rs/issues/920
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 74 additions & 13 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,59 @@
use std::{ops::Deref, sync::Arc, thread};
use std::{fmt, ops::Deref, str::FromStr, sync::Arc, thread};

use abscissa_core::{application::fatal_error, error::BoxError, Command, Options, Runnable};
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;

use tendermint_rpc::query::{EventType, Query};
use ibc::{events::IbcEvent, ics24_host::identifier::ChainId};

use ibc::ics24_host::identifier::ChainId;
use ibc_relayer::{
config::ChainConfig,
event::monitor::{EventMonitor, EventReceiver},
};

use crate::prelude::*;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum EventFilter {
NewBlock,
Tx,
}

impl EventFilter {
pub fn matches(&self, event: &IbcEvent) -> bool {
match self {
EventFilter::NewBlock => matches!(event, IbcEvent::NewBlock(_)),
EventFilter::Tx => {
!(matches!(
event,
IbcEvent::NewBlock(_) | IbcEvent::Empty(_) | IbcEvent::ChainError(_)
))
}
}
}
}

impl fmt::Display for EventFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::NewBlock => write!(f, "NewBlock"),
Self::Tx => write!(f, "Tx"),
}
}
}

impl FromStr for EventFilter {
type Err = BoxError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"NewBlock" => Ok(Self::NewBlock),
"Tx" => Ok(Self::Tx),
invalid => Err(format!("unrecognized event type: {}", invalid).into()),
}
}
}

#[derive(Command, Debug, Options)]
pub struct ListenCmd {
/// Identifier of the chain to listen for events from
Expand All @@ -22,7 +62,7 @@ pub struct ListenCmd {

/// Add an event type to listen for, can be repeated. Listen for all events by default (available: Tx, NewBlock)
#[options(short = "e", long = "event", meta = "EVENT")]
events: Vec<EventType>,
events: Vec<EventFilter>,
}

impl ListenCmd {
Expand All @@ -34,7 +74,7 @@ impl ListenCmd {
.ok_or_else(|| format!("chain '{}' not found in configuration", self.chain_id))?;

let events = if self.events.is_empty() {
&[EventType::Tx, EventType::NewBlock]
&[EventFilter::Tx, EventFilter::NewBlock]
} else {
self.events.as_slice()
};
Expand All @@ -51,29 +91,52 @@ impl Runnable for ListenCmd {
}

/// Listen to events
pub fn listen(config: &ChainConfig, events: &[EventType]) -> Result<(), BoxError> {
pub fn listen(config: &ChainConfig, filters: &[EventFilter]) -> Result<(), BoxError> {
println!(
"[info] Listening for events `{}` on '{}'...",
events.iter().format(", "),
filters.iter().format(", "),
config.id
);

let rt = Arc::new(TokioRuntime::new()?);
let queries = events.iter().cloned().map(Query::from).collect();
let (event_monitor, rx) = subscribe(&config, queries, rt)?;
let (event_monitor, rx) = subscribe(&config, rt)?;

thread::spawn(|| event_monitor.run());

while let Ok(event_batch) = rx.recv() {
println!("{:#?}", event_batch);
match event_batch {
Ok(batch) => {
let matching_events = batch
.events
.into_iter()
.filter(|e| event_match(&e, filters))
.collect_vec();

if matching_events.is_empty() {
continue;
}

println!("- Event batch at height {}", batch.height);

for event in matching_events {
println!("+ {:#?}", event);
}

println!();
}
Err(e) => println!("- Error: {}", e),
}
}

Ok(())
}

fn event_match(event: &IbcEvent, filters: &[EventFilter]) -> bool {
filters.iter().any(|f| f.matches(event))
}

fn subscribe(
chain_config: &ChainConfig,
queries: Vec<Query>,
rt: Arc<TokioRuntime>,
) -> Result<(EventMonitor, EventReceiver), BoxError> {
let (mut event_monitor, rx) = EventMonitor::new(
Expand All @@ -83,8 +146,6 @@ fn subscribe(
)
.map_err(|e| format!("could not initialize event monitor: {}", e))?;

event_monitor.set_queries(queries);

event_monitor
.subscribe()
.map_err(|e| format!("could not initialize subscriptions: {}", e))?;
Expand Down
1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ tonic = "0.4"
dirs-next = "2.0.0"
dyn-clone = "1.0.3"
retry = { version = "1.2.1", default-features = false }
async-stream = "0.3.1"

[dependencies.tendermint]
version = "=0.19.0"
Expand Down
113 changes: 77 additions & 36 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::sync::Arc;
use std::{cmp::Ordering, sync::Arc};

use crossbeam_channel as channel;
use futures::stream::StreamExt;
use futures::{stream::select_all, Stream};
use itertools::Itertools;
use futures::{
pin_mut,
stream::{self, select_all, StreamExt},
Stream,
};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc};
use tracing::{error, info, trace};
use tracing::{debug, error, info, trace};

use tendermint_rpc::{
event::Event as RpcEvent,
Expand All @@ -18,7 +20,10 @@ use tendermint_rpc::{

use ibc::{events::IbcEvent, ics02_client::height::Height, ics24_host::identifier::ChainId};

use crate::util::retry::{retry_with_index, RetryResult};
use crate::util::{
retry::{retry_with_index, RetryResult},
stream::group_while,
};

mod retry_strategy {
use crate::util::retry::clamp_total;
Expand Down Expand Up @@ -276,24 +281,31 @@ impl EventMonitor {

/// Event monitor loop
pub fn run(mut self) {
info!(chain.id = %self.chain_id, "starting event monitor");
debug!(chain.id = %self.chain_id, "starting event monitor");

// Take ownership of the subscriptions
let subscriptions =
std::mem::replace(&mut self.subscriptions, Box::new(futures::stream::empty()));

// Convert the stream of RPC events into a stream of event batches.
let batches = stream_batches(subscriptions, self.chain_id.clone());

// Needed to be able to poll the stream
pin_mut!(batches);

// Work around double borrow
let rt = self.rt.clone();

loop {
let result = rt.block_on(async {
tokio::select! {
Some(event) = self.subscriptions.next() => {
event
.map_err(Error::NextEventBatchFailed)
.and_then(|e| self.collect_events(e))
},
Some(batch) = batches.next() => Ok(batch),
Some(e) = self.rx_err.recv() => Err(Error::WebSocketDriver(e)),
}
});

match result {
Ok(batches) => self.process_batches(batches).unwrap_or_else(|e| {
Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| {
error!("failed to process event batch: {}", e);
}),
Err(e) => {
Expand All @@ -307,31 +319,60 @@ impl EventMonitor {
}

/// Collect the IBC events from the subscriptions
fn process_batches(&self, batches: Vec<EventBatch>) -> Result<()> {
for batch in batches {
self.tx_batch
.send(Ok(batch))
.map_err(|_| Error::ChannelSendFailed)?;
}
fn process_batch(&self, batch: EventBatch) -> Result<()> {
self.tx_batch
.send(Ok(batch))
.map_err(|_| Error::ChannelSendFailed)?;

Ok(())
}
}

/// Collect the IBC events from the subscriptions
fn collect_events(&mut self, event: RpcEvent) -> Result<Vec<EventBatch>> {
let ibc_events = crate::event::rpc::get_all_events(&self.chain_id, event)
.map_err(Error::CollectEventsFailed)?;

let events_by_height = ibc_events.into_iter().into_group_map();
let batches = events_by_height
.into_iter()
.map(|(height, events)| EventBatch {
chain_id: self.chain_id.clone(),
height,
events,
})
.collect();

Ok(batches)
}
/// Collect the IBC events from an RPC event
fn collect_events(chain_id: &ChainId, event: RpcEvent) -> impl Stream<Item = (Height, IbcEvent)> {
let events = crate::event::rpc::get_all_events(chain_id, event).unwrap_or_default();
stream::iter(events)
}

/// Convert a stream of RPC event into a stream of event batches
fn stream_batches(
subscriptions: Box<SubscriptionStream>,
chain_id: ChainId,
) -> impl Stream<Item = EventBatch> {
let id = chain_id.clone();

// Collect IBC events from each RPC event
let events = subscriptions
.filter_map(|rpc_event| async { rpc_event.ok() })
.flat_map(move |rpc_event| collect_events(&id, rpc_event));

// Group events by height
let grouped = group_while(events, |(h0, _), (h1, _)| h0 == h1);

// Convert each group to a batch
grouped.map(move |events| {
let height = events
.first()
.map(|(h, _)| h)
.copied()
.expect("internal error: found empty group"); // SAFETY: upheld by `group_while`

let mut events = events.into_iter().map(|(_, e)| e).collect();
sort_events(&mut events);

EventBatch {
height,
events,
chain_id: chain_id.clone(),
}
})
}

/// Sort the given events by putting the NewBlock event first,
/// and leaving the other events as is.
fn sort_events(events: &mut Vec<IbcEvent>) {
events.sort_by(|a, b| match (a, b) {
(IbcEvent::NewBlock(_), _) => Ordering::Less,
_ => Ordering::Equal,
})
}
1 change: 1 addition & 0 deletions relayer/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ pub use recv_multiple::{recv_multiple, try_recv_multiple};
pub mod iter;
pub mod retry;
pub mod sled;
pub mod stream;
Loading

0 comments on commit 4f2f2d1

Please sign in to comment.