From 05afbfe8f61385631a33bc66778f59a58bf22dc1 Mon Sep 17 00:00:00 2001 From: Adi Seredinschi Date: Wed, 7 Jul 2021 15:52:59 +0200 Subject: [PATCH] Fix for schedule refreshing bug (#1144) * Adjusted logs for better msg correlation. * Fixes #1143. * Selective backoff in path worker. * Fmt & clippy * Adjusted the backoff mechanism to depend on current chan length * Apply @romac's suggestion * Changelog entry --- CHANGELOG.md | 7 ++++++ relayer/src/link.rs | 37 +++++++++++++++++++---------- relayer/src/worker/handle.rs | 2 +- relayer/src/worker/uni_chan_path.rs | 13 +++++++--- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a333de62b0..27fbe58df3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,12 @@ reliability of Hermes ([#697]). - Update to `tendermint-rs` v0.20.0 ([#1125]) - Add inline documentation to config.toml ([#1127]) +### BUG FIXES + +- [ibc-relayer] + - Fix for schedule refreshing bug ([#1143]) + + [#600]: https://github.com/informalsystems/ibc-rs/issues/600 [#697]: https://github.com/informalsystems/ibc-rs/issues/697 [#1062]: https://github.com/informalsystems/ibc-rs/issues/1062 @@ -25,6 +31,7 @@ reliability of Hermes ([#697]). [#1125]: https://github.com/informalsystems/ibc-rs/issues/1125 [#1127]: https://github.com/informalsystems/ibc-rs/issues/1127 [#1140]: https://github.com/informalsystems/ibc-rs/issues/1140 +[#1143]: https://github.com/informalsystems/ibc-rs/issues/1143 ## v0.5.0 diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 54584d0f1f..79e2a9a7b3 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -598,10 +598,11 @@ impl RelayPath { for i in 0..MAX_RETRIES { info!( - "[{}] relay op. data to {}, proofs height {}, (delayed by: {:?}) [try {}/{}]", + "[{}] relay op. data of {} msgs(s) to {} (height {}), delayed by: {:?} [try {}/{}]", self, + odata.batch.len(), odata.target, - odata.proofs_height, + odata.proofs_height.increment(), odata.scheduled_time.elapsed(), i + 1, MAX_RETRIES @@ -1326,11 +1327,17 @@ impl RelayPath { IbcEvent::SendPacket(e) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(e)? { - debug!("[{}] {} already handled", self, e); + debug!( + "[{}] refreshing schedule: already handled send packet {}", + self, e + ); } else if let Some(new_msg) = self.build_timeout_from_send_packet_event(e, dst_current_height)? { - debug!("[{}] found a timed-out msg in the op data {}", self, odata); + debug!( + "[{}] refreshing schedule: found a timed-out msg in the op data {}", + self, odata + ); timed_out.entry(odata_pos).or_insert_with(Vec::new).push( TransitMessage { event: event.clone(), @@ -1344,7 +1351,10 @@ impl RelayPath { } IbcEvent::WriteAcknowledgement(e) => { if self.write_ack_event_handled(e)? { - debug!("[{}] {} already handled", self, e); + debug!( + "[{}] refreshing schedule: already handled {} write ack ", + self, e + ); } else { retain_batch.push(gm.clone()); } @@ -1357,6 +1367,13 @@ impl RelayPath { odata.batch = retain_batch; } + // Replace the original operational data with the updated one + self.dst_operational_data = all_dst_odata; + // Possibly some op. data became empty (if no events were kept). + // Retain only the non-empty ones. + self.dst_operational_data.retain(|o| !o.batch.is_empty()); + + // Handle timed-out events if timed_out.is_empty() { // Nothing timed out in the meantime return Ok(()); @@ -1370,7 +1387,7 @@ impl RelayPath { new_od.batch = batch; info!( - "[{}] re-scheduling from new timed-out batch of size {}", + "[{}] refreshing schedule: re-scheduling from new timed-out batch of size {}", self, new_od.batch.len() ); @@ -1378,12 +1395,6 @@ impl RelayPath { self.schedule_operational_data(new_od)?; } - self.dst_operational_data = all_dst_odata; - - // Possibly some op. data became empty (if no events were kept). - // Retain only the non-empty ones. - self.dst_operational_data.retain(|o| !o.batch.is_empty()); - Ok(()) } @@ -1400,7 +1411,7 @@ impl RelayPath { } info!( - "[{}] scheduling op. data with {} msg(s) for {} chain (height {})", + "[{}] scheduling op. data with {} msg(s) for {} (height {})", self, od.batch.len(), od.target, diff --git a/relayer/src/worker/handle.rs b/relayer/src/worker/handle.rs index 4d95a33c53..9576b496fb 100644 --- a/relayer/src/worker/handle.rs +++ b/relayer/src/worker/handle.rs @@ -50,7 +50,7 @@ impl WorkerHandle { Ok(()) } - /// Send a batch of [`NewBlock`] event to the worker. + /// Send a [`NewBlock`] event to the worker. pub fn send_new_block(&self, height: Height, new_block: NewBlock) -> Result<(), BoxError> { self.tx.send(WorkerCmd::NewBlock { height, new_block })?; Ok(()) diff --git a/relayer/src/worker/uni_chan_path.rs b/relayer/src/worker/uni_chan_path.rs index a12874a05e..34ca7fef16 100644 --- a/relayer/src/worker/uni_chan_path.rs +++ b/relayer/src/worker/uni_chan_path.rs @@ -1,4 +1,4 @@ -use std::{thread, time::Duration}; +use std::time::Duration; use anomaly::BoxError; use crossbeam_channel::Receiver; @@ -57,10 +57,17 @@ impl UniChanPathWorker { } loop { - thread::sleep(Duration::from_millis(200)); + const BACKOFF: Duration = Duration::from_millis(200); + + // Pop-out any unprocessed commands + // If there are no incoming commands, it's safe to backoff. + let maybe_cmd = crossbeam_channel::select! { + recv(self.cmd_rx) -> cmd => cmd.ok(), + recv(crossbeam_channel::after(BACKOFF)) -> _ => None, + }; let result = retry_with_index(retry_strategy::worker_default_strategy(), |index| { - Self::step(self.cmd_rx.try_recv().ok(), &mut link, index) + Self::step(maybe_cmd.clone(), &mut link, index) }); match result {