From 1d3b7c09a5bb3237508c956281d2e4fa6511e5ce Mon Sep 17 00:00:00 2001 From: fewensa <37804932+fewensa@users.noreply.github.com> Date: Tue, 9 Nov 2021 17:01:06 +0800 Subject: [PATCH] Custom relay strategy (#1198) * Add relayer strategy * Add default relayer strategy * default relayer strategy * expose relayer strategy * fix compile * fix compile * docs * Rename Relayer to Relay, keep RelayerDecide * split `DefaultRelayerStrategy` into `AltruisticRelayerStrategy` and `RationalRelayerStrategy` * Remove relayer mode * Remove unused import * Rename `RelayerStrategy` to `RelayStrategy` * Add missing docs * clippy * clippy * clippy * clippy * Revert `relayer_mode` and add `MixStrategy` * Add `EnforcementStrategy` * fix bug and simplify relay strategy * Update message_lane_loop.rs * Update messages_target.rs * clippy * clippy * clippy * clippy * clippy * clippy * clippy * fix test * fix test * test test test fix test --- .../src/chains/kusama_messages_to_polkadot.rs | 20 +- .../src/chains/millau_messages_to_rialto.rs | 14 +- .../src/chains/polkadot_messages_to_kusama.rs | 14 +- .../src/chains/rialto_messages_to_millau.rs | 14 +- .../src/chains/rococo_messages_to_wococo.rs | 14 +- .../src/chains/wococo_messages_to_rococo.rs | 14 +- .../src/cli/relay_headers_and_messages.rs | 6 +- .../bin-substrate/src/cli/relay_messages.rs | 5 +- .../lib-substrate-relay/src/messages_lane.rs | 11 +- bridges/relays/messages/src/lib.rs | 1 + .../relays/messages/src/message_lane_loop.rs | 53 ++- .../messages/src/message_race_delivery.rs | 343 +++--------------- .../src/relay_strategy/altruistic_strategy.rs | 45 +++ .../relay_strategy/enforcement_strategy.rs | 219 +++++++++++ .../src/relay_strategy/mix_strategy.rs | 40 ++ .../relays/messages/src/relay_strategy/mod.rs | 123 +++++++ .../src/relay_strategy/rational_strategy.rs | 122 +++++++ 17 files changed, 714 insertions(+), 344 deletions(-) create mode 100644 bridges/relays/messages/src/relay_strategy/altruistic_strategy.rs create mode 100644 bridges/relays/messages/src/relay_strategy/enforcement_strategy.rs create mode 100644 bridges/relays/messages/src/relay_strategy/mix_strategy.rs create mode 100644 bridges/relays/messages/src/relay_strategy/mod.rs create mode 100644 bridges/relays/messages/src/relay_strategy/rational_strategy.rs diff --git a/bridges/relays/bin-substrate/src/chains/kusama_messages_to_polkadot.rs b/bridges/relays/bin-substrate/src/chains/kusama_messages_to_polkadot.rs index 86e2e22cd9094..9c9dee150dc46 100644 --- a/bridges/relays/bin-substrate/src/chains/kusama_messages_to_polkadot.rs +++ b/bridges/relays/bin-substrate/src/chains/kusama_messages_to_polkadot.rs @@ -19,12 +19,13 @@ use std::ops::RangeInclusive; use codec::Encode; +use frame_support::weights::Weight; use sp_core::{Bytes, Pair}; +use sp_runtime::{FixedPointNumber, FixedU128}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; -use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_kusama_client::{ HeaderId as KusamaHeaderId, Kusama, SigningParams as KusamaSigningParams, }; @@ -33,7 +34,6 @@ use relay_polkadot_client::{ }; use relay_substrate_client::{Chain, Client, TransactionSignScheme, UnsignedTransaction}; use relay_utils::metrics::MetricsParams; -use sp_runtime::{FixedPointNumber, FixedU128}; use substrate_relay_helper::{ messages_lane::{ select_delivery_transaction_limits, MessagesRelayParams, StandaloneMessagesMetrics, @@ -177,7 +177,13 @@ type PolkadotTargetClient = SubstrateMessagesTarget; /// Run Kusama-to-Polkadot messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Kusama, + KusamaSigningParams, + Polkadot, + PolkadotSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -223,14 +229,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -258,7 +262,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, KusamaSourceClient::new( @@ -332,7 +336,7 @@ pub(crate) async fn update_polkadot_to_kusama_conversion_rate( transaction_nonce, ), ) - .encode(), + .encode(), ) }) .await diff --git a/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs b/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs index 570a3449752b3..3661eb78c19f9 100644 --- a/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs +++ b/bridges/relays/bin-substrate/src/chains/millau_messages_to_rialto.rs @@ -25,7 +25,7 @@ use sp_core::{Bytes, Pair}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_millau_client::{ HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams, }; @@ -174,7 +174,13 @@ type RialtoTargetClient = SubstrateMessagesTarget; /// Run Millau-to-Rialto messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Millau, + MillauSigningParams, + Rialto, + RialtoSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -217,14 +223,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -252,7 +256,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, MillauSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/chains/polkadot_messages_to_kusama.rs b/bridges/relays/bin-substrate/src/chains/polkadot_messages_to_kusama.rs index 8af62bc80b1f2..b1595665fb255 100644 --- a/bridges/relays/bin-substrate/src/chains/polkadot_messages_to_kusama.rs +++ b/bridges/relays/bin-substrate/src/chains/polkadot_messages_to_kusama.rs @@ -24,7 +24,7 @@ use sp_core::{Bytes, Pair}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_kusama_client::{ HeaderId as KusamaHeaderId, Kusama, SigningParams as KusamaSigningParams, }; @@ -176,7 +176,13 @@ type KusamaTargetClient = SubstrateMessagesTarget; /// Run Polkadot-to-Kusama messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Polkadot, + PolkadotSigningParams, + Kusama, + KusamaSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -222,14 +228,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -257,7 +261,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, PolkadotSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs b/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs index f85a5e760e631..50ebf264e1a49 100644 --- a/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs +++ b/bridges/relays/bin-substrate/src/chains/rialto_messages_to_millau.rs @@ -25,7 +25,7 @@ use sp_core::{Bytes, Pair}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_millau_client::{ HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams, }; @@ -174,7 +174,13 @@ type MillauTargetClient = SubstrateMessagesTarget; /// Run Rialto-to-Millau messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Rialto, + RialtoSigningParams, + Millau, + MillauSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -216,14 +222,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -251,7 +255,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, RialtoSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/chains/rococo_messages_to_wococo.rs b/bridges/relays/bin-substrate/src/chains/rococo_messages_to_wococo.rs index 7f9a2eb98f0e0..523d8c4908597 100644 --- a/bridges/relays/bin-substrate/src/chains/rococo_messages_to_wococo.rs +++ b/bridges/relays/bin-substrate/src/chains/rococo_messages_to_wococo.rs @@ -24,7 +24,7 @@ use sp_core::{Bytes, Pair}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_rococo_client::{ HeaderId as RococoHeaderId, Rococo, SigningParams as RococoSigningParams, }; @@ -174,7 +174,13 @@ type WococoTargetClient = SubstrateMessagesTarget; /// Run Rococo-to-Wococo messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Rococo, + RococoSigningParams, + Wococo, + WococoSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -220,14 +226,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -255,7 +259,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, RococoSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/chains/wococo_messages_to_rococo.rs b/bridges/relays/bin-substrate/src/chains/wococo_messages_to_rococo.rs index 420b946506095..893aeb607ab70 100644 --- a/bridges/relays/bin-substrate/src/chains/wococo_messages_to_rococo.rs +++ b/bridges/relays/bin-substrate/src/chains/wococo_messages_to_rococo.rs @@ -24,7 +24,7 @@ use sp_core::{Bytes, Pair}; use bp_messages::MessageNonce; use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof; use frame_support::weights::Weight; -use messages_relay::message_lane::MessageLane; +use messages_relay::{message_lane::MessageLane, relay_strategy::MixStrategy}; use relay_rococo_client::{ HeaderId as RococoHeaderId, Rococo, SigningParams as RococoSigningParams, }; @@ -173,7 +173,13 @@ type RococoTargetClient = SubstrateMessagesTarget; /// Run Wococo-to-Rococo messages sync. pub async fn run( - params: MessagesRelayParams, + params: MessagesRelayParams< + Wococo, + WococoSigningParams, + Rococo, + RococoSigningParams, + MixStrategy, + >, ) -> anyhow::Result<()> { let stall_timeout = relay_substrate_client::bidirectional_transaction_stall_timeout( params.source_transactions_mortality, @@ -219,14 +225,12 @@ pub async fn run( Max messages in single transaction: {}\n\t\ Max messages size in single transaction: {}\n\t\ Max messages weight in single transaction: {}\n\t\ - Relayer mode: {:?}\n\t\ Tx mortality: {:?}/{:?}\n\t\ Stall timeout: {:?}", lane.message_lane.relayer_id_at_source, max_messages_in_single_batch, max_messages_size_in_single_batch, max_messages_weight_in_single_batch, - params.relayer_mode, params.source_transactions_mortality, params.target_transactions_mortality, stall_timeout, @@ -254,7 +258,7 @@ pub async fn run( max_messages_in_single_batch, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, }, }, WococoSourceClient::new( diff --git a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs index 748b68e33c4d8..076331112a21f 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_headers_and_messages.rs @@ -27,6 +27,7 @@ use structopt::StructOpt; use strum::VariantNames; use codec::Encode; +use messages_relay::relay_strategy::MixStrategy; use relay_substrate_client::{ AccountIdOf, Chain, Client, TransactionSignScheme, UnsignedTransaction, }; @@ -375,6 +376,7 @@ impl RelayHeadersAndMessages { let lanes = params.shared.lane; let relayer_mode = params.shared.relayer_mode.into(); + let relay_strategy = MixStrategy::new(relayer_mode); const METRIC_IS_SOME_PROOF: &str = "it is `None` when metric has been already registered; \ @@ -519,12 +521,12 @@ impl RelayHeadersAndMessages { source_to_target_headers_relay: Some(left_to_right_on_demand_headers.clone()), target_to_source_headers_relay: Some(right_to_left_on_demand_headers.clone()), lane_id: lane, - relayer_mode, metrics_params: metrics_params.clone().disable().metrics_prefix( messages_relay::message_lane_loop::metrics_prefix::< ::MessageLane, >(&lane), ), + relay_strategy: relay_strategy.clone(), }) .map_err(|e| anyhow::format_err!("{}", e)) .boxed(); @@ -538,12 +540,12 @@ impl RelayHeadersAndMessages { source_to_target_headers_relay: Some(right_to_left_on_demand_headers.clone()), target_to_source_headers_relay: Some(left_to_right_on_demand_headers.clone()), lane_id: lane, - relayer_mode, metrics_params: metrics_params.clone().disable().metrics_prefix( messages_relay::message_lane_loop::metrics_prefix::< ::MessageLane, >(&lane), ), + relay_strategy: relay_strategy.clone(), }) .map_err(|e| anyhow::format_err!("{}", e)) .boxed(); diff --git a/bridges/relays/bin-substrate/src/cli/relay_messages.rs b/bridges/relays/bin-substrate/src/cli/relay_messages.rs index fd6875cc2a0f4..4b2e0c975602a 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_messages.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_messages.rs @@ -17,6 +17,7 @@ use structopt::StructOpt; use strum::{EnumString, EnumVariantNames, VariantNames}; +use messages_relay::relay_strategy::MixStrategy; use substrate_relay_helper::messages_lane::MessagesRelayParams; use crate::{ @@ -80,6 +81,8 @@ impl RelayMessages { let target_client = self.target.to_client::().await?; let target_sign = self.target_sign.to_keypair::()?; let target_transactions_mortality = self.target_sign.transactions_mortality()?; + let relayer_mode = self.relayer_mode.into(); + let relay_strategy = MixStrategy::new(relayer_mode); relay_messages(MessagesRelayParams { source_client, @@ -91,8 +94,8 @@ impl RelayMessages { source_to_target_headers_relay: None, target_to_source_headers_relay: None, lane_id: self.lane.into(), - relayer_mode: self.relayer_mode.into(), metrics_params: self.prometheus_params.into(), + relay_strategy, }) .await .map_err(|e| anyhow::format_err!("{}", e)) diff --git a/bridges/relays/lib-substrate-relay/src/messages_lane.rs b/bridges/relays/lib-substrate-relay/src/messages_lane.rs index 876b2c2f25728..5e9564cf95e3c 100644 --- a/bridges/relays/lib-substrate-relay/src/messages_lane.rs +++ b/bridges/relays/lib-substrate-relay/src/messages_lane.rs @@ -25,7 +25,10 @@ use async_trait::async_trait; use bp_messages::{LaneId, MessageNonce}; use bp_runtime::{AccountIdOf, IndexOf}; use frame_support::weights::Weight; -use messages_relay::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; +use messages_relay::{ + message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, + relay_strategy::RelayStrategy, +}; use relay_substrate_client::{ metrics::{FloatStorageValueMetric, StorageProofOverheadMetric}, BlockNumberOf, Chain, Client, HashOf, @@ -39,7 +42,7 @@ use sp_runtime::FixedU128; use std::ops::RangeInclusive; /// Substrate <-> Substrate messages relay parameters. -pub struct MessagesRelayParams { +pub struct MessagesRelayParams { /// Messages source client. pub source_client: Client, /// Sign parameters for messages source chain. @@ -58,10 +61,10 @@ pub struct MessagesRelayParams { pub target_to_source_headers_relay: Option>, /// Identifier of lane that needs to be served. pub lane_id: LaneId, - /// Relayer operating mode. - pub relayer_mode: messages_relay::message_lane_loop::RelayerMode, /// Metrics parameters. pub metrics_params: MetricsParams, + /// Relay strategy + pub relay_strategy: Strategy, } /// Message sync pipeline for Substrate <-> Substrate relays. diff --git a/bridges/relays/messages/src/lib.rs b/bridges/relays/messages/src/lib.rs index 861091ab20506..c9e460300342f 100644 --- a/bridges/relays/messages/src/lib.rs +++ b/bridges/relays/messages/src/lib.rs @@ -29,6 +29,7 @@ mod metrics; pub mod message_lane; pub mod message_lane_loop; +pub mod relay_strategy; mod message_race_delivery; mod message_race_loop; diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index e727794aa81cd..2de644091ef2f 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -24,17 +24,13 @@ //! finalized header. I.e. when talking about headers in lane context, we //! only care about finalized headers. -use crate::{ - message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, - message_race_delivery::run as run_message_delivery_race, - message_race_receiving::run as run_message_receiving_race, - metrics::MessageLaneLoopMetrics, -}; +use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; use async_trait::async_trait; +use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; + use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight}; use bp_runtime::messages::DispatchFeePayment; -use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; use relay_utils::{ interval, metrics::{GlobalMetrics, MetricsParams}, @@ -42,11 +38,18 @@ use relay_utils::{ relay_loop::Client as RelayClient, retry_backoff, FailedClient, }; -use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; + +use crate::{ + message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, + message_race_delivery::run as run_message_delivery_race, + message_race_receiving::run as run_message_receiving_race, + metrics::MessageLaneLoopMetrics, + relay_strategy::RelayStrategy, +}; /// Message lane loop configuration params. #[derive(Debug, Clone)] -pub struct Params { +pub struct Params { /// Id of lane this loop is servicing. pub lane: LaneId, /// Interval at which we ask target node about its updates. @@ -58,7 +61,7 @@ pub struct Params { /// The loop will auto-restart if there has been no updates during this period. pub stall_timeout: Duration, /// Message delivery race parameters. - pub delivery_params: MessageDeliveryParams, + pub delivery_params: MessageDeliveryParams, } /// Relayer operating mode. @@ -73,7 +76,7 @@ pub enum RelayerMode { /// Message delivery race parameters. #[derive(Debug, Clone)] -pub struct MessageDeliveryParams { +pub struct MessageDeliveryParams { /// Maximal number of unconfirmed relayer entries at the inbound lane. If there's that number /// of entries in the `InboundLaneData::relayers` set, all new messages will be rejected until /// reward payment will be proved (by including outbound lane state to the message delivery @@ -89,8 +92,8 @@ pub struct MessageDeliveryParams { pub max_messages_weight_in_single_batch: Weight, /// Maximal cumulative size of relayed messages in single delivery transaction. pub max_messages_size_in_single_batch: u32, - /// Relayer operating mode. - pub relayer_mode: RelayerMode, + /// Relay strategy + pub relay_strategy: Strategy, } /// Message details. @@ -257,8 +260,8 @@ pub fn metrics_prefix(lane: &LaneId) -> String { } /// Run message lane service loop. -pub async fn run( - params: Params, +pub async fn run( + params: Params, source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_params: MetricsParams, @@ -286,8 +289,13 @@ pub async fn run( /// Run one-way message delivery loop until connection with target or source node is lost, or exit /// signal is received. -async fn run_until_connection_lost, TC: TargetClient

>( - params: Params, +async fn run_until_connection_lost< + P: MessageLane, + Strategy: RelayStrategy, + SC: SourceClient

, + TC: TargetClient

, +>( + params: Params, source_client: SC, target_client: TC, metrics_msg: Option, @@ -449,11 +457,16 @@ async fn run_until_connection_lost, TC: Targ #[cfg(test)] pub(crate) mod tests { - use super::*; + use std::sync::Arc; + use futures::stream::StreamExt; use parking_lot::Mutex; + use relay_utils::{HeaderId, MaybeConnectionError}; - use std::sync::Arc; + + use crate::relay_strategy::AltruisticStrategy; + + use super::*; pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId { HeaderId(number, number) @@ -807,7 +820,7 @@ pub(crate) mod tests { max_messages_in_single_batch: 4, max_messages_weight_in_single_batch: 4, max_messages_size_in_single_batch: 4, - relayer_mode: RelayerMode::Altruistic, + relay_strategy: AltruisticStrategy, }, }, source_client, diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs index 4fd721dae5977..1cd2cbd267185 100644 --- a/bridges/relays/messages/src/message_race_delivery.rs +++ b/bridges/relays/messages/src/message_race_delivery.rs @@ -13,10 +13,18 @@ //! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target". +use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; + +use async_trait::async_trait; +use futures::stream::FusedStream; + +use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight}; +use relay_utils::FailedClient; + use crate::{ message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, message_lane_loop::{ - MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, RelayerMode, + MessageDeliveryParams, MessageDetailsMap, MessageProofParameters, SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }, @@ -24,32 +32,20 @@ use crate::{ MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient, TargetClientNonces, }, - message_race_strategy::{BasicStrategy, SourceRangesQueue}, + message_race_strategy::BasicStrategy, metrics::MessageLaneLoopMetrics, -}; - -use async_trait::async_trait; -use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight}; -use bp_runtime::messages::DispatchFeePayment; -use futures::stream::FusedStream; -use num_traits::{SaturatingAdd, Zero}; -use relay_utils::FailedClient; -use std::{ - collections::VecDeque, - marker::PhantomData, - ops::{Range, RangeInclusive}, - time::Duration, + relay_strategy::{EnforcementStrategy, RelayMessagesBatchReference, RelayStrategy}, }; /// Run message delivery race. -pub async fn run( +pub async fn run( source_client: impl MessageLaneSourceClient

, source_state_updates: impl FusedStream>, target_client: impl MessageLaneTargetClient

, target_state_updates: impl FusedStream>, stall_timeout: Duration, metrics_msg: Option, - params: MessageDeliveryParams, + params: MessageDeliveryParams, ) -> Result<(), FailedClient> { crate::message_race_loop::run( MessageDeliveryRaceSource { @@ -65,7 +61,7 @@ pub async fn run( }, target_state_updates, stall_timeout, - MessageDeliveryStrategy:: { + MessageDeliveryStrategy:: { lane_source_client: source_client, lane_target_client: target_client, max_unrewarded_relayer_entries_at_target: params @@ -74,7 +70,7 @@ pub async fn run( max_messages_in_single_batch: params.max_messages_in_single_batch, max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch, max_messages_size_in_single_batch: params.max_messages_size_in_single_batch, - relayer_mode: params.relayer_mode, + relay_strategy: params.relay_strategy, latest_confirmed_nonces_at_source: VecDeque::new(), target_nonces: None, strategy: BasicStrategy::new(), @@ -235,7 +231,7 @@ struct DeliveryRaceTargetNoncesData { } /// Messages delivery strategy. -struct MessageDeliveryStrategy { +struct MessageDeliveryStrategy { /// The client that is connected to the message lane source node. lane_source_client: SC, /// The client that is connected to the message lane target node. @@ -251,7 +247,7 @@ struct MessageDeliveryStrategy { /// Maximal messages size in the single delivery transaction. max_messages_size_in_single_batch: u32, /// Relayer operating mode. - relayer_mode: RelayerMode, + relay_strategy: Strategy, /// Latest confirmed nonces at the source client + the header id where we have first met this /// nonce. latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf

, MessageNonce)>, @@ -270,7 +266,9 @@ type MessageDeliveryStrategyBase

= BasicStrategy<

::MessagesProof, >; -impl std::fmt::Debug for MessageDeliveryStrategy { +impl std::fmt::Debug + for MessageDeliveryStrategy +{ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fmt.debug_struct("MessageDeliveryStrategy") .field( @@ -288,7 +286,7 @@ impl std::fmt::Debug for MessageDeliveryStrategy MessageDeliveryStrategy { +impl MessageDeliveryStrategy { /// Returns total weight of all undelivered messages. fn total_queued_dispatch_weight(&self) -> Weight { self.strategy @@ -300,8 +298,9 @@ impl MessageDeliveryStrategy { } #[async_trait] -impl RaceStrategy, TargetHeaderIdOf

, P::MessagesProof> - for MessageDeliveryStrategy +impl + RaceStrategy, TargetHeaderIdOf

, P::MessagesProof> + for MessageDeliveryStrategy where P: MessageLane, SC: MessageLaneSourceClient

, @@ -504,7 +503,6 @@ where let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch); let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch; let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch; - let relayer_mode = self.relayer_mode; let lane_source_client = self.lane_source_client.clone(); let lane_target_client = self.lane_target_client.clone(); @@ -512,17 +510,19 @@ where self.strategy.maximal_available_source_queue_index(race_state)?; let previous_total_dispatch_weight = self.total_queued_dispatch_weight(); let source_queue = self.strategy.source_queue(); - let range_end = select_nonces_for_delivery_transaction( - relayer_mode, - max_nonces, + + let reference = RelayMessagesBatchReference { + max_messages_in_this_batch: max_nonces, max_messages_weight_in_single_batch, max_messages_size_in_single_batch, - lane_source_client.clone(), - lane_target_client.clone(), - source_queue, - 0..maximal_source_queue_index + 1, - ) - .await?; + lane_source_client: lane_source_client.clone(), + lane_target_client: lane_target_client.clone(), + nonces_queue: source_queue.clone(), + nonces_queue_range: 0..maximal_source_queue_index + 1, + }; + + let strategy = EnforcementStrategy::new(self.relay_strategy.clone()); + let range_end = strategy.decide(reference).await?; let range_begin = source_queue[0].1.begin(); let selected_nonces = range_begin..=range_end; @@ -538,236 +538,6 @@ where } } -/// From given set of source nonces, that are ready to be delivered, select nonces -/// to fit into single delivery transaction. -/// -/// The function returns last nonce that must be delivered to the target chain. -#[allow(clippy::too_many_arguments)] -async fn select_nonces_for_delivery_transaction( - relayer_mode: RelayerMode, - max_messages_in_this_batch: MessageNonce, - max_messages_weight_in_single_batch: Weight, - max_messages_size_in_single_batch: u32, - lane_source_client: impl MessageLaneSourceClient

, - lane_target_client: impl MessageLaneTargetClient

, - nonces_queue: &SourceRangesQueue< - P::SourceHeaderHash, - P::SourceHeaderNumber, - MessageDetailsMap, - >, - nonces_queue_range: Range, -) -> Option { - let mut hard_selected_count = 0; - let mut soft_selected_count = 0; - - let mut selected_weight: Weight = 0; - let mut selected_unpaid_weight: Weight = 0; - let mut selected_prepaid_nonces = 0; - let mut selected_size: u32 = 0; - let mut selected_count: MessageNonce = 0; - let mut selected_reward = P::SourceChainBalance::zero(); - let mut selected_cost = P::SourceChainBalance::zero(); - - let mut total_reward = P::SourceChainBalance::zero(); - let mut total_confirmations_cost = P::SourceChainBalance::zero(); - let mut total_cost = P::SourceChainBalance::zero(); - - let hard_selected_begin_nonce = nonces_queue[nonces_queue_range.start].1.begin(); - - // technically, multiple confirmations will be delivered in a single transaction, - // meaning less loses for relayer. But here we don't know the final relayer yet, so - // we're adding a separate transaction for every message. Normally, this cost is covered - // by the message sender. Probably reconsider this? - let confirmation_transaction_cost = if relayer_mode != RelayerMode::Altruistic { - lane_source_client.estimate_confirmation_transaction().await - } else { - Zero::zero() - }; - - let all_ready_nonces = nonces_queue - .range(nonces_queue_range.clone()) - .flat_map(|(_, ready_nonces)| ready_nonces.iter()) - .enumerate(); - for (index, (nonce, details)) in all_ready_nonces { - // Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch` - // and `max_messages_size_in_single_batch`, we may still try to submit transaction - // with single message if message overflows these limits. The worst case would be if - // transaction will be rejected by the target runtime, but at least we have tried. - - // limit messages in the batch by weight - let new_selected_weight = match selected_weight.checked_add(details.dispatch_weight) { - Some(new_selected_weight) - if new_selected_weight <= max_messages_weight_in_single_batch => - new_selected_weight, - new_selected_weight if selected_count == 0 => { - log::warn!( - target: "bridge", - "Going to submit message delivery transaction with declared dispatch \ - weight {:?} that overflows maximal configured weight {}", - new_selected_weight, - max_messages_weight_in_single_batch, - ); - new_selected_weight.unwrap_or(Weight::MAX) - }, - _ => break, - }; - - // limit messages in the batch by size - let new_selected_size = match selected_size.checked_add(details.size) { - Some(new_selected_size) if new_selected_size <= max_messages_size_in_single_batch => - new_selected_size, - new_selected_size if selected_count == 0 => { - log::warn!( - target: "bridge", - "Going to submit message delivery transaction with message \ - size {:?} that overflows maximal configured size {}", - new_selected_size, - max_messages_size_in_single_batch, - ); - new_selected_size.unwrap_or(u32::MAX) - }, - _ => break, - }; - - // limit number of messages in the batch - let new_selected_count = selected_count + 1; - if new_selected_count > max_messages_in_this_batch { - break - } - - // If dispatch fee has been paid at the source chain, it means that it is **relayer** who's - // paying for dispatch at the target chain AND reward must cover this dispatch fee. - // - // If dispatch fee is paid at the target chain, it means that it'll be withdrawn from the - // dispatch origin account AND reward is not covering this fee. - // - // So in the latter case we're not adding the dispatch weight to the delivery transaction - // weight. - let mut new_selected_prepaid_nonces = selected_prepaid_nonces; - let new_selected_unpaid_weight = match details.dispatch_fee_payment { - DispatchFeePayment::AtSourceChain => { - new_selected_prepaid_nonces += 1; - selected_unpaid_weight.saturating_add(details.dispatch_weight) - }, - DispatchFeePayment::AtTargetChain => selected_unpaid_weight, - }; - - // now the message has passed all 'strong' checks, and we CAN deliver it. But do we WANT - // to deliver it? It depends on the relayer strategy. - match relayer_mode { - RelayerMode::Altruistic => { - soft_selected_count = index + 1; - }, - RelayerMode::Rational => { - let delivery_transaction_cost = lane_target_client - .estimate_delivery_transaction_in_source_tokens( - hard_selected_begin_nonce..= - (hard_selected_begin_nonce + index as MessageNonce), - new_selected_prepaid_nonces, - new_selected_unpaid_weight, - new_selected_size as u32, - ) - .await - .map_err(|err| { - log::debug!( - target: "bridge", - "Failed to estimate delivery transaction cost: {:?}. No nonces selected for delivery", - err, - ); - }) - .ok()?; - - // if it is the first message that makes reward less than cost, let's log it - // if this message makes batch profitable again, let's log it - let is_total_reward_less_than_cost = total_reward < total_cost; - let prev_total_cost = total_cost; - let prev_total_reward = total_reward; - total_confirmations_cost = - total_confirmations_cost.saturating_add(&confirmation_transaction_cost); - total_reward = total_reward.saturating_add(&details.reward); - total_cost = total_confirmations_cost.saturating_add(&delivery_transaction_cost); - if !is_total_reward_less_than_cost && total_reward < total_cost { - log::debug!( - target: "bridge", - "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it larger than \ - total reward {:?}->{:?}", - nonce, - details.reward, - prev_total_cost, - total_cost, - prev_total_reward, - total_reward, - ); - } else if is_total_reward_less_than_cost && total_reward >= total_cost { - log::debug!( - target: "bridge", - "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it less than or \ - equal to the total reward {:?}->{:?} (again)", - nonce, - details.reward, - prev_total_cost, - total_cost, - prev_total_reward, - total_reward, - ); - } - - // Rational relayer never want to lose his funds - if total_reward >= total_cost { - soft_selected_count = index + 1; - selected_reward = total_reward; - selected_cost = total_cost; - } - }, - } - - hard_selected_count = index + 1; - selected_weight = new_selected_weight; - selected_unpaid_weight = new_selected_unpaid_weight; - selected_prepaid_nonces = new_selected_prepaid_nonces; - selected_size = new_selected_size; - selected_count = new_selected_count; - } - - if hard_selected_count != soft_selected_count { - let hard_selected_end_nonce = - hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1; - let soft_selected_begin_nonce = hard_selected_begin_nonce; - let soft_selected_end_nonce = - soft_selected_begin_nonce + soft_selected_count as MessageNonce - 1; - log::warn!( - target: "bridge", - "Relayer may deliver nonces [{:?}; {:?}], but because of its strategy ({:?}) it has selected \ - nonces [{:?}; {:?}].", - hard_selected_begin_nonce, - hard_selected_end_nonce, - relayer_mode, - soft_selected_begin_nonce, - soft_selected_end_nonce, - ); - - hard_selected_count = soft_selected_count; - } - - if hard_selected_count != 0 { - if relayer_mode != RelayerMode::Altruistic { - log::trace!( - target: "bridge", - "Expected reward from delivering nonces [{:?}; {:?}] is: {:?} - {:?} = {:?}", - hard_selected_begin_nonce, - hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1, - selected_reward, - selected_cost, - selected_reward - selected_cost, - ); - } - - Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1) - } else { - None - } -} - impl NoncesRange for MessageDetailsMap { fn begin(&self) -> MessageNonce { self.keys().next().cloned().unwrap_or_default() @@ -789,16 +559,21 @@ impl NoncesRange for MessageDetailsMap; type TestStrategy = - MessageDeliveryStrategy; + MessageDeliveryStrategy; fn source_nonces( new_nonces: RangeInclusive, @@ -848,7 +623,6 @@ mod tests { }; let mut race_strategy = TestStrategy { - relayer_mode: RelayerMode::Altruistic, max_unrewarded_relayer_entries_at_target: 4, max_unconfirmed_nonces_at_target: 4, max_messages_in_single_batch: 4, @@ -869,11 +643,12 @@ mod tests { }, }), strategy: BasicStrategy::new(), + relay_strategy: MixStrategy::new(RelayerMode::Altruistic), }; race_strategy.strategy.source_nonces_updated( header_id(1), - source_nonces(20..=23, 19, DEFAULT_REWARD, AtSourceChain), + source_nonces(20..=23, 19, DEFAULT_REWARD, DispatchFeePayment::AtSourceChain), ); let target_nonces = TargetClientNonces { latest_nonce: 19, nonces_data: () }; @@ -907,7 +682,7 @@ mod tests { dispatch_weight: idx, size: idx as _, reward: idx as _, - dispatch_fee_payment: AtSourceChain, + dispatch_fee_payment: DispatchFeePayment::AtSourceChain, }, ) }) @@ -1199,7 +974,7 @@ mod tests { #[async_std::test] async fn rational_relayer_is_delivering_messages_if_cost_is_equal_to_reward() { let (state, mut strategy) = prepare_strategy(); - strategy.relayer_mode = RelayerMode::Rational; + strategy.relay_strategy = MixStrategy::new(RelayerMode::Rational); // so now we have: // - 20..=23 with reward = cost @@ -1217,11 +992,11 @@ mod tests { 24..=25, 19, DEFAULT_REWARD - BASE_MESSAGE_DELIVERY_TRANSACTION_COST, - AtSourceChain, + DispatchFeePayment::AtSourceChain, ); strategy.strategy.source_nonces_updated(header_id(2), nonces); state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); - strategy.relayer_mode = RelayerMode::Rational; + strategy.relay_strategy = MixStrategy::new(RelayerMode::Rational); // so now we have: // - 20..=23 with reward = cost @@ -1252,7 +1027,7 @@ mod tests { strategy.max_messages_in_single_batch = 100; strategy.max_messages_weight_in_single_batch = 100; strategy.max_messages_size_in_single_batch = 100; - strategy.relayer_mode = RelayerMode::Rational; + strategy.relay_strategy = MixStrategy::new(RelayerMode::Rational); // so now we have: // - 20..=23 with reward = cost @@ -1264,11 +1039,11 @@ mod tests { } assert_eq!( - test_with_dispatch_fee_payment(AtTargetChain).await, + test_with_dispatch_fee_payment(DispatchFeePayment::AtTargetChain).await, Some(((20..=24), proof_parameters(false, 5))) ); assert_eq!( - test_with_dispatch_fee_payment(AtSourceChain).await, + test_with_dispatch_fee_payment(DispatchFeePayment::AtSourceChain).await, Some(((20..=23), proof_parameters(false, 4))) ); } @@ -1284,7 +1059,7 @@ mod tests { // This was happening because selector (`select_nonces_for_delivery_transaction`) has been // called for every `source_queue` entry separately without preserving any context. let (mut state, mut strategy) = prepare_strategy(); - let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD, AtSourceChain); + let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD, DispatchFeePayment::AtSourceChain); strategy.strategy.source_nonces_updated(header_id(2), nonces); strategy.max_unrewarded_relayer_entries_at_target = 100; strategy.max_unconfirmed_nonces_at_target = 100; diff --git a/bridges/relays/messages/src/relay_strategy/altruistic_strategy.rs b/bridges/relays/messages/src/relay_strategy/altruistic_strategy.rs new file mode 100644 index 0000000000000..f932b796b0dee --- /dev/null +++ b/bridges/relays/messages/src/relay_strategy/altruistic_strategy.rs @@ -0,0 +1,45 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Altruistic relay strategy + +use async_trait::async_trait; + +use crate::{ + message_lane::MessageLane, + message_lane_loop::{ + SourceClient as MessageLaneSourceClient, TargetClient as MessageLaneTargetClient, + }, + relay_strategy::{RelayReference, RelayStrategy}, +}; + +/// The relayer doesn't care about rewards. +#[derive(Clone)] +pub struct AltruisticStrategy; + +#[async_trait] +impl RelayStrategy for AltruisticStrategy { + async fn decide< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, + >( + &self, + _reference: &mut RelayReference, + ) -> bool { + true + } +} diff --git a/bridges/relays/messages/src/relay_strategy/enforcement_strategy.rs b/bridges/relays/messages/src/relay_strategy/enforcement_strategy.rs new file mode 100644 index 0000000000000..042c05bec00ad --- /dev/null +++ b/bridges/relays/messages/src/relay_strategy/enforcement_strategy.rs @@ -0,0 +1,219 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! enforcement strategy + +use num_traits::Zero; + +use bp_messages::{MessageNonce, Weight}; +use bp_runtime::messages::DispatchFeePayment; + +use crate::{ + message_lane::MessageLane, + message_lane_loop::{ + MessageDetails, SourceClient as MessageLaneSourceClient, + TargetClient as MessageLaneTargetClient, + }, + message_race_loop::NoncesRange, + relay_strategy::{RelayMessagesBatchReference, RelayReference, RelayStrategy}, +}; + +/// Do hard check and run soft check strategy +#[derive(Clone)] +pub struct EnforcementStrategy { + strategy: Strategy, +} + +impl EnforcementStrategy { + pub fn new(strategy: Strategy) -> Self { + Self { strategy } + } +} + +impl EnforcementStrategy { + pub async fn decide< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, + >( + &self, + reference: RelayMessagesBatchReference, + ) -> Option { + let mut hard_selected_count = 0; + let mut soft_selected_count = 0; + + let mut selected_weight: Weight = 0; + let mut selected_count: MessageNonce = 0; + + let hard_selected_begin_nonce = + reference.nonces_queue[reference.nonces_queue_range.start].1.begin(); + + // relay reference + let mut relay_reference = RelayReference { + lane_source_client: reference.lane_source_client.clone(), + lane_target_client: reference.lane_target_client.clone(), + + selected_reward: P::SourceChainBalance::zero(), + selected_cost: P::SourceChainBalance::zero(), + selected_size: 0, + + total_reward: P::SourceChainBalance::zero(), + total_confirmations_cost: P::SourceChainBalance::zero(), + total_cost: P::SourceChainBalance::zero(), + + hard_selected_begin_nonce, + selected_prepaid_nonces: 0, + selected_unpaid_weight: 0, + + index: 0, + nonce: 0, + details: MessageDetails { + dispatch_weight: 0, + size: 0, + reward: P::SourceChainBalance::zero(), + dispatch_fee_payment: DispatchFeePayment::AtSourceChain, + }, + }; + + let all_ready_nonces = reference + .nonces_queue + .range(reference.nonces_queue_range.clone()) + .flat_map(|(_, ready_nonces)| ready_nonces.iter()) + .enumerate(); + for (index, (nonce, details)) in all_ready_nonces { + relay_reference.index = index; + relay_reference.nonce = *nonce; + relay_reference.details = *details; + + // Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch` + // and `max_messages_size_in_single_batch`, we may still try to submit transaction + // with single message if message overflows these limits. The worst case would be if + // transaction will be rejected by the target runtime, but at least we have tried. + + // limit messages in the batch by weight + let new_selected_weight = match selected_weight.checked_add(details.dispatch_weight) { + Some(new_selected_weight) + if new_selected_weight <= reference.max_messages_weight_in_single_batch => + new_selected_weight, + new_selected_weight if selected_count == 0 => { + log::warn!( + target: "bridge", + "Going to submit message delivery transaction with declared dispatch \ + weight {:?} that overflows maximal configured weight {}", + new_selected_weight, + reference.max_messages_weight_in_single_batch, + ); + new_selected_weight.unwrap_or(Weight::MAX) + }, + _ => break, + }; + + // limit messages in the batch by size + let new_selected_size = match relay_reference.selected_size.checked_add(details.size) { + Some(new_selected_size) + if new_selected_size <= reference.max_messages_size_in_single_batch => + new_selected_size, + new_selected_size if selected_count == 0 => { + log::warn!( + target: "bridge", + "Going to submit message delivery transaction with message \ + size {:?} that overflows maximal configured size {}", + new_selected_size, + reference.max_messages_size_in_single_batch, + ); + new_selected_size.unwrap_or(u32::MAX) + }, + _ => break, + }; + + // limit number of messages in the batch + let new_selected_count = selected_count + 1; + if new_selected_count > reference.max_messages_in_this_batch { + break + } + relay_reference.selected_size = new_selected_size; + + // If dispatch fee has been paid at the source chain, it means that it is **relayer** + // who's paying for dispatch at the target chain AND reward must cover this dispatch + // fee. + // + // If dispatch fee is paid at the target chain, it means that it'll be withdrawn from + // the dispatch origin account AND reward is not covering this fee. + // + // So in the latter case we're not adding the dispatch weight to the delivery + // transaction weight. + let mut new_selected_prepaid_nonces = relay_reference.selected_prepaid_nonces; + let new_selected_unpaid_weight = match details.dispatch_fee_payment { + DispatchFeePayment::AtSourceChain => { + new_selected_prepaid_nonces += 1; + relay_reference.selected_unpaid_weight.saturating_add(details.dispatch_weight) + }, + DispatchFeePayment::AtTargetChain => relay_reference.selected_unpaid_weight, + }; + relay_reference.selected_prepaid_nonces = new_selected_prepaid_nonces; + relay_reference.selected_unpaid_weight = new_selected_unpaid_weight; + + // now the message has passed all 'strong' checks, and we CAN deliver it. But do we WANT + // to deliver it? It depends on the relayer strategy. + if self.strategy.decide(&mut relay_reference).await { + soft_selected_count = index + 1; + } + + hard_selected_count = index + 1; + selected_weight = new_selected_weight; + selected_count = new_selected_count; + } + + if hard_selected_count != soft_selected_count { + let hard_selected_end_nonce = + hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1; + let soft_selected_begin_nonce = hard_selected_begin_nonce; + let soft_selected_end_nonce = + soft_selected_begin_nonce + soft_selected_count as MessageNonce - 1; + log::warn!( + target: "bridge", + "Relayer may deliver nonces [{:?}; {:?}], but because of its strategy it has selected \ + nonces [{:?}; {:?}].", + hard_selected_begin_nonce, + hard_selected_end_nonce, + soft_selected_begin_nonce, + soft_selected_end_nonce, + ); + + hard_selected_count = soft_selected_count; + } + + if hard_selected_count != 0 { + if relay_reference.selected_reward != P::SourceChainBalance::zero() && + relay_reference.selected_cost != P::SourceChainBalance::zero() + { + log::trace!( + target: "bridge", + "Expected reward from delivering nonces [{:?}; {:?}] is: {:?} - {:?} = {:?}", + hard_selected_begin_nonce, + hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1, + &relay_reference.selected_reward, + &relay_reference.selected_cost, + relay_reference.selected_reward - relay_reference.selected_cost, + ); + } + + Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1) + } else { + None + } + } +} diff --git a/bridges/relays/messages/src/relay_strategy/mix_strategy.rs b/bridges/relays/messages/src/relay_strategy/mix_strategy.rs new file mode 100644 index 0000000000000..ef472e9c35052 --- /dev/null +++ b/bridges/relays/messages/src/relay_strategy/mix_strategy.rs @@ -0,0 +1,40 @@ +use async_trait::async_trait; + +use crate::{ + message_lane::MessageLane, + message_lane_loop::{ + RelayerMode, SourceClient as MessageLaneSourceClient, + TargetClient as MessageLaneTargetClient, + }, + relay_strategy::{AltruisticStrategy, RationalStrategy, RelayReference, RelayStrategy}, +}; + +/// The relayer doesn't care about rewards. +#[derive(Clone)] +pub struct MixStrategy { + relayer_mode: RelayerMode, +} + +impl MixStrategy { + /// Create mix strategy instance + pub fn new(relayer_mode: RelayerMode) -> Self { + Self { relayer_mode } + } +} + +#[async_trait] +impl RelayStrategy for MixStrategy { + async fn decide< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, + >( + &self, + reference: &mut RelayReference, + ) -> bool { + match self.relayer_mode { + RelayerMode::Altruistic => AltruisticStrategy.decide(reference).await, + RelayerMode::Rational => RationalStrategy.decide(reference).await, + } + } +} diff --git a/bridges/relays/messages/src/relay_strategy/mod.rs b/bridges/relays/messages/src/relay_strategy/mod.rs new file mode 100644 index 0000000000000..3e4eef8975dd8 --- /dev/null +++ b/bridges/relays/messages/src/relay_strategy/mod.rs @@ -0,0 +1,123 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Relayer strategy + +use std::ops::Range; + +use async_trait::async_trait; + +use bp_messages::{MessageNonce, Weight}; + +use crate::{ + message_lane::MessageLane, + message_lane_loop::{ + MessageDetails, MessageDetailsMap, SourceClient as MessageLaneSourceClient, + TargetClient as MessageLaneTargetClient, + }, + message_race_strategy::SourceRangesQueue, +}; + +pub(crate) use self::enforcement_strategy::*; +pub use self::{altruistic_strategy::*, mix_strategy::*, rational_strategy::*}; + +mod altruistic_strategy; +mod enforcement_strategy; +mod mix_strategy; +mod rational_strategy; + +/// Relayer strategy trait +#[async_trait] +pub trait RelayStrategy: 'static + Clone + Send + Sync { + /// The relayer decide how to process nonce by reference. + /// From given set of source nonces, that are ready to be delivered, select nonces + /// to fit into single delivery transaction. + /// + /// The function returns last nonce that must be delivered to the target chain. + async fn decide< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, + >( + &self, + reference: &mut RelayReference, + ) -> bool; +} + +/// Reference data for participating in relay +pub struct RelayReference< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, +> { + /// The client that is connected to the message lane source node. + pub lane_source_client: SourceClient, + /// The client that is connected to the message lane target node. + pub lane_target_client: TargetClient, + /// Current block reward summary + pub selected_reward: P::SourceChainBalance, + /// Current block cost summary + pub selected_cost: P::SourceChainBalance, + /// Messages size summary + pub selected_size: u32, + + /// Current block reward summary + pub total_reward: P::SourceChainBalance, + /// All confirmations cost + pub total_confirmations_cost: P::SourceChainBalance, + /// Current block cost summary + pub total_cost: P::SourceChainBalance, + + /// Hard check begin nonce + pub hard_selected_begin_nonce: MessageNonce, + /// Count prepaid nonces + pub selected_prepaid_nonces: MessageNonce, + /// Unpaid nonces weight summary + pub selected_unpaid_weight: Weight, + + /// Index by all ready nonces + pub index: usize, + /// Current nonce + pub nonce: MessageNonce, + /// Current nonce details + pub details: MessageDetails, +} + +/// Relay reference data +pub struct RelayMessagesBatchReference< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, +> { + /// Maximal number of relayed messages in single delivery transaction. + pub max_messages_in_this_batch: MessageNonce, + /// Maximal cumulative dispatch weight of relayed messages in single delivery transaction. + pub max_messages_weight_in_single_batch: Weight, + /// Maximal cumulative size of relayed messages in single delivery transaction. + pub max_messages_size_in_single_batch: u32, + /// The client that is connected to the message lane source node. + pub lane_source_client: SourceClient, + /// The client that is connected to the message lane target node. + pub lane_target_client: TargetClient, + /// Source queue. + pub nonces_queue: SourceRangesQueue< + P::SourceHeaderHash, + P::SourceHeaderNumber, + MessageDetailsMap, + >, + /// Source queue range + pub nonces_queue_range: Range, +} diff --git a/bridges/relays/messages/src/relay_strategy/rational_strategy.rs b/bridges/relays/messages/src/relay_strategy/rational_strategy.rs new file mode 100644 index 0000000000000..dc408ffd49e25 --- /dev/null +++ b/bridges/relays/messages/src/relay_strategy/rational_strategy.rs @@ -0,0 +1,122 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Rational relay strategy + +use async_trait::async_trait; +use num_traits::SaturatingAdd; + +use bp_messages::MessageNonce; + +use crate::{ + message_lane::MessageLane, + message_lane_loop::{ + SourceClient as MessageLaneSourceClient, TargetClient as MessageLaneTargetClient, + }, + relay_strategy::{RelayReference, RelayStrategy}, +}; + +/// The relayer will deliver all messages and confirmations as long as he's not losing any +/// funds. +#[derive(Clone)] +pub struct RationalStrategy; + +#[async_trait] +impl RelayStrategy for RationalStrategy { + async fn decide< + P: MessageLane, + SourceClient: MessageLaneSourceClient

, + TargetClient: MessageLaneTargetClient

, + >( + &self, + reference: &mut RelayReference, + ) -> bool { + // technically, multiple confirmations will be delivered in a single transaction, + // meaning less loses for relayer. But here we don't know the final relayer yet, so + // we're adding a separate transaction for every message. Normally, this cost is covered + // by the message sender. Probably reconsider this? + let confirmation_transaction_cost = + reference.lane_source_client.estimate_confirmation_transaction().await; + + let delivery_transaction_cost = match reference + .lane_target_client + .estimate_delivery_transaction_in_source_tokens( + reference.hard_selected_begin_nonce..= + (reference.hard_selected_begin_nonce + reference.index as MessageNonce), + reference.selected_prepaid_nonces, + reference.selected_unpaid_weight, + reference.selected_size as u32, + ) + .await + { + Ok(v) => v, + Err(err) => { + log::debug!( + target: "bridge", + "Failed to estimate delivery transaction cost: {:?}. No nonces selected for delivery", + err, + ); + return false + }, + }; + + // if it is the first message that makes reward less than cost, let's log it + // if this message makes batch profitable again, let's log it + let is_total_reward_less_than_cost = reference.total_reward < reference.total_cost; + let prev_total_cost = reference.total_cost; + let prev_total_reward = reference.total_reward; + reference.total_confirmations_cost = reference + .total_confirmations_cost + .saturating_add(&confirmation_transaction_cost); + reference.total_reward = reference.total_reward.saturating_add(&reference.details.reward); + reference.total_cost = + reference.total_confirmations_cost.saturating_add(&delivery_transaction_cost); + if !is_total_reward_less_than_cost && reference.total_reward < reference.total_cost { + log::debug!( + target: "bridge", + "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it larger than \ + total reward {:?}->{:?}", + reference.nonce, + reference.details.reward, + prev_total_cost, + reference.total_cost, + prev_total_reward, + reference.total_reward, + ); + } else if is_total_reward_less_than_cost && reference.total_reward >= reference.total_cost { + log::debug!( + target: "bridge", + "Message with nonce {} (reward = {:?}) changes total cost {:?}->{:?} and makes it less than or \ + equal to the total reward {:?}->{:?} (again)", + reference.nonce, + reference.details.reward, + prev_total_cost, + reference.total_cost, + prev_total_reward, + reference.total_reward, + ); + } + + // Rational relayer never want to lose his funds + if reference.total_reward >= reference.total_cost { + reference.selected_reward = reference.total_reward; + reference.selected_cost = reference.total_cost; + return true + } + + false + } +}