From 0227e768420ade80cc24d1edc3149c2d66099573 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 19 Feb 2021 18:03:59 +0100 Subject: [PATCH 01/14] Spike for parallel relaying between multiple channels --- modules/src/events.rs | 2 +- modules/src/ics02_client/events.rs | 16 +- modules/src/ics03_connection/events.rs | 16 +- modules/src/ics04_channel/events.rs | 48 ++-- relayer-cli/src/commands.rs | 9 +- relayer-cli/src/commands/cli_utils.rs | 14 +- relayer-cli/src/commands/start_multi.rs | 335 ++++++++++++++++++++++++ relayer/src/link.rs | 10 +- 8 files changed, 395 insertions(+), 55 deletions(-) create mode 100644 relayer-cli/src/commands/start_multi.rs diff --git a/modules/src/events.rs b/modules/src/events.rs index 56163e76c5..0d66b8ee5a 100644 --- a/modules/src/events.rs +++ b/modules/src/events.rs @@ -81,7 +81,7 @@ impl IbcEvent { serde_json::to_string(self).unwrap() } - pub fn height(&self) -> &Height { + pub fn height(&self) -> Height { match self { IbcEvent::NewBlock(bl) => bl.height(), IbcEvent::CreateClient(ev) => ev.height(), diff --git a/modules/src/ics02_client/events.rs b/modules/src/ics02_client/events.rs index 9d97d07b60..4ff14174fb 100644 --- a/modules/src/ics02_client/events.rs +++ b/modules/src/ics02_client/events.rs @@ -66,8 +66,8 @@ impl NewBlock { pub fn set_height(&mut self, height: Height) { self.height = height; } - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } } @@ -104,8 +104,8 @@ impl CreateClient { pub fn client_id(&self) -> &ClientId { &self.0.client_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -146,8 +146,8 @@ impl UpdateClient { &self.0.client_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -188,8 +188,8 @@ impl ClientMisbehavior { pub fn client_id(&self) -> &ClientId { &self.0.client_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; diff --git a/modules/src/ics03_connection/events.rs b/modules/src/ics03_connection/events.rs index 83b0cbc756..a4e9cb360e 100644 --- a/modules/src/ics03_connection/events.rs +++ b/modules/src/ics03_connection/events.rs @@ -88,8 +88,8 @@ impl OpenInit { pub fn connection_id(&self) -> &Option { &self.0.connection_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -131,8 +131,8 @@ impl OpenTry { pub fn connection_id(&self) -> &Option { &self.0.connection_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -174,8 +174,8 @@ impl OpenAck { pub fn connection_id(&self) -> &Option { &self.0.connection_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -217,8 +217,8 @@ impl OpenConfirm { pub fn connection_id(&self) -> &Option { &self.0.connection_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; diff --git a/modules/src/ics04_channel/events.rs b/modules/src/ics04_channel/events.rs index 6de776c30b..a646ed535c 100644 --- a/modules/src/ics04_channel/events.rs +++ b/modules/src/ics04_channel/events.rs @@ -181,8 +181,8 @@ impl OpenInit { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -225,8 +225,8 @@ impl OpenTry { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -269,8 +269,8 @@ impl OpenAck { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -313,8 +313,8 @@ impl OpenConfirm { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -360,8 +360,8 @@ impl CloseInit { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -416,8 +416,8 @@ impl CloseConfirm { pub fn channel_id(&self) -> &Option { &self.0.channel_id } - pub fn height(&self) -> &Height { - &self.0.height + pub fn height(&self) -> Height { + self.0.height } pub fn set_height(&mut self, height: Height) { self.0.height = height; @@ -486,8 +486,8 @@ pub struct SendPacket { } impl SendPacket { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height; @@ -524,8 +524,8 @@ pub struct ReceivePacket { } impl ReceivePacket { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height; @@ -564,8 +564,8 @@ pub struct WriteAcknowledgement { } impl WriteAcknowledgement { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height; @@ -607,8 +607,8 @@ pub struct AcknowledgePacket { } impl AcknowledgePacket { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height; @@ -643,8 +643,8 @@ pub struct TimeoutPacket { } impl TimeoutPacket { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height @@ -686,8 +686,8 @@ pub struct TimeoutOnClosePacket { } impl TimeoutOnClosePacket { - pub fn height(&self) -> &Height { - &self.height + pub fn height(&self) -> Height { + self.height } pub fn set_height(&mut self, height: Height) { self.height = height; diff --git a/relayer-cli/src/commands.rs b/relayer-cli/src/commands.rs index 12b835b56e..0f7e2146fe 100644 --- a/relayer-cli/src/commands.rs +++ b/relayer-cli/src/commands.rs @@ -13,8 +13,8 @@ use crate::commands::channel::ChannelCmds; use crate::config::Config; use self::{ - keys::KeysCmd, light::LightCmd, listen::ListenCmd, query::QueryCmd, start::StartCmd, tx::TxCmd, - version::VersionCmd, + keys::KeysCmd, light::LightCmd, listen::ListenCmd, query::QueryCmd, start::StartCmd, + start_multi::StartMultiCmd, tx::TxCmd, version::VersionCmd, }; mod channel; @@ -25,6 +25,7 @@ mod light; mod listen; mod query; mod start; +mod start_multi; mod tx; mod version; @@ -57,6 +58,10 @@ pub enum CliCmd { #[options(help = "Start the relayer")] Start(StartCmd), + /// The `start-multi` subcommand + #[options(help = "Start the relayer in multi-channel mode")] + StartMulti(StartMultiCmd), + /// The `channel` subcommand #[options(help = "Channel functionality for managing channels")] Channel(ChannelCmds), diff --git a/relayer-cli/src/commands/cli_utils.rs b/relayer-cli/src/commands/cli_utils.rs index b0b664548c..222524ede3 100644 --- a/relayer-cli/src/commands/cli_utils.rs +++ b/relayer-cli/src/commands/cli_utils.rs @@ -1,14 +1,12 @@ -use abscissa_core::config; - use ibc::ics24_host::identifier::ChainId; -use ibc_relayer::chain::CosmosSdkChain; use ibc_relayer::{chain::handle::ChainHandle, config::StoreConfig}; use ibc_relayer::{chain::runtime::ChainRuntime, config::ChainConfig}; +use ibc_relayer::{chain::CosmosSdkChain, config::Config}; -use crate::application::CliApp; use crate::error::{Error, Kind}; /// Pair of chain handlers that are used by most CLIs. +#[derive(Clone, Debug)] pub struct ChainHandlePair { /// Source chain handle pub src: Box, @@ -20,7 +18,7 @@ impl ChainHandlePair { /// Spawn the source and destination chain runtime from the configuration and chain identifiers, /// and return the pair of associated handles. pub fn spawn( - config: &config::Reader, + config: &Config, src_chain_id: &ChainId, dst_chain_id: &ChainId, ) -> Result { @@ -32,7 +30,7 @@ impl ChainHandlePair { /// is used to override each chain configuration before spawning its runtime. pub fn spawn_with( options: SpawnOptions, - config: &config::Reader, + config: &Config, src_chain_id: &ChainId, dst_chain_id: &ChainId, ) -> Result { @@ -44,7 +42,7 @@ impl ChainHandlePair { /// and return the pair of associated handles. fn spawn_chain_runtimes( spawn_options: SpawnOptions, - config: &config::Reader, + config: &Config, src_chain_id: &ChainId, dst_chain_id: &ChainId, ) -> Result { @@ -118,4 +116,4 @@ fn zip_result(a: Result, b: Result) -> Result<(A, B), E> { (Ok(a), Ok(b)) => Ok((a, b)), (Err(e), _) | (_, Err(e)) => Err(e), } -} +} \ No newline at end of file diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs new file mode 100644 index 0000000000..8ee5b891df --- /dev/null +++ b/relayer-cli/src/commands/start_multi.rs @@ -0,0 +1,335 @@ +#![allow(unused_imports, unreachable_code, dead_code, unused_variables)] + +use std::{collections::HashMap, thread::JoinHandle}; + +use abscissa_core::{Command, Options, Runnable}; +use crossbeam_channel::{Receiver, Sender}; +use prost_types::Any; + +use ibc::{ + events::IbcEvent, + ics04_channel::{ + channel::State as ChannelState, + events::{CloseInit, SendPacket, WriteAcknowledgement}, + msgs::recv_packet::MsgRecvPacket, + packet::{Packet, PacketMsgType}, + }, + ics24_host::identifier::{ChainId, ChannelId}, + tx_msg::Msg, + Height, +}; +use ibc_relayer::{config::Config, link::LinkError}; + +use crate::commands::cli_utils::ChainHandlePair; +use crate::conclude::Output; +use crate::prelude::*; + +#[derive(Clone, Command, Debug, Options)] +pub struct StartMultiCmd { + #[options(free, required, help = "identifier of the source chain")] + src_chain_id: ChainId, + + #[options(free, required, help = "identifier of the destination chain")] + dst_chain_id: ChainId, +} + +impl Runnable for StartMultiCmd { + fn run(&self) { + let config = (*app_config()).clone(); + + match start_multi(config, &self.src_chain_id, &self.dst_chain_id) { + Ok(output) => output.exit(), + Err(e) => Output::error(format!("{}", e)).exit(), + } + } +} + +struct WorkerCmd { + event: HandledEvent, + dst_height: Height, +} + +struct WorkerHandle { + pub tx: Sender, + pub thread_handle: JoinHandle<()>, +} + +impl WorkerHandle { + fn handle_packet_event(&self, event: HandledEvent, dst_height: Height) -> Result<(), BoxError> { + self.tx.send(WorkerCmd { event, dst_height })?; + Ok(()) + } +} + +struct Supervisor { + config: Config, + chains: ChainHandlePair, + workers: HashMap, +} + +impl Supervisor { + fn spawn( + config: Config, + src_chain_id: &ChainId, + dst_chain_id: &ChainId, + ) -> Result { + let chains = ChainHandlePair::spawn(&config, src_chain_id, dst_chain_id)?; + + Ok(Self { + config, + chains, + workers: HashMap::new(), + }) + } + + fn run(&mut self) -> Result<(), BoxError> { + let subscription = self.chains.src.subscribe()?; + + println!("iterating over event batches"); + + for batch in subscription.iter() { + let dst_height = self + .chains + .dst + .query_latest_height() + .map_err(|e| LinkError::QueryError(self.chains.dst.id(), e))?; + + let events = collect_events(&batch.events); + for event in events { + let object = event.object(); + let worker = self.worker_for_object(object); + worker.handle_packet_event(event, dst_height)?; + } + } + + Ok(()) + } + + fn worker_for_object(&mut self, object: Object) -> &WorkerHandle { + if self.workers.contains_key(&object) { + &self.workers[&object] + } else { + let worker = Worker::spawn(self.chains.clone(), object.clone()); + self.workers.entry(object).or_insert(worker) + } + } +} + +#[derive(Debug, Default)] +struct Msgs { + packets: Vec, + timeouts: Vec, + src_msgs_input_events: Vec, + dst_msgs_input_events: Vec, +} + +struct Worker { + chains: ChainHandlePair, + object: Object, + rx: Receiver, +} + +impl Worker { + fn spawn(chains: ChainHandlePair, object: Object) -> WorkerHandle { + let (tx, rx) = crossbeam_channel::unbounded(); + + let worker = Self { chains, object, rx }; + let thread_handle = std::thread::spawn(move || worker.run()); + + WorkerHandle { tx, thread_handle } + } + + fn run(self) { + while let Ok(cmd) = self.rx.recv() { + let msgs = handle_packet_event(&self.chains, cmd.event, cmd.dst_height); + + match msgs { + Ok(msgs) => { + println!("got messages after processing event: {:?}", msgs); + } + Err(e) => eprintln!( + "error when handling packet event for object '{:?}': {}", + self.object, e + ), + } + } + } +} + +fn handle_packet_event( + chains: &ChainHandlePair, + event: HandledEvent, + dst_height: Height, +) -> Result { + let mut msgs = Msgs::default(); + + let (dst_msg, timeout) = build_msg_from_event(&chains, &event, dst_height)?; + + if let Some(msg) = dst_msg { + msgs.packets.push(msg); + msgs.dst_msgs_input_events.push(event.clone()); + } + + if let Some(msg) = timeout { + msgs.timeouts.push(msg); + msgs.src_msgs_input_events.push(event); + } + + Ok(msgs) +} + +fn build_msg_from_event( + chains: &ChainHandlePair, + event: &HandledEvent, + dst_height: Height, +) -> Result<(Option, Option), BoxError> { + match event { + HandledEvent::SendPacket(send_packet) => { + info!("{} => event {}", chains.src.id(), send_packet); + build_recv_or_timeout_from_send_packet_event(chains, send_packet, dst_height) + } + } + + // HandledEvent::WriteAcknowledgement(write_ack_ev) => { + // info!("{} => event {}", chains.src.id(), write_ack_ev); + // Ok((Some(self.build_ack_from_recv_event(&write_ack_ev)?), None)) + // } + // HandledEvent::CloseInit(close_init_ev) => { + // info!("{} => event {}", chains.src.id(), close_init_ev); + // Ok(( + // Some(self.build_chan_close_confirm_from_close_init_event(&close_init_ev)?), + // None, + // )) + // } +} + +fn build_recv_or_timeout_from_send_packet_event( + chains: &ChainHandlePair, + event: &SendPacket, + dst_height: Height, +) -> Result<(Option, Option), BoxError> { + let packet = &event.packet; + + let dst_channel = chains + .dst + .query_channel( + &packet.destination_port, + &packet.destination_channel, + dst_height, + ) + .map_err(|e| LinkError::QueryError(chains.dst.id(), e))?; + + if dst_channel.state_matches(&ChannelState::Closed) { + eprintln!("no support for timeout_on_close yet"); + Ok((None, None)) + // Ok(( + // None, + // Some(self.build_timeout_on_close_packet(&event.packet, self.dst_height)?), + // )) + } else if !packet.timeout_height.is_zero() && packet.timeout_height < dst_height { + eprintln!("no support for timeout yet"); + Ok((None, None)) + // Ok(( + // None, + // Some(self.build_timeout_packet(&event.packet, self.dst_height)?), + // )) + } else { + let result = build_recv_packet(chains, &event.packet, event.height)?; + Ok((Some(result), None)) + } +} + +fn build_recv_packet( + chains: &ChainHandlePair, + packet: &Packet, + height: Height, +) -> Result { + // Get signer + let signer = chains.dst.get_signer().map_err(|e| { + LinkError::Failed(format!( + "could not retrieve signer from dst chain {} with error: {}", + chains.dst.id(), + e + )) + })?; + + let (_, proofs) = chains + .src + .build_packet_proofs( + PacketMsgType::Recv, + &packet.source_port, + &packet.source_channel, + packet.sequence, + height, + ) + .map_err(|e| LinkError::PacketProofsConstructor(chains.src.id(), e))?; + + let msg = MsgRecvPacket::new(packet.clone(), proofs.clone(), signer).map_err(|e| { + LinkError::Failed(format!( + "error while building the recv packet for src channel {} due to error {}", + packet.source_channel.clone(), + e + )) + })?; + + info!( + "built recv_packet msg {}, proofs at height {:?}", + msg.packet, + proofs.height() + ); + + use ibc_proto::ibc::core::channel::v1::MsgRecvPacket as RawMsgRecvPacket; + Ok(msg.to_any::()) +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct ChannelPair { + pub src: ChannelId, + pub dst: ChannelId, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +enum Object { + ChannelPair(ChannelPair), +} + +#[derive(Clone, Debug)] +enum HandledEvent { + SendPacket(SendPacket), + // WriteAcknowledgement(WriteAcknowledgement), + // CloseInit(CloseInit), +} + +impl HandledEvent { + fn object(&self) -> Object { + match self { + Self::SendPacket(e) => Object::ChannelPair(ChannelPair { + src: e.packet.source_channel.clone(), + dst: e.packet.destination_channel.clone(), + }), + } + } +} + +fn collect_events(events: &[IbcEvent]) -> Vec { + events + .iter() + .filter_map(|e| match e { + IbcEvent::SendPacket(e) => Some(HandledEvent::SendPacket(e.clone())), + // IbcEvent::WriteAcknowledgement(e) => Some(HandledEvent::WriteAcknowledgement(e)), + // IbcEvent::CloseInitChannel(e) => Some(HandledEvent::CloseInit(e)), + _ => None, + }) + .collect() +} + +fn start_multi( + config: Config, + src_chain_id: &ChainId, + dst_chain_id: &ChainId, +) -> Result { + let mut supervisor = Supervisor::spawn(config, src_chain_id, dst_chain_id)?; + supervisor.run()?; + + Ok(Output::success("ok")) +} diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 233f1c7df6..4220f087fe 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -264,7 +264,7 @@ impl RelayPath { fn build_chan_close_confirm_from_event(&self, event: &IbcEvent) -> Result { let proofs = self .src_chain() - .build_channel_proofs(self.src_port_id(), self.src_channel_id(), *event.height()) + .build_channel_proofs(self.src_port_id(), self.src_channel_id(), event.height()) .map_err(|e| ChannelError::Failed(format!("failed to build channel proofs: {}", e)))?; // Build the domain type message @@ -337,7 +337,7 @@ impl RelayPath { if !self.all_events.is_empty() { // All events are at the same height - self.src_height = *self.all_events[0].height(); + self.src_height = self.all_events[0].height(); } } @@ -349,7 +349,8 @@ impl RelayPath { if self.all_events.is_empty() { return Ok(()); } - let event_height = &self.src_height; + + let event_height = self.src_height; // Check if a consensus state at event_height + 1 exists on destination chain already // and update src_height @@ -362,7 +363,7 @@ impl RelayPath { ) .is_ok() { - self.src_height = *event_height; + self.src_height = event_height; return Ok(()); } @@ -468,6 +469,7 @@ impl RelayPath { // Clear all_events and collect the src and dst input events if Tx-es fail self.all_events = vec![]; + if !self.packet_msgs.is_empty() { let update_height = self.src_height.increment(); let mut msgs_to_send = vec![]; From a092056e0135d9873d5597ce3f8e8b24d91f1cb1 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 23 Feb 2021 17:00:27 +0100 Subject: [PATCH 02/14] Adjust event heights --- relayer-cli/Cargo.toml | 1 + relayer-cli/src/commands/start.rs | 2 +- relayer-cli/src/commands/start_multi.rs | 187 +++++++++++++++++++----- relayer-cli/src/commands/tx/packet.rs | 4 +- relayer/src/link.rs | 7 +- relayer/src/relay.rs | 2 +- 6 files changed, 162 insertions(+), 41 deletions(-) diff --git a/relayer-cli/Cargo.toml b/relayer-cli/Cargo.toml index 75317c8408..76723f724a 100644 --- a/relayer-cli/Cargo.toml +++ b/relayer-cli/Cargo.toml @@ -45,6 +45,7 @@ hex = "0.4" crossbeam-channel = "0.5.0" subtle-encoding = "0.5" dirs-next = "2.0.0" +itertools = "0.10.0" [dependencies.tendermint-proto] version = "=0.18.1" diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index a1e302f9ed..9a4b4368ad 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -38,7 +38,7 @@ impl Runnable for StartCmd { match channel_relay( chains.src, chains.dst, - &LinkParameters { + LinkParameters { src_port_id: src_port_id.clone(), src_channel_id: src_channel_id.clone(), }, diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index 8ee5b891df..1fda8c8f4a 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, thread::JoinHandle}; use abscissa_core::{Command, Options, Runnable}; use crossbeam_channel::{Receiver, Sender}; +use itertools::Itertools; use prost_types::Any; use ibc::{ @@ -14,11 +15,15 @@ use ibc::{ msgs::recv_packet::MsgRecvPacket, packet::{Packet, PacketMsgType}, }, - ics24_host::identifier::{ChainId, ChannelId}, + ics24_host::identifier::{ChainId, ChannelId, PortId}, tx_msg::Msg, Height, }; -use ibc_relayer::{config::Config, link::LinkError}; +use ibc_relayer::{ + channel::Channel, + config::Config, + link::{Link, LinkError, LinkParameters, RelayPath}, +}; use crate::commands::cli_utils::ChainHandlePair; use crate::conclude::Output; @@ -45,18 +50,30 @@ impl Runnable for StartMultiCmd { } struct WorkerCmd { - event: HandledEvent, + events: Vec, dst_height: Height, } +impl WorkerCmd { + fn new(events: Vec, dst_height: Height) -> Self { + assert!(!events.is_empty()); + Self { events, dst_height } + } +} + struct WorkerHandle { pub tx: Sender, pub thread_handle: JoinHandle<()>, } impl WorkerHandle { - fn handle_packet_event(&self, event: HandledEvent, dst_height: Height) -> Result<(), BoxError> { - self.tx.send(WorkerCmd { event, dst_height })?; + fn handle_packet_events( + &self, + events: Vec, + dst_height: Height, + ) -> Result<(), BoxError> { + assert!(!events.is_empty()); + self.tx.send(WorkerCmd::new(events, dst_height))?; Ok(()) } } @@ -95,10 +112,17 @@ impl Supervisor { .map_err(|e| LinkError::QueryError(self.chains.dst.id(), e))?; let events = collect_events(&batch.events); - for event in events { - let object = event.object(); + let events_per_object = events.into_iter().group_by(|e| e.object()); + + for (object, events) in events_per_object.into_iter() { + let events: Vec<_> = events.collect(); + if events.is_empty() { + println!("no events in batch"); + } + + assert!(!events.is_empty()); let worker = self.worker_for_object(object); - worker.handle_packet_event(event, dst_height)?; + worker.handle_packet_events(events, dst_height)?; } } @@ -125,7 +149,6 @@ struct Msgs { struct Worker { chains: ChainHandlePair, - object: Object, rx: Receiver, } @@ -133,46 +156,73 @@ impl Worker { fn spawn(chains: ChainHandlePair, object: Object) -> WorkerHandle { let (tx, rx) = crossbeam_channel::unbounded(); - let worker = Self { chains, object, rx }; - let thread_handle = std::thread::spawn(move || worker.run()); + let worker = Self { chains, rx }; + let thread_handle = std::thread::spawn(move || worker.run(object)); WorkerHandle { tx, thread_handle } } - fn run(self) { + fn run(self, object: Object) { + let result = match object { + Object::ChannelPair(channel_pair) => self.run_channel_pair(channel_pair), + }; + + if let Err(e) = result { + eprintln!("worker error: {}", e); + } + } + + fn run_channel_pair(self, channel_pair: ChannelPair) -> Result<(), BoxError> { + let link = Link::new_from_opts( + self.chains.src.clone(), + self.chains.dst.clone(), + LinkParameters { + src_port_id: channel_pair.src_port, + src_channel_id: channel_pair.src_channel, + }, + )?; + while let Ok(cmd) = self.rx.recv() { - let msgs = handle_packet_event(&self.chains, cmd.event, cmd.dst_height); + let WorkerCmd { + mut events, + dst_height, + } = cmd; + + assert!(!events.is_empty()); + + let src_height = + adjust_events_height(&self.chains, &link.a_to_b, events.as_mut_slice()); + let msgs = handle_packet_events(&self.chains, events, dst_height); match msgs { - Ok(msgs) => { - println!("got messages after processing event: {:?}", msgs); - } - Err(e) => eprintln!( - "error when handling packet event for object '{:?}': {}", - self.object, e - ), + Ok(msgs) => println!("got messages after processing event: {:?}", msgs), + Err(e) => eprintln!("error when handling packet event: {}", e), } } + + Ok(()) } } -fn handle_packet_event( +fn handle_packet_events( chains: &ChainHandlePair, - event: HandledEvent, + events: Vec, dst_height: Height, ) -> Result { let mut msgs = Msgs::default(); - let (dst_msg, timeout) = build_msg_from_event(&chains, &event, dst_height)?; + for event in events { + let (dst_msg, timeout) = build_msg_from_event(&chains, &event, dst_height)?; - if let Some(msg) = dst_msg { - msgs.packets.push(msg); - msgs.dst_msgs_input_events.push(event.clone()); - } + if let Some(msg) = dst_msg { + msgs.packets.push(msg); + msgs.dst_msgs_input_events.push(event.clone()); + } - if let Some(msg) = timeout { - msgs.timeouts.push(msg); - msgs.src_msgs_input_events.push(event); + if let Some(msg) = timeout { + msgs.timeouts.push(msg); + msgs.src_msgs_input_events.push(event); + } } Ok(msgs) @@ -282,10 +332,65 @@ fn build_recv_packet( Ok(msg.to_any::()) } +fn adjust_events_height( + chains: &ChainHandlePair, + relay_path: &RelayPath, + all_events: &mut [HandledEvent], +) -> Result, BoxError> { + if all_events.is_empty() { + return Ok(None); + } + + // All events are at the same height + let event_height = all_events[0].height(); + + // Check if a consensus state at event_height + 1 exists on destination chain already + // and update src_height + if relay_path + .dst_chain() + .proven_client_consensus( + relay_path.dst_client_id(), + event_height.increment(), + Height::zero(), + ) + .is_ok() + { + return Ok(Some(event_height)); + } + + // Get the latest trusted height from the client state on destination. + let trusted_height = relay_path + .dst_chain() + .query_client_state(relay_path.dst_client_id(), Height::zero()) + .map_err(|e| LinkError::QueryError(relay_path.dst_chain().id(), e))? + .latest_height(); + + // event_height + 1 is the height at which the client on destination chain + // should be updated, unless ... + if trusted_height > event_height.increment() { + // ... client is already at a higher height. + let src_height = trusted_height + .decrement() + .map_err(|e| LinkError::Failed(e.to_string()))?; + + println!("adjusting events height to {}", src_height); + + all_events + .iter_mut() + .for_each(|ev| ev.set_height(src_height)); + + Ok(Some(src_height)) + } else { + Ok(Some(event_height)) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct ChannelPair { - pub src: ChannelId, - pub dst: ChannelId, + pub src_channel: ChannelId, + pub dst_channel: ChannelId, + pub src_port: PortId, + pub dst_port: PortId, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -304,11 +409,25 @@ impl HandledEvent { fn object(&self) -> Object { match self { Self::SendPacket(e) => Object::ChannelPair(ChannelPair { - src: e.packet.source_channel.clone(), - dst: e.packet.destination_channel.clone(), + src_channel: e.packet.source_channel.clone(), + dst_channel: e.packet.destination_channel.clone(), + src_port: e.packet.source_port.clone(), + dst_port: e.packet.destination_port.clone(), }), } } + + fn height(&self) -> Height { + match self { + Self::SendPacket(e) => e.height(), + } + } + + fn set_height(&mut self, height: Height) { + match self { + Self::SendPacket(e) => e.set_height(height), + } + } } fn collect_events(events: &[IbcEvent]) -> Vec { diff --git a/relayer-cli/src/commands/tx/packet.rs b/relayer-cli/src/commands/tx/packet.rs index 765c8db1f7..02b03caa19 100644 --- a/relayer-cli/src/commands/tx/packet.rs +++ b/relayer-cli/src/commands/tx/packet.rs @@ -44,7 +44,7 @@ impl Runnable for TxRawPacketRecvCmd { src_port_id: self.src_port_id.clone(), src_channel_id: self.src_channel_id.clone(), }; - let mut link = match Link::new_from_opts(chains.src, chains.dst, &opts) { + let mut link = match Link::new_from_opts(chains.src, chains.dst, opts) { Ok(link) => link, Err(e) => return Output::error(format!("{}", e)).exit(), }; @@ -94,7 +94,7 @@ impl Runnable for TxRawPacketAckCmd { src_port_id: self.src_port_id.clone(), src_channel_id: self.src_channel_id.clone(), }; - let mut link = match Link::new_from_opts(chains.src, chains.dst, &opts) { + let mut link = match Link::new_from_opts(chains.src, chains.dst, opts) { Ok(link) => link, Err(e) => return Output::error(format!("{}", e)).exit(), }; diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 4220f087fe..fe9b8cf374 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -884,10 +884,11 @@ impl Link { pub fn new(channel: Channel) -> Result { let a_chain = channel.src_chain(); let b_chain = channel.dst_chain(); + let flipped = channel.flipped(); Ok(Link { - a_to_b: RelayPath::new(a_chain.clone(), b_chain.clone(), channel.clone())?, - b_to_a: RelayPath::new(b_chain, a_chain, channel.flipped())?, + a_to_b: RelayPath::new(a_chain.clone(), b_chain.clone(), channel)?, + b_to_a: RelayPath::new(b_chain, a_chain, flipped)?, }) } @@ -953,7 +954,7 @@ impl Link { pub fn new_from_opts( a_chain: Box, b_chain: Box, - opts: &LinkParameters, + opts: LinkParameters, ) -> Result { // Check that the packet's channel on source chain is Open let a_channel_id = &opts.src_channel_id; diff --git a/relayer/src/relay.rs b/relayer/src/relay.rs index 6537d4c27e..41c2cae659 100644 --- a/relayer/src/relay.rs +++ b/relayer/src/relay.rs @@ -33,7 +33,7 @@ pub fn relay_on_new_link( pub fn channel_relay( a_chain: Box, b_chain: Box, - opts: &LinkParameters, + opts: LinkParameters, ) -> Result<(), BoxError> { let mut link = Link::new_from_opts(a_chain, b_chain, opts)?; Ok(link.relay()?) From 70441242dd86431bb68bd8d2be7fdb5b9bcc80b0 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Thu, 25 Feb 2021 20:50:38 +0100 Subject: [PATCH 03/14] Re-use existing Link infra for relaying packets --- Cargo.lock | 1 + relayer-cli/src/commands/start_multi.rs | 298 +++--------------------- relayer/src/link.rs | 124 +++++----- relayer/src/relay.rs | 2 +- 4 files changed, 101 insertions(+), 324 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3adb53e99f..b3c8adcf23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1214,6 +1214,7 @@ dependencies = [ "ibc", "ibc-proto", "ibc-relayer", + "itertools 0.10.0", "once_cell", "prost 0.7.0", "prost-types", diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index 1fda8c8f4a..fb9e231d41 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -1,6 +1,6 @@ #![allow(unused_imports, unreachable_code, dead_code, unused_variables)] -use std::{collections::HashMap, thread::JoinHandle}; +use std::{collections::HashMap, sync::Arc, thread::JoinHandle}; use abscissa_core::{Command, Options, Runnable}; use crossbeam_channel::{Receiver, Sender}; @@ -22,6 +22,7 @@ use ibc::{ use ibc_relayer::{ channel::Channel, config::Config, + event::monitor::EventBatch, link::{Link, LinkError, LinkParameters, RelayPath}, }; @@ -50,14 +51,12 @@ impl Runnable for StartMultiCmd { } struct WorkerCmd { - events: Vec, - dst_height: Height, + batch: Arc, } impl WorkerCmd { - fn new(events: Vec, dst_height: Height) -> Self { - assert!(!events.is_empty()); - Self { events, dst_height } + fn new(batch: Arc) -> Self { + Self { batch } } } @@ -67,13 +66,8 @@ struct WorkerHandle { } impl WorkerHandle { - fn handle_packet_events( - &self, - events: Vec, - dst_height: Height, - ) -> Result<(), BoxError> { - assert!(!events.is_empty()); - self.tx.send(WorkerCmd::new(events, dst_height))?; + fn handle_packet_events(&self, batch: EventBatch) -> Result<(), BoxError> { + self.tx.send(WorkerCmd::new(Arc::new(batch)))?; Ok(()) } } @@ -105,24 +99,25 @@ impl Supervisor { println!("iterating over event batches"); for batch in subscription.iter() { - let dst_height = self - .chains - .dst - .query_latest_height() - .map_err(|e| LinkError::QueryError(self.chains.dst.id(), e))?; - + // FIXME(romac): Need to send NewBlock events to all workers let events = collect_events(&batch.events); - let events_per_object = events.into_iter().group_by(|e| e.object()); + let events_per_object = events.into_iter().group_by(ibc_event_object); for (object, events) in events_per_object.into_iter() { - let events: Vec<_> = events.collect(); + let events = events.collect::>(); if events.is_empty() { println!("no events in batch"); + continue; } - assert!(!events.is_empty()); + let worker_batch = EventBatch { + height: batch.height, + chain_id: batch.chain_id.clone(), + events, + }; + let worker = self.worker_for_object(object); - worker.handle_packet_events(events, dst_height)?; + worker.handle_packet_events(worker_batch)?; } } @@ -139,14 +134,6 @@ impl Supervisor { } } -#[derive(Debug, Default)] -struct Msgs { - packets: Vec, - timeouts: Vec, - src_msgs_input_events: Vec, - dst_msgs_input_events: Vec, -} - struct Worker { chains: ChainHandlePair, rx: Receiver, @@ -173,7 +160,7 @@ impl Worker { } fn run_channel_pair(self, channel_pair: ChannelPair) -> Result<(), BoxError> { - let link = Link::new_from_opts( + let mut link = Link::new_from_opts( self.chains.src.clone(), self.chains.dst.clone(), LinkParameters { @@ -183,208 +170,13 @@ impl Worker { )?; while let Ok(cmd) = self.rx.recv() { - let WorkerCmd { - mut events, - dst_height, - } = cmd; - - assert!(!events.is_empty()); - - let src_height = - adjust_events_height(&self.chains, &link.a_to_b, events.as_mut_slice()); - let msgs = handle_packet_events(&self.chains, events, dst_height); - - match msgs { - Ok(msgs) => println!("got messages after processing event: {:?}", msgs), - Err(e) => eprintln!("error when handling packet event: {}", e), - } + link.a_to_b.relay_from_events(cmd.batch)?; } Ok(()) } } -fn handle_packet_events( - chains: &ChainHandlePair, - events: Vec, - dst_height: Height, -) -> Result { - let mut msgs = Msgs::default(); - - for event in events { - let (dst_msg, timeout) = build_msg_from_event(&chains, &event, dst_height)?; - - if let Some(msg) = dst_msg { - msgs.packets.push(msg); - msgs.dst_msgs_input_events.push(event.clone()); - } - - if let Some(msg) = timeout { - msgs.timeouts.push(msg); - msgs.src_msgs_input_events.push(event); - } - } - - Ok(msgs) -} - -fn build_msg_from_event( - chains: &ChainHandlePair, - event: &HandledEvent, - dst_height: Height, -) -> Result<(Option, Option), BoxError> { - match event { - HandledEvent::SendPacket(send_packet) => { - info!("{} => event {}", chains.src.id(), send_packet); - build_recv_or_timeout_from_send_packet_event(chains, send_packet, dst_height) - } - } - - // HandledEvent::WriteAcknowledgement(write_ack_ev) => { - // info!("{} => event {}", chains.src.id(), write_ack_ev); - // Ok((Some(self.build_ack_from_recv_event(&write_ack_ev)?), None)) - // } - // HandledEvent::CloseInit(close_init_ev) => { - // info!("{} => event {}", chains.src.id(), close_init_ev); - // Ok(( - // Some(self.build_chan_close_confirm_from_close_init_event(&close_init_ev)?), - // None, - // )) - // } -} - -fn build_recv_or_timeout_from_send_packet_event( - chains: &ChainHandlePair, - event: &SendPacket, - dst_height: Height, -) -> Result<(Option, Option), BoxError> { - let packet = &event.packet; - - let dst_channel = chains - .dst - .query_channel( - &packet.destination_port, - &packet.destination_channel, - dst_height, - ) - .map_err(|e| LinkError::QueryError(chains.dst.id(), e))?; - - if dst_channel.state_matches(&ChannelState::Closed) { - eprintln!("no support for timeout_on_close yet"); - Ok((None, None)) - // Ok(( - // None, - // Some(self.build_timeout_on_close_packet(&event.packet, self.dst_height)?), - // )) - } else if !packet.timeout_height.is_zero() && packet.timeout_height < dst_height { - eprintln!("no support for timeout yet"); - Ok((None, None)) - // Ok(( - // None, - // Some(self.build_timeout_packet(&event.packet, self.dst_height)?), - // )) - } else { - let result = build_recv_packet(chains, &event.packet, event.height)?; - Ok((Some(result), None)) - } -} - -fn build_recv_packet( - chains: &ChainHandlePair, - packet: &Packet, - height: Height, -) -> Result { - // Get signer - let signer = chains.dst.get_signer().map_err(|e| { - LinkError::Failed(format!( - "could not retrieve signer from dst chain {} with error: {}", - chains.dst.id(), - e - )) - })?; - - let (_, proofs) = chains - .src - .build_packet_proofs( - PacketMsgType::Recv, - &packet.source_port, - &packet.source_channel, - packet.sequence, - height, - ) - .map_err(|e| LinkError::PacketProofsConstructor(chains.src.id(), e))?; - - let msg = MsgRecvPacket::new(packet.clone(), proofs.clone(), signer).map_err(|e| { - LinkError::Failed(format!( - "error while building the recv packet for src channel {} due to error {}", - packet.source_channel.clone(), - e - )) - })?; - - info!( - "built recv_packet msg {}, proofs at height {:?}", - msg.packet, - proofs.height() - ); - - use ibc_proto::ibc::core::channel::v1::MsgRecvPacket as RawMsgRecvPacket; - Ok(msg.to_any::()) -} - -fn adjust_events_height( - chains: &ChainHandlePair, - relay_path: &RelayPath, - all_events: &mut [HandledEvent], -) -> Result, BoxError> { - if all_events.is_empty() { - return Ok(None); - } - - // All events are at the same height - let event_height = all_events[0].height(); - - // Check if a consensus state at event_height + 1 exists on destination chain already - // and update src_height - if relay_path - .dst_chain() - .proven_client_consensus( - relay_path.dst_client_id(), - event_height.increment(), - Height::zero(), - ) - .is_ok() - { - return Ok(Some(event_height)); - } - - // Get the latest trusted height from the client state on destination. - let trusted_height = relay_path - .dst_chain() - .query_client_state(relay_path.dst_client_id(), Height::zero()) - .map_err(|e| LinkError::QueryError(relay_path.dst_chain().id(), e))? - .latest_height(); - - // event_height + 1 is the height at which the client on destination chain - // should be updated, unless ... - if trusted_height > event_height.increment() { - // ... client is already at a higher height. - let src_height = trusted_height - .decrement() - .map_err(|e| LinkError::Failed(e.to_string()))?; - - println!("adjusting events height to {}", src_height); - - all_events - .iter_mut() - .for_each(|ev| ev.set_height(src_height)); - - Ok(Some(src_height)) - } else { - Ok(Some(event_height)) - } -} - #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct ChannelPair { pub src_channel: ChannelId, @@ -398,47 +190,25 @@ enum Object { ChannelPair(ChannelPair), } -#[derive(Clone, Debug)] -enum HandledEvent { - SendPacket(SendPacket), - // WriteAcknowledgement(WriteAcknowledgement), - // CloseInit(CloseInit), -} - -impl HandledEvent { - fn object(&self) -> Object { - match self { - Self::SendPacket(e) => Object::ChannelPair(ChannelPair { - src_channel: e.packet.source_channel.clone(), - dst_channel: e.packet.destination_channel.clone(), - src_port: e.packet.source_port.clone(), - dst_port: e.packet.destination_port.clone(), - }), - } - } - - fn height(&self) -> Height { - match self { - Self::SendPacket(e) => e.height(), - } - } - - fn set_height(&mut self, height: Height) { - match self { - Self::SendPacket(e) => e.set_height(height), - } +fn ibc_event_object(event: &IbcEvent) -> Object { + match event { + IbcEvent::SendPacket(e) => Object::ChannelPair(ChannelPair { + src_channel: e.packet.source_channel.clone(), + dst_channel: e.packet.destination_channel.clone(), + src_port: e.packet.source_port.clone(), + dst_port: e.packet.destination_port.clone(), + }), + _ => unreachable!(), } } -fn collect_events(events: &[IbcEvent]) -> Vec { +fn collect_events(events: &[IbcEvent]) -> Vec { events .iter() - .filter_map(|e| match e { - IbcEvent::SendPacket(e) => Some(HandledEvent::SendPacket(e.clone())), - // IbcEvent::WriteAcknowledgement(e) => Some(HandledEvent::WriteAcknowledgement(e)), - // IbcEvent::CloseInitChannel(e) => Some(HandledEvent::CloseInit(e)), - _ => None, - }) + .filter(|e| matches!(e, IbcEvent::SendPacket(_))) + // IbcEvent::WriteAcknowledgement(e) => Some(HandledEvent::WriteAcknowledgement(e)), + // IbcEvent::CloseInitChannel(e) => Some(HandledEvent::CloseInit(e)), + .cloned() .collect() } diff --git a/relayer/src/link.rs b/relayer/src/link.rs index fe9b8cf374..3cd95310b2 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -1,5 +1,5 @@ -use std::thread; use std::time::Duration; +use std::{sync::Arc, thread}; use prost_types::Any; use tendermint::account::Id; @@ -31,10 +31,11 @@ use ibc_proto::ibc::core::channel::v1::{ QueryUnreceivedPacketsRequest, }; -use crate::chain::handle::{ChainHandle, Subscription}; +use crate::chain::handle::ChainHandle; use crate::channel::{Channel, ChannelError, ChannelSide}; use crate::connection::ConnectionError; use crate::error::Error; +use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::relay::MAX_ITER; @@ -71,7 +72,6 @@ pub enum LinkError { pub struct RelayPath { src_chain: Box, dst_chain: Box, - subscription: Subscription, channel: Channel, clear_packets: bool, all_events: Vec, @@ -88,11 +88,10 @@ impl RelayPath { src_chain: Box, dst_chain: Box, channel: Channel, - ) -> Result { - Ok(RelayPath { - src_chain: src_chain.clone(), - dst_chain: dst_chain.clone(), - subscription: src_chain.subscribe()?, + ) -> Self { + Self { + src_chain, + dst_chain, channel, clear_packets: true, all_events: vec![], @@ -102,7 +101,7 @@ impl RelayPath { src_msgs_input_events: vec![], packet_msgs: vec![], timeout_msgs: vec![], - }) + } } pub fn src_chain(&self) -> Box { @@ -363,7 +362,6 @@ impl RelayPath { ) .is_ok() { - self.src_height = event_height; return Ok(()); } @@ -413,53 +411,52 @@ impl RelayPath { Err(LinkError::OldPacketClearingFailed) } - fn relay_from_events(&mut self) -> Result<(), LinkError> { - // Iterate through the IBC Events, build the message for each and collect all at same height. - // Send a multi message transaction with these, prepending the client update - - for batch in self.subscription.try_iter().collect::>().iter() { - if self.clear_packets { - let first_event_height = batch.events[0].height(); - self.src_height = first_event_height - .decrement() - .map_err(|e| LinkError::Failed(e.to_string()))?; - self.relay_pending_packets()?; - self.clear_packets = false; - } + /// Iterate through the IBC Events, build the message for each and collect all at same height. + /// Send a multi message transaction with these, prepending the client update + pub fn relay_from_events(&mut self, batch: Arc) -> Result<(), LinkError> { + if self.clear_packets { + self.src_height = batch + .height + .decrement() + .map_err(|e| LinkError::Failed(e.to_string()))?; - // collect relevant events in self.all_events - self.collect_events(&batch.events); - self.adjust_events_height()?; + self.relay_pending_packets()?; + self.clear_packets = false; + } - if self.all_events.is_empty() { - continue; - } + // collect relevant events in self.all_events + self.collect_events(&batch.events); + self.adjust_events_height()?; - for _i in 0..MAX_ITER { - self.reset_buffers(); + if self.all_events.is_empty() { + return Ok(()); + } - self.dst_height = self.dst_latest_height()?; + for _i in 0..MAX_ITER { + self.reset_buffers(); - // Collect the messages for all events - for event in self.all_events.clone() { - println!("{} => {:?}", self.src_chain.id(), event); - self.handle_packet_event(&event)?; - } + self.dst_height = self.dst_latest_height()?; - // Send client update and messages - let res = self.send_update_client_and_msgs(); - println!("\nresult {:?}", res); + // Collect the messages for all events + for event in self.all_events.clone() { + println!("{} => {:?}", self.src_chain.id(), event); + self.handle_packet_event(&event)?; + } - if self.all_events.is_empty() { - break; - } + // Send client update and messages + let res = self.send_update_client_and_msgs(); + println!("\nresult {:?}", res); - println!("retrying"); + if self.all_events.is_empty() { + break; } - // TODO - add error - self.all_events = vec![]; + println!("retrying"); } + + // TODO - add error + self.all_events = vec![]; + Ok(()) } @@ -869,40 +866,48 @@ impl RelayPath { } } -pub struct Link { - pub a_to_b: RelayPath, - pub b_to_a: RelayPath, -} - #[derive(Clone, Debug)] pub struct LinkParameters { pub src_port_id: PortId, pub src_channel_id: ChannelId, } +pub struct Link { + pub a_to_b: RelayPath, + pub b_to_a: RelayPath, +} + impl Link { - pub fn new(channel: Channel) -> Result { + pub fn new(channel: Channel) -> Self { let a_chain = channel.src_chain(); let b_chain = channel.dst_chain(); let flipped = channel.flipped(); - Ok(Link { - a_to_b: RelayPath::new(a_chain.clone(), b_chain.clone(), channel)?, - b_to_a: RelayPath::new(b_chain, a_chain, flipped)?, - }) + Self { + a_to_b: RelayPath::new(a_chain.clone(), b_chain.clone(), channel), + b_to_a: RelayPath::new(b_chain, a_chain, flipped), + } } pub fn relay(&mut self) -> Result<(), LinkError> { println!("relaying packets on {:#?}", self.a_to_b.channel); + let events_a = self.a_to_b.src_chain().subscribe()?; + let events_b = self.b_to_a.src_chain().subscribe()?; + loop { if self.is_closed()? { println!("channel is closed, exiting"); return Ok(()); } - self.a_to_b.relay_from_events()?; - self.b_to_a.relay_from_events()?; + if let Ok(events) = events_a.try_recv() { + self.a_to_b.relay_from_events(events)?; + } + + if let Ok(events) = events_b.try_recv() { + self.b_to_a.relay_from_events(events)?; + } // TODO - select over the two subscriptions thread::sleep(Duration::from_millis(100)) @@ -1022,7 +1027,8 @@ impl Link { b_channel_id, ), }; - Link::new(channel) + + Ok(Link::new(channel)) } pub fn build_and_send_recv_packet_messages(&mut self) -> Result, LinkError> { diff --git a/relayer/src/relay.rs b/relayer/src/relay.rs index 41c2cae659..902b01be76 100644 --- a/relayer/src/relay.rs +++ b/relayer/src/relay.rs @@ -22,7 +22,7 @@ pub fn relay_on_new_link( // Setup the clients, connection and channel let channel = connect_with_new_channel(a_chain_handle, b_chain_handle, ordering, path)?; - let mut link = Link::new(channel)?; + let mut link = Link::new(channel); link.relay()?; Ok(()) From f8e2f34f3f570bb4701ca0629372c4806dbd4b3c Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 10:05:54 +0100 Subject: [PATCH 04/14] Formatting --- relayer-cli/src/commands/cli_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer-cli/src/commands/cli_utils.rs b/relayer-cli/src/commands/cli_utils.rs index 222524ede3..976298c0d4 100644 --- a/relayer-cli/src/commands/cli_utils.rs +++ b/relayer-cli/src/commands/cli_utils.rs @@ -116,4 +116,4 @@ fn zip_result(a: Result, b: Result) -> Result<(A, B), E> { (Ok(a), Ok(b)) => Ok((a, b)), (Err(e), _) | (_, Err(e)) => Err(e), } -} \ No newline at end of file +} From 59a7666106dd4b6005b84a87cbb8a6b5d69bab07 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 16:19:41 +0100 Subject: [PATCH 05/14] Use only source chain, channel and port to identify packets --- modules/src/ics04_channel/events.rs | 14 +++- relayer-cli/src/commands/start_multi.rs | 98 +++++++++++++++++-------- relayer/src/link.rs | 2 +- 3 files changed, 80 insertions(+), 34 deletions(-) diff --git a/modules/src/ics04_channel/events.rs b/modules/src/ics04_channel/events.rs index a646ed535c..971708057a 100644 --- a/modules/src/ics04_channel/events.rs +++ b/modules/src/ics04_channel/events.rs @@ -357,8 +357,18 @@ impl CloseInit { pub fn port_id(&self) -> &PortId { &self.0.port_id } - pub fn channel_id(&self) -> &Option { - &self.0.channel_id + pub fn channel_id(&self) -> &ChannelId { + // FIXME(romac): Rework encoding of IbcEvents which use `Attributes` + self.0 + .channel_id + .as_ref() + .expect("CloseInit should always have a channel_id") + } + pub fn counterparty_port_id(&self) -> &PortId { + &self.0.counterparty_port_id + } + pub fn counterparty_channel_id(&self) -> Option<&ChannelId> { + self.0.counterparty_channel_id.as_ref() } pub fn height(&self) -> Height { self.0.height diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index fb9e231d41..965dee0dd1 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -11,7 +11,7 @@ use ibc::{ events::IbcEvent, ics04_channel::{ channel::State as ChannelState, - events::{CloseInit, SendPacket, WriteAcknowledgement}, + events::{CloseInit, SendPacket, TimeoutPacket, WriteAcknowledgement}, msgs::recv_packet::MsgRecvPacket, packet::{Packet, PacketMsgType}, }, @@ -99,12 +99,12 @@ impl Supervisor { println!("iterating over event batches"); for batch in subscription.iter() { - // FIXME(romac): Need to send NewBlock events to all workers - let events = collect_events(&batch.events); - let events_per_object = events.into_iter().group_by(ibc_event_object); + // TODO(romac): Need to send NewBlock events to all workers + + let events = collect_events(&batch.events, self.chains.src.id()); + let events_per_object = events.into_iter().into_group_map(); for (object, events) in events_per_object.into_iter() { - let events = events.collect::>(); if events.is_empty() { println!("no events in batch"); continue; @@ -151,7 +151,7 @@ impl Worker { fn run(self, object: Object) { let result = match object { - Object::ChannelPair(channel_pair) => self.run_channel_pair(channel_pair), + Object::UnidirectionalChannelPath(path) => self.run_uni_chan_path(path), }; if let Err(e) = result { @@ -159,13 +159,14 @@ impl Worker { } } - fn run_channel_pair(self, channel_pair: ChannelPair) -> Result<(), BoxError> { + fn run_uni_chan_path(self, path: UnidirectionalChannelPath) -> Result<(), BoxError> { + println!("running worker for object {:?}", path); let mut link = Link::new_from_opts( self.chains.src.clone(), self.chains.dst.clone(), LinkParameters { - src_port_id: channel_pair.src_port, - src_channel_id: channel_pair.src_channel, + src_port_id: path.src_port_id, + src_channel_id: path.src_channel_id, }, )?; @@ -178,37 +179,72 @@ impl Worker { } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct ChannelPair { - pub src_channel: ChannelId, - pub dst_channel: ChannelId, - pub src_port: PortId, - pub dst_port: PortId, +struct UnidirectionalChannelPath { + pub src_chain_id: ChainId, + pub src_channel_id: ChannelId, + pub src_port_id: PortId, +} + +impl UnidirectionalChannelPath { + fn for_packet(p: &Packet, chain_id: &ChainId) -> Self { + Self { + src_chain_id: chain_id.clone(), + src_channel_id: p.source_channel.clone(), + src_port_id: p.source_port.clone(), + } + } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] enum Object { - ChannelPair(ChannelPair), -} - -fn ibc_event_object(event: &IbcEvent) -> Object { - match event { - IbcEvent::SendPacket(e) => Object::ChannelPair(ChannelPair { - src_channel: e.packet.source_channel.clone(), - dst_channel: e.packet.destination_channel.clone(), - src_port: e.packet.source_port.clone(), - dst_port: e.packet.destination_port.clone(), - }), - _ => unreachable!(), + UnidirectionalChannelPath(UnidirectionalChannelPath), +} + +impl From for Object { + fn from(p: UnidirectionalChannelPath) -> Self { + Self::UnidirectionalChannelPath(p) } } -fn collect_events(events: &[IbcEvent]) -> Vec { +impl Object { + fn for_send_packet(e: &SendPacket, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + } + + fn for_write_ack(e: &WriteAcknowledgement, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + } + + fn for_timeout_packet(e: &TimeoutPacket, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + } + + fn for_close_init_channel(e: &CloseInit, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.channel_id().clone(), + src_port_id: e.port_id().clone(), + } + .into() + } +} + +fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEvent)> { events .iter() - .filter(|e| matches!(e, IbcEvent::SendPacket(_))) - // IbcEvent::WriteAcknowledgement(e) => Some(HandledEvent::WriteAcknowledgement(e)), - // IbcEvent::CloseInitChannel(e) => Some(HandledEvent::CloseInit(e)), - .cloned() + .filter_map(|e| match e { + IbcEvent::SendPacket(p) => Some((Object::for_send_packet(p, &chain_id), e.clone())), + IbcEvent::TimeoutPacket(p) => { + Some((Object::for_timeout_packet(p, &chain_id), e.clone())) + } + IbcEvent::WriteAcknowledgement(p) => { + Some((Object::for_write_ack(p, &chain_id), e.clone())) + } + IbcEvent::CloseInitChannel(p) => { + Some((Object::for_close_init_channel(p, &chain_id), e.clone())) + } + _ => None, + }) .collect() } diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 3cd95310b2..c7e92640b9 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -317,7 +317,7 @@ impl RelayPath { } } IbcEvent::CloseInitChannel(chan_close_ev) => { - if Some(self.channel.src_channel_id()) == chan_close_ev.channel_id().as_ref() + if self.channel.src_channel_id() == chan_close_ev.channel_id() && self.channel.src_port_id() == chan_close_ev.port_id() { self.all_events.push(event.clone()); From b197537907f2137db83a52b98a7f888ff501d823 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 16:49:47 +0100 Subject: [PATCH 06/14] Force use of in-memory store --- relayer/src/light_client/tendermint.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/relayer/src/light_client/tendermint.rs b/relayer/src/light_client/tendermint.rs index 5be75fb314..4d600719e5 100644 --- a/relayer/src/light_client/tendermint.rs +++ b/relayer/src/light_client/tendermint.rs @@ -8,7 +8,7 @@ use tendermint_rpc as rpc; use crate::{ chain::CosmosSdkChain, - config::{ChainConfig, LightClientConfig, StoreConfig}, + config::{ChainConfig, LightClientConfig}, error, }; use ibc::ics24_host::identifier::ChainId; @@ -82,15 +82,17 @@ fn build_instance( let rpc_client = rpc::HttpClient::new(config.address.clone()) .map_err(|e| error::Kind::LightClientInstance(config.address.to_string()).context(e))?; - let store: Box = match &config.store { - StoreConfig::Disk { path } => { - let db = sled::open(path).map_err(|e| { - error::Kind::LightClientInstance(config.address.to_string()).context(e) - })?; - Box::new(store::sled::SledStore::new(db)) - } - StoreConfig::Memory { .. } => Box::new(store::memory::MemoryStore::new()), - }; + // let store: Box = match &config.store { + // StoreConfig::Disk { path } => { + // let db = sled::open(path).map_err(|e| { + // error::Kind::LightClientInstance(config.address.to_string()).context(e) + // })?; + // Box::new(store::sled::SledStore::new(db)) + // } + // StoreConfig::Memory { .. } => Box::new(store::memory::MemoryStore::new()), + // }; + + let store = Box::new(store::memory::MemoryStore::new()); let builder = LightClientBuilder::prod( config.peer_id, From 675a33e8fdbe17195db18dd069bb16890ba12019 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 16:50:58 +0100 Subject: [PATCH 07/14] Fix computation of UnidrectionalChannelPath for WriteAck --- relayer-cli/src/commands/start_multi.rs | 31 ++++++++++++++----------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index 965dee0dd1..fd4ec43581 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -185,16 +185,6 @@ struct UnidirectionalChannelPath { pub src_port_id: PortId, } -impl UnidirectionalChannelPath { - fn for_packet(p: &Packet, chain_id: &ChainId) -> Self { - Self { - src_chain_id: chain_id.clone(), - src_channel_id: p.source_channel.clone(), - src_port_id: p.source_port.clone(), - } - } -} - #[derive(Clone, Debug, PartialEq, Eq, Hash)] enum Object { UnidirectionalChannelPath(UnidirectionalChannelPath), @@ -208,15 +198,30 @@ impl From for Object { impl Object { fn for_send_packet(e: &SendPacket, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.packet.source_channel.clone(), + src_port_id: e.packet.source_port.clone(), + } + .into() } fn for_write_ack(e: &WriteAcknowledgement, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.packet.destination_channel.clone(), + src_port_id: e.packet.destination_port.clone(), + } + .into() } fn for_timeout_packet(e: &TimeoutPacket, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath::for_packet(&e.packet, chain_id).into() + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.src_channel_id().clone(), + src_port_id: e.src_port_id().clone(), + } + .into() } fn for_close_init_channel(e: &CloseInit, chain_id: &ChainId) -> Self { From 44aa3a673195c2f19d01f96dcd7bd4cf104c01d4 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 16:51:21 +0100 Subject: [PATCH 08/14] Run two supervisors in parallel --- relayer-cli/src/commands/start_multi.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index fd4ec43581..f5ff819c09 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -258,7 +258,10 @@ fn start_multi( src_chain_id: &ChainId, dst_chain_id: &ChainId, ) -> Result { - let mut supervisor = Supervisor::spawn(config, src_chain_id, dst_chain_id)?; + let mut src_to_dst = Supervisor::spawn(config.clone(), src_chain_id, dst_chain_id)?; + std::thread::spawn(move || src_to_dst.run()); + + let mut supervisor = Supervisor::spawn(config, dst_chain_id, src_chain_id)?; supervisor.run()?; Ok(Output::success("ok")) From 72f54581ad97998ea6be2ee6543e26f96beeb2fc Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 17:14:34 +0100 Subject: [PATCH 09/14] Relay in both directions with a single supervisor --- relayer-cli/src/commands/cli_utils.rs | 8 +++ relayer-cli/src/commands/start_multi.rs | 82 ++++++++++++++----------- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/relayer-cli/src/commands/cli_utils.rs b/relayer-cli/src/commands/cli_utils.rs index 976298c0d4..751024a367 100644 --- a/relayer-cli/src/commands/cli_utils.rs +++ b/relayer-cli/src/commands/cli_utils.rs @@ -36,6 +36,14 @@ impl ChainHandlePair { ) -> Result { spawn_chain_runtimes(options, config, src_chain_id, dst_chain_id) } + + /// Swap the two handles to that `dst` becomes `src` and `src` becomes `dst` + pub fn swap(self) -> Self { + Self { + src: self.dst, + dst: self.src, + } + } } /// Spawn the source and destination chain runtime from the configuration and chain identifiers, diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index f5ff819c09..cbd6e476c9 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -1,6 +1,6 @@ #![allow(unused_imports, unreachable_code, dead_code, unused_variables)] -use std::{collections::HashMap, sync::Arc, thread::JoinHandle}; +use std::{collections::HashMap, sync::Arc, thread::JoinHandle, time::Duration}; use abscissa_core::{Command, Options, Runnable}; use crossbeam_channel::{Receiver, Sender}; @@ -20,6 +20,7 @@ use ibc::{ Height, }; use ibc_relayer::{ + chain::handle::ChainHandle, channel::Channel, config::Config, event::monitor::EventBatch, @@ -79,12 +80,8 @@ struct Supervisor { } impl Supervisor { - fn spawn( - config: Config, - src_chain_id: &ChainId, - dst_chain_id: &ChainId, - ) -> Result { - let chains = ChainHandlePair::spawn(&config, src_chain_id, dst_chain_id)?; + fn spawn(config: Config, chain_a: &ChainId, chain_b: &ChainId) -> Result { + let chains = ChainHandlePair::spawn(&config, chain_a, chain_b)?; Ok(Self { config, @@ -94,41 +91,61 @@ impl Supervisor { } fn run(&mut self) -> Result<(), BoxError> { - let subscription = self.chains.src.subscribe()?; + let subscription_a = self.chains.src.subscribe()?; + let subscription_b = self.chains.dst.subscribe()?; - println!("iterating over event batches"); + loop { + println!("{} => iterating over event batches", self.chains.src.id()); + for batch in subscription_a.try_iter() { + self.process_batch(batch)?; + } - for batch in subscription.iter() { - // TODO(romac): Need to send NewBlock events to all workers + println!("{} => iterating over event batches", self.chains.dst.id()); + for batch in subscription_b.try_iter() { + self.process_batch(batch)?; + } - let events = collect_events(&batch.events, self.chains.src.id()); - let events_per_object = events.into_iter().into_group_map(); + std::thread::sleep(Duration::from_millis(600)); + } + } - for (object, events) in events_per_object.into_iter() { - if events.is_empty() { - println!("no events in batch"); - continue; - } + fn process_batch(&mut self, batch: Arc) -> Result<(), BoxError> { + // TODO(romac): Need to send NewBlock events to all workers - let worker_batch = EventBatch { - height: batch.height, - chain_id: batch.chain_id.clone(), - events, - }; + let events = collect_events(&batch.events, batch.chain_id.clone()); + let events_per_object = events.into_iter().into_group_map(); - let worker = self.worker_for_object(object); - worker.handle_packet_events(worker_batch)?; + for (object, events) in events_per_object.into_iter() { + if events.is_empty() { + println!("no events in batch"); + return Ok(()); } + + let worker_batch = EventBatch { + height: batch.height, + chain_id: batch.chain_id.clone(), + events, + }; + + let is_dest = batch.chain_id == self.chains.dst.id(); + let worker = self.worker_for_object(object, is_dest); + worker.handle_packet_events(worker_batch)?; } Ok(()) } - fn worker_for_object(&mut self, object: Object) -> &WorkerHandle { + fn worker_for_object(&mut self, object: Object, swap: bool) -> &WorkerHandle { if self.workers.contains_key(&object) { &self.workers[&object] } else { - let worker = Worker::spawn(self.chains.clone(), object.clone()); + let chains = if swap { + self.chains.clone().swap() + } else { + self.chains.clone() + }; + + let worker = Worker::spawn(chains, object.clone()); self.workers.entry(object).or_insert(worker) } } @@ -253,15 +270,8 @@ fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEve .collect() } -fn start_multi( - config: Config, - src_chain_id: &ChainId, - dst_chain_id: &ChainId, -) -> Result { - let mut src_to_dst = Supervisor::spawn(config.clone(), src_chain_id, dst_chain_id)?; - std::thread::spawn(move || src_to_dst.run()); - - let mut supervisor = Supervisor::spawn(config, dst_chain_id, src_chain_id)?; +fn start_multi(config: Config, chain_a: &ChainId, chain_b: &ChainId) -> Result { + let mut supervisor = Supervisor::spawn(config, chain_a, chain_b)?; supervisor.run()?; Ok(Output::success("ok")) From d736974add87d51139a55ad6e609f9f5e424836f Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 26 Feb 2021 17:14:43 +0100 Subject: [PATCH 10/14] Revert "Force use of in-memory store" This reverts commit b197537907f2137db83a52b98a7f888ff501d823. --- relayer/src/light_client/tendermint.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/relayer/src/light_client/tendermint.rs b/relayer/src/light_client/tendermint.rs index 4d600719e5..5be75fb314 100644 --- a/relayer/src/light_client/tendermint.rs +++ b/relayer/src/light_client/tendermint.rs @@ -8,7 +8,7 @@ use tendermint_rpc as rpc; use crate::{ chain::CosmosSdkChain, - config::{ChainConfig, LightClientConfig}, + config::{ChainConfig, LightClientConfig, StoreConfig}, error, }; use ibc::ics24_host::identifier::ChainId; @@ -82,17 +82,15 @@ fn build_instance( let rpc_client = rpc::HttpClient::new(config.address.clone()) .map_err(|e| error::Kind::LightClientInstance(config.address.to_string()).context(e))?; - // let store: Box = match &config.store { - // StoreConfig::Disk { path } => { - // let db = sled::open(path).map_err(|e| { - // error::Kind::LightClientInstance(config.address.to_string()).context(e) - // })?; - // Box::new(store::sled::SledStore::new(db)) - // } - // StoreConfig::Memory { .. } => Box::new(store::memory::MemoryStore::new()), - // }; - - let store = Box::new(store::memory::MemoryStore::new()); + let store: Box = match &config.store { + StoreConfig::Disk { path } => { + let db = sled::open(path).map_err(|e| { + error::Kind::LightClientInstance(config.address.to_string()).context(e) + })?; + Box::new(store::sled::SledStore::new(db)) + } + StoreConfig::Memory { .. } => Box::new(store::memory::MemoryStore::new()), + }; let builder = LightClientBuilder::prod( config.peer_id, From 18a2915783c67cfa10eb855ff70696c3920ccbec Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 2 Mar 2021 11:25:14 +0100 Subject: [PATCH 11/14] Move supervisor into ibc-relayer crate --- relayer-cli/src/commands/cli_utils.rs | 8 - relayer-cli/src/commands/start_multi.rs | 250 +---------------------- relayer/src/lib.rs | 1 + relayer/src/supervisor.rs | 256 ++++++++++++++++++++++++ 4 files changed, 261 insertions(+), 254 deletions(-) create mode 100644 relayer/src/supervisor.rs diff --git a/relayer-cli/src/commands/cli_utils.rs b/relayer-cli/src/commands/cli_utils.rs index 751024a367..976298c0d4 100644 --- a/relayer-cli/src/commands/cli_utils.rs +++ b/relayer-cli/src/commands/cli_utils.rs @@ -36,14 +36,6 @@ impl ChainHandlePair { ) -> Result { spawn_chain_runtimes(options, config, src_chain_id, dst_chain_id) } - - /// Swap the two handles to that `dst` becomes `src` and `src` becomes `dst` - pub fn swap(self) -> Self { - Self { - src: self.dst, - dst: self.src, - } - } } /// Spawn the source and destination chain runtime from the configuration and chain identifiers, diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs index cbd6e476c9..59e35600f2 100644 --- a/relayer-cli/src/commands/start_multi.rs +++ b/relayer-cli/src/commands/start_multi.rs @@ -1,31 +1,7 @@ -#![allow(unused_imports, unreachable_code, dead_code, unused_variables)] - -use std::{collections::HashMap, sync::Arc, thread::JoinHandle, time::Duration}; - use abscissa_core::{Command, Options, Runnable}; -use crossbeam_channel::{Receiver, Sender}; -use itertools::Itertools; -use prost_types::Any; -use ibc::{ - events::IbcEvent, - ics04_channel::{ - channel::State as ChannelState, - events::{CloseInit, SendPacket, TimeoutPacket, WriteAcknowledgement}, - msgs::recv_packet::MsgRecvPacket, - packet::{Packet, PacketMsgType}, - }, - ics24_host::identifier::{ChainId, ChannelId, PortId}, - tx_msg::Msg, - Height, -}; -use ibc_relayer::{ - chain::handle::ChainHandle, - channel::Channel, - config::Config, - event::monitor::EventBatch, - link::{Link, LinkError, LinkParameters, RelayPath}, -}; +use ibc::ics24_host::identifier::ChainId; +use ibc_relayer::{config::Config, supervisor::Supervisor}; use crate::commands::cli_utils::ChainHandlePair; use crate::conclude::Output; @@ -51,227 +27,9 @@ impl Runnable for StartMultiCmd { } } -struct WorkerCmd { - batch: Arc, -} - -impl WorkerCmd { - fn new(batch: Arc) -> Self { - Self { batch } - } -} - -struct WorkerHandle { - pub tx: Sender, - pub thread_handle: JoinHandle<()>, -} - -impl WorkerHandle { - fn handle_packet_events(&self, batch: EventBatch) -> Result<(), BoxError> { - self.tx.send(WorkerCmd::new(Arc::new(batch)))?; - Ok(()) - } -} - -struct Supervisor { - config: Config, - chains: ChainHandlePair, - workers: HashMap, -} - -impl Supervisor { - fn spawn(config: Config, chain_a: &ChainId, chain_b: &ChainId) -> Result { - let chains = ChainHandlePair::spawn(&config, chain_a, chain_b)?; - - Ok(Self { - config, - chains, - workers: HashMap::new(), - }) - } - - fn run(&mut self) -> Result<(), BoxError> { - let subscription_a = self.chains.src.subscribe()?; - let subscription_b = self.chains.dst.subscribe()?; - - loop { - println!("{} => iterating over event batches", self.chains.src.id()); - for batch in subscription_a.try_iter() { - self.process_batch(batch)?; - } - - println!("{} => iterating over event batches", self.chains.dst.id()); - for batch in subscription_b.try_iter() { - self.process_batch(batch)?; - } - - std::thread::sleep(Duration::from_millis(600)); - } - } - - fn process_batch(&mut self, batch: Arc) -> Result<(), BoxError> { - // TODO(romac): Need to send NewBlock events to all workers - - let events = collect_events(&batch.events, batch.chain_id.clone()); - let events_per_object = events.into_iter().into_group_map(); - - for (object, events) in events_per_object.into_iter() { - if events.is_empty() { - println!("no events in batch"); - return Ok(()); - } - - let worker_batch = EventBatch { - height: batch.height, - chain_id: batch.chain_id.clone(), - events, - }; - - let is_dest = batch.chain_id == self.chains.dst.id(); - let worker = self.worker_for_object(object, is_dest); - worker.handle_packet_events(worker_batch)?; - } - - Ok(()) - } - - fn worker_for_object(&mut self, object: Object, swap: bool) -> &WorkerHandle { - if self.workers.contains_key(&object) { - &self.workers[&object] - } else { - let chains = if swap { - self.chains.clone().swap() - } else { - self.chains.clone() - }; - - let worker = Worker::spawn(chains, object.clone()); - self.workers.entry(object).or_insert(worker) - } - } -} - -struct Worker { - chains: ChainHandlePair, - rx: Receiver, -} - -impl Worker { - fn spawn(chains: ChainHandlePair, object: Object) -> WorkerHandle { - let (tx, rx) = crossbeam_channel::unbounded(); - - let worker = Self { chains, rx }; - let thread_handle = std::thread::spawn(move || worker.run(object)); - - WorkerHandle { tx, thread_handle } - } - - fn run(self, object: Object) { - let result = match object { - Object::UnidirectionalChannelPath(path) => self.run_uni_chan_path(path), - }; - - if let Err(e) = result { - eprintln!("worker error: {}", e); - } - } - - fn run_uni_chan_path(self, path: UnidirectionalChannelPath) -> Result<(), BoxError> { - println!("running worker for object {:?}", path); - let mut link = Link::new_from_opts( - self.chains.src.clone(), - self.chains.dst.clone(), - LinkParameters { - src_port_id: path.src_port_id, - src_channel_id: path.src_channel_id, - }, - )?; - - while let Ok(cmd) = self.rx.recv() { - link.a_to_b.relay_from_events(cmd.batch)?; - } - - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct UnidirectionalChannelPath { - pub src_chain_id: ChainId, - pub src_channel_id: ChannelId, - pub src_port_id: PortId, -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -enum Object { - UnidirectionalChannelPath(UnidirectionalChannelPath), -} - -impl From for Object { - fn from(p: UnidirectionalChannelPath) -> Self { - Self::UnidirectionalChannelPath(p) - } -} - -impl Object { - fn for_send_packet(e: &SendPacket, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath { - src_chain_id: chain_id.clone(), - src_channel_id: e.packet.source_channel.clone(), - src_port_id: e.packet.source_port.clone(), - } - .into() - } - - fn for_write_ack(e: &WriteAcknowledgement, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath { - src_chain_id: chain_id.clone(), - src_channel_id: e.packet.destination_channel.clone(), - src_port_id: e.packet.destination_port.clone(), - } - .into() - } - - fn for_timeout_packet(e: &TimeoutPacket, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath { - src_chain_id: chain_id.clone(), - src_channel_id: e.src_channel_id().clone(), - src_port_id: e.src_port_id().clone(), - } - .into() - } - - fn for_close_init_channel(e: &CloseInit, chain_id: &ChainId) -> Self { - UnidirectionalChannelPath { - src_chain_id: chain_id.clone(), - src_channel_id: e.channel_id().clone(), - src_port_id: e.port_id().clone(), - } - .into() - } -} - -fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEvent)> { - events - .iter() - .filter_map(|e| match e { - IbcEvent::SendPacket(p) => Some((Object::for_send_packet(p, &chain_id), e.clone())), - IbcEvent::TimeoutPacket(p) => { - Some((Object::for_timeout_packet(p, &chain_id), e.clone())) - } - IbcEvent::WriteAcknowledgement(p) => { - Some((Object::for_write_ack(p, &chain_id), e.clone())) - } - IbcEvent::CloseInitChannel(p) => { - Some((Object::for_close_init_channel(p, &chain_id), e.clone())) - } - _ => None, - }) - .collect() -} - fn start_multi(config: Config, chain_a: &ChainId, chain_b: &ChainId) -> Result { - let mut supervisor = Supervisor::spawn(config, chain_a, chain_b)?; + let chains = ChainHandlePair::spawn(&config, chain_a, chain_b)?; + let supervisor = Supervisor::spawn(chains.src, chains.dst)?; supervisor.run()?; Ok(Output::success("ok")) diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index 7342cedf82..93f0fc8eba 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -26,5 +26,6 @@ pub mod light_client; pub mod link; pub mod macros; pub mod relay; +pub mod supervisor; pub mod transfer; pub mod util; diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs new file mode 100644 index 0000000000..84d14dd9b1 --- /dev/null +++ b/relayer/src/supervisor.rs @@ -0,0 +1,256 @@ +use std::{collections::HashMap, sync::Arc, thread::JoinHandle, time::Duration}; + +use anomaly::BoxError; +use crossbeam_channel::{Receiver, Sender}; +use itertools::Itertools; + +use ibc::{ + events::IbcEvent, + ics04_channel::events::{CloseInit, SendPacket, TimeoutPacket, WriteAcknowledgement}, + ics24_host::identifier::{ChainId, ChannelId, PortId}, +}; + +use crate::{ + chain::handle::ChainHandle, + event::monitor::EventBatch, + link::{Link, LinkParameters}, +}; + +pub struct WorkerCmd { + pub batch: Arc, +} + +impl WorkerCmd { + pub fn new(batch: Arc) -> Self { + Self { batch } + } +} + +pub struct WorkerHandle { + pub tx: Sender, + pub thread_handle: JoinHandle<()>, +} + +impl WorkerHandle { + pub fn handle_packet_events(&self, batch: EventBatch) -> Result<(), BoxError> { + self.tx.send(WorkerCmd::new(Arc::new(batch)))?; + Ok(()) + } +} + +#[derive(Clone)] +pub struct ChainHandlePair { + pub a: Box, + pub b: Box, +} + +impl ChainHandlePair { + pub fn swap(self) -> Self { + Self { + a: self.b, + b: self.a, + } + } +} + +pub struct Supervisor { + chains: ChainHandlePair, + workers: HashMap, +} + +impl Supervisor { + pub fn spawn( + chain_a: Box, + chain_b: Box, + ) -> Result { + let chains = ChainHandlePair { + a: chain_a, + b: chain_b, + }; + + Ok(Self { + chains, + workers: HashMap::new(), + }) + } + + pub fn run(mut self) -> Result<(), BoxError> { + let subscription_a = self.chains.a.subscribe()?; + let subscription_b = self.chains.b.subscribe()?; + + loop { + println!("{} => iterating over event batches", self.chains.a.id()); + for batch in subscription_a.try_iter() { + self.process_batch(batch)?; + } + + println!("{} => iterating over event batches", self.chains.b.id()); + for batch in subscription_b.try_iter() { + self.process_batch(batch)?; + } + + std::thread::sleep(Duration::from_millis(600)); + } + } + + fn process_batch(&mut self, batch: Arc) -> Result<(), BoxError> { + // TODO(romac): Need to send NewBlock events to all workers + + let events = collect_events(&batch.events, batch.chain_id.clone()); + let events_per_object = events.into_iter().into_group_map(); + + for (object, events) in events_per_object.into_iter() { + if events.is_empty() { + println!("no events in batch"); + return Ok(()); + } + + let worker_batch = EventBatch { + height: batch.height, + chain_id: batch.chain_id.clone(), + events, + }; + + let is_dest = batch.chain_id == self.chains.b.id(); + let worker = self.worker_for_object(object, is_dest); + worker.handle_packet_events(worker_batch)?; + } + + Ok(()) + } + + fn worker_for_object(&mut self, object: Object, swap: bool) -> &WorkerHandle { + if self.workers.contains_key(&object) { + &self.workers[&object] + } else { + let chains = if swap { + self.chains.clone().swap() + } else { + self.chains.clone() + }; + + let worker = Worker::spawn(chains, object.clone()); + self.workers.entry(object).or_insert(worker) + } + } +} + +pub struct Worker { + chains: ChainHandlePair, + rx: Receiver, +} + +impl Worker { + pub fn spawn(chains: ChainHandlePair, object: Object) -> WorkerHandle { + let (tx, rx) = crossbeam_channel::unbounded(); + + let worker = Self { chains, rx }; + let thread_handle = std::thread::spawn(move || worker.run(object)); + + WorkerHandle { tx, thread_handle } + } + + pub fn run(self, object: Object) { + let result = match object { + Object::UnidirectionalChannelPath(path) => self.run_uni_chan_path(path), + }; + + if let Err(e) = result { + eprintln!("worker error: {}", e); + } + } + + fn run_uni_chan_path(self, path: UnidirectionalChannelPath) -> Result<(), BoxError> { + println!("running worker for object {:?}", path); + + let mut link = Link::new_from_opts( + self.chains.a.clone(), + self.chains.b.clone(), + LinkParameters { + src_port_id: path.src_port_id, + src_channel_id: path.src_channel_id, + }, + )?; + + while let Ok(cmd) = self.rx.recv() { + link.a_to_b.relay_from_events(cmd.batch)?; + } + + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct UnidirectionalChannelPath { + pub src_chain_id: ChainId, + pub src_channel_id: ChannelId, + pub src_port_id: PortId, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Object { + UnidirectionalChannelPath(UnidirectionalChannelPath), +} + +impl From for Object { + fn from(p: UnidirectionalChannelPath) -> Self { + Self::UnidirectionalChannelPath(p) + } +} + +impl Object { + pub fn for_send_packet(e: &SendPacket, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.packet.source_channel.clone(), + src_port_id: e.packet.source_port.clone(), + } + .into() + } + + pub fn for_write_ack(e: &WriteAcknowledgement, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.packet.destination_channel.clone(), + src_port_id: e.packet.destination_port.clone(), + } + .into() + } + + pub fn for_timeout_packet(e: &TimeoutPacket, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.src_channel_id().clone(), + src_port_id: e.src_port_id().clone(), + } + .into() + } + + pub fn for_close_init_channel(e: &CloseInit, chain_id: &ChainId) -> Self { + UnidirectionalChannelPath { + src_chain_id: chain_id.clone(), + src_channel_id: e.channel_id().clone(), + src_port_id: e.port_id().clone(), + } + .into() + } +} + +fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEvent)> { + events + .iter() + .filter_map(|e| match e { + IbcEvent::SendPacket(p) => Some((Object::for_send_packet(p, &chain_id), e.clone())), + IbcEvent::TimeoutPacket(p) => { + Some((Object::for_timeout_packet(p, &chain_id), e.clone())) + } + IbcEvent::WriteAcknowledgement(p) => { + Some((Object::for_write_ack(p, &chain_id), e.clone())) + } + IbcEvent::CloseInitChannel(p) => { + Some((Object::for_close_init_channel(p, &chain_id), e.clone())) + } + _ => None, + }) + .collect() +} From 4172765b3ad2efbe6c87ea03abc455304d94353b Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 2 Mar 2021 11:31:07 +0100 Subject: [PATCH 12/14] Formatting --- relayer-cli/src/commands.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer-cli/src/commands.rs b/relayer-cli/src/commands.rs index 5ca7c72149..32766f36c1 100644 --- a/relayer-cli/src/commands.rs +++ b/relayer-cli/src/commands.rs @@ -14,7 +14,7 @@ use crate::config::Config; use self::{ create::CreateCmds, keys::KeysCmd, light::LightCmd, listen::ListenCmd, query::QueryCmd, - start_multi::StartMultiCmd, start::StartCmd, tx::TxCmd, version::VersionCmd, + start::StartCmd, start_multi::StartMultiCmd, tx::TxCmd, version::VersionCmd, }; mod channel; @@ -104,4 +104,4 @@ impl Configurable for CliCmd { fn process_config(&self, config: Config) -> Result { Ok(config) } -} \ No newline at end of file +} From c5f4da7f0af7773aab1da162cb81849729e743da Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 3 Mar 2021 12:36:49 +0100 Subject: [PATCH 13/14] Remove println statements --- relayer/src/supervisor.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 84d14dd9b1..3d4e864bb3 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -79,12 +79,10 @@ impl Supervisor { let subscription_b = self.chains.b.subscribe()?; loop { - println!("{} => iterating over event batches", self.chains.a.id()); for batch in subscription_a.try_iter() { self.process_batch(batch)?; } - println!("{} => iterating over event batches", self.chains.b.id()); for batch in subscription_b.try_iter() { self.process_batch(batch)?; } @@ -101,7 +99,6 @@ impl Supervisor { for (object, events) in events_per_object.into_iter() { if events.is_empty() { - println!("no events in batch"); return Ok(()); } @@ -253,4 +250,4 @@ fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEve _ => None, }) .collect() -} +} \ No newline at end of file From 232b0ffebbb23a746a298efccfe59301a04461ff Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 3 Mar 2021 13:17:09 +0100 Subject: [PATCH 14/14] Formatting --- relayer/src/supervisor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 3d4e864bb3..6b5a98fcec 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -250,4 +250,4 @@ fn collect_events(events: &[IbcEvent], chain_id: ChainId) -> Vec<(Object, IbcEve _ => None, }) .collect() -} \ No newline at end of file +}