From 3984e67d08ace8fac2c96e46be40e863221d2f81 Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Wed, 27 Nov 2024 16:16:23 +0530 Subject: [PATCH 1/7] add runtime api to extract xdm message id --- crates/subspace-fake-runtime-api/src/lib.rs | 5 +++++ crates/subspace-runtime/src/lib.rs | 13 +++++++++++++ .../src/message_listener.rs | 13 ++++++++++++- domains/primitives/messenger/src/lib.rs | 11 +++++++++++ domains/runtime/auto-id/src/lib.rs | 13 +++++++++++++ domains/runtime/evm/src/lib.rs | 13 +++++++++++++ domains/test/runtime/auto-id/src/lib.rs | 13 +++++++++++++ domains/test/runtime/evm/src/lib.rs | 13 +++++++++++++ test/subspace-test-runtime/src/lib.rs | 13 +++++++++++++ 9 files changed, 106 insertions(+), 1 deletion(-) diff --git a/crates/subspace-fake-runtime-api/src/lib.rs b/crates/subspace-fake-runtime-api/src/lib.rs index 9ca477d0ed..05539ce2fb 100644 --- a/crates/subspace-fake-runtime-api/src/lib.rs +++ b/crates/subspace-fake-runtime-api/src/lib.rs @@ -32,6 +32,7 @@ use sp_domains_fraud_proof::storage_proof::FraudProofStorageKeyRequest; use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::transaction_validity::{TransactionSource, TransactionValidity}; use sp_runtime::{ApplyExtrinsicResult, ExtrinsicInclusionMode}; @@ -361,6 +362,10 @@ sp_api::impl_runtime_apis! { fn domain_chains_allowlist_update(_domain_id: DomainId) -> Option{ unreachable!() } + + fn xdm_id(_ext: &::Extrinsic) -> Option { + unreachable!() + } } impl sp_messenger::RelayerApi::Hash> for Runtime { diff --git a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index d145592342..3654183d44 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -83,6 +83,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::traits::{ @@ -1442,6 +1443,18 @@ impl_runtime_apis! { fn domain_chains_allowlist_update(domain_id: DomainId) -> Option{ Messenger::domain_chains_allowlist_update(domain_id) } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi::Hash> for Runtime { diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index e0648fc65f..07f52ecb4b 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -470,9 +470,12 @@ async fn handle_xdm_message( Client: HeaderBackend>, { let at = client.info().best_hash; + let tx_hash = tx_pool.hash_of(&ext); tracing::debug!( target: LOG_TARGET, - "Submitting extrinsic to tx pool at block: {:?}", + "Submitting extrinsic {:?} to tx pool for chain {:?} at block: {:?}", + tx_hash, + chain_id, at ); @@ -487,5 +490,13 @@ async fn handle_xdm_message( chain_id, err ); + } else { + tracing::debug!( + target: LOG_TARGET, + "Submitted extrinsic {:?} to tx pool for chain {:?} at {:?}", + tx_hash, + chain_id, + at + ) } } diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index 8f26eab00b..67a2cf4282 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -33,6 +33,7 @@ use codec::{Decode, Encode}; use frame_support::inherent::InherentData; use frame_support::inherent::{InherentIdentifier, IsFatalError}; use messages::{BlockMessagesWithStorageKey, ChannelId, CrossDomainMessage, MessageId}; +use scale_info::TypeInfo; use sp_domains::{ChainId, DomainAllowlistUpdates, DomainId}; use sp_subspace_mmr::ConsensusChainMmrLeafProof; #[cfg(feature = "std")] @@ -159,6 +160,13 @@ impl sp_inherents::InherentDataProvider for InherentDataProvider { } } +/// Represent a union of XDM types with their message ID +#[derive(Debug, Encode, Decode, TypeInfo)] +pub enum XdmId { + RelayMessage(MessageKey), + RelayResponseMessage(MessageKey), +} + sp_api::decl_runtime_apis! { /// Api useful for relayers to fetch messages and submit transactions. pub trait RelayerApi @@ -220,5 +228,8 @@ sp_api::decl_runtime_apis! { /// Returns any domain's chains allowlist updates on consensus chain. fn domain_chains_allowlist_update(domain_id: DomainId) -> Option; + + /// Returns XDM message ID + fn xdm_id(ext: &Block::Extrinsic) -> Option; } } diff --git a/domains/runtime/auto-id/src/lib.rs b/domains/runtime/auto-id/src/lib.rs index 61ddda8c3a..b1c3abe560 100644 --- a/domains/runtime/auto-id/src/lib.rs +++ b/domains/runtime/auto-id/src/lib.rs @@ -45,6 +45,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -909,6 +910,18 @@ impl_runtime_apis! { // not valid call on domains None } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/runtime/evm/src/lib.rs b/domains/runtime/evm/src/lib.rs index 172e0c3b01..91a4f9ec00 100644 --- a/domains/runtime/evm/src/lib.rs +++ b/domains/runtime/evm/src/lib.rs @@ -58,6 +58,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -1320,6 +1321,18 @@ impl_runtime_apis! { // not valid call on domains None } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.0.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/test/runtime/auto-id/src/lib.rs b/domains/test/runtime/auto-id/src/lib.rs index 1bfab38c27..59cb9c05d0 100644 --- a/domains/test/runtime/auto-id/src/lib.rs +++ b/domains/test/runtime/auto-id/src/lib.rs @@ -45,6 +45,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -900,6 +901,18 @@ impl_runtime_apis! { // not valid call on domains None } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/test/runtime/evm/src/lib.rs b/domains/test/runtime/evm/src/lib.rs index b530910fdb..9bd7d22fee 100644 --- a/domains/test/runtime/evm/src/lib.rs +++ b/domains/test/runtime/evm/src/lib.rs @@ -58,6 +58,7 @@ use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -1276,6 +1277,18 @@ impl_runtime_apis! { // not valid call on domains None } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.0.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index cd89a54111..271590afae 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -73,6 +73,7 @@ use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; +use sp_messenger::XdmId; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::traits::{ @@ -1503,6 +1504,18 @@ impl_runtime_apis! { fn domain_chains_allowlist_update(domain_id: DomainId) -> Option{ Messenger::domain_chains_allowlist_update(domain_id) } + + fn xdm_id(ext: &::Extrinsic) -> Option { + match &ext.function { + RuntimeCall::Messenger(pallet_messenger::Call::relay_message { msg })=> { + Some(XdmId::RelayMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + RuntimeCall::Messenger(pallet_messenger::Call::relay_message_response { msg }) => { + Some(XdmId::RelayResponseMessage((msg.src_chain_id, msg.channel_id, msg.nonce))) + } + _ => None, + } + } } impl sp_messenger::RelayerApi::Hash> for Runtime { From 38e07c55331ee2093da0413ed3fbe3eeaa56c6d6 Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Mon, 2 Dec 2024 21:17:21 +0530 Subject: [PATCH 2/7] submit xdm messages to the txpool with some block interval. Currently the block interval is set to 5 blocks. If XDM is submitted to txpool at block x, we do not submit the same xdm until x+6 --- .../src/bin/subspace-malicious-operator.rs | 1 - crates/subspace-node/src/commands/run.rs | 1 - .../src/aux_schema.rs | 61 ++++++++- .../src/gossip_worker.rs | 4 + .../cross-domain-message-gossip/src/lib.rs | 1 + .../src/message_listener.rs | 129 ++++++++++++++---- domains/primitives/messenger/src/lib.rs | 2 +- domains/service/src/domain.rs | 30 ++-- test/subspace-test-service/src/lib.rs | 30 ++-- 9 files changed, 188 insertions(+), 71 deletions(-) diff --git a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs index c84ef91c6d..e45387aaf6 100644 --- a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs +++ b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs @@ -355,7 +355,6 @@ fn main() -> Result<(), Error> { _, _, _, - DomainBlock, _, _, >( diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs index 2ad0205072..4cc1fe5dbe 100644 --- a/crates/subspace-node/src/commands/run.rs +++ b/crates/subspace-node/src/commands/run.rs @@ -212,7 +212,6 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { _, _, _, - DomainBlock, _, _, >( diff --git a/domains/client/cross-domain-message-gossip/src/aux_schema.rs b/domains/client/cross-domain-message-gossip/src/aux_schema.rs index 0e83e917b3..90cc993577 100644 --- a/domains/client/cross-domain-message-gossip/src/aux_schema.rs +++ b/domains/client/cross-domain-message-gossip/src/aux_schema.rs @@ -2,9 +2,11 @@ use parity_scale_codec::{Decode, Encode}; use sc_client_api::backend::AuxStore; -use sp_blockchain::{Error as ClientError, Result as ClientResult}; +use sp_blockchain::{Error as ClientError, Info, Result as ClientResult}; use sp_core::H256; use sp_messenger::messages::{ChainId, ChannelId, ChannelState, Nonce}; +use sp_messenger::XdmId; +use sp_runtime::traits::{Block as BlockT, NumberFor}; use subspace_runtime_primitives::BlockNumber; const CHANNEL_DETAIL: &[u8] = b"channel_detail"; @@ -86,3 +88,60 @@ where vec![], ) } + +mod xdm_keys { + use parity_scale_codec::Encode; + use sp_messenger::XdmId; + + const XDM: &[u8] = b"xdm"; + const XDM_RELAY: &[u8] = b"relay_msg"; + const XDM_RELAY_RESPONSE: &[u8] = b"relay_msg_response"; + + pub(super) fn get_key_for_xdm_id(xdm_id: XdmId) -> Vec { + match xdm_id { + XdmId::RelayMessage(id) => (XDM, XDM_RELAY, id).encode(), + XdmId::RelayResponseMessage(id) => (XDM, XDM_RELAY_RESPONSE, id).encode(), + } + } +} + +#[derive(Debug, Encode, Decode, Clone)] +pub(super) struct BlockId { + pub(super) number: NumberFor, + pub(super) hash: Block::Hash, +} + +impl From> for BlockId { + fn from(value: Info) -> Self { + BlockId { + number: value.best_number, + hash: value.best_hash, + } + } +} + +/// Store the given XDM ID as processed at given block. +pub fn set_xdm_message_processed_at( + backend: &Backend, + xdm_id: XdmId, + block_id: BlockId, +) -> ClientResult<()> +where + Backend: AuxStore, + Block: BlockT, +{ + let key = xdm_keys::get_key_for_xdm_id(xdm_id); + backend.insert_aux(&[(key.as_slice(), block_id.encode().as_slice())], vec![]) +} + +/// Returns the maybe last processed block number for given xdm. +pub fn get_xdm_processed_block_number( + backend: &Backend, + xdm_id: XdmId, +) -> ClientResult>> +where + Backend: AuxStore, + Block: BlockT, +{ + load_decode(backend, xdm_keys::get_key_for_xdm_id(xdm_id).as_slice()) +} diff --git a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs index cc9d957ab7..8554f1d0eb 100644 --- a/domains/client/cross-domain-message-gossip/src/gossip_worker.rs +++ b/domains/client/cross-domain-message-gossip/src/gossip_worker.rs @@ -314,4 +314,8 @@ pub(crate) mod rep { /// Reputation change when a peer sends us a gossip message that can't be decoded. pub(crate) const GOSSIP_NOT_DECODABLE: ReputationChange = ReputationChange::new_fatal("Cross chain message: not decodable"); + + /// Reputation change when a peer sends us a non XDM message + pub(crate) const NOT_XDM: ReputationChange = + ReputationChange::new_fatal("Cross chain message: not XDM"); } diff --git a/domains/client/cross-domain-message-gossip/src/lib.rs b/domains/client/cross-domain-message-gossip/src/lib.rs index c88eb250f8..2f2feb7e07 100644 --- a/domains/client/cross-domain-message-gossip/src/lib.rs +++ b/domains/client/cross-domain-message-gossip/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(let_chains)] #![warn(rust_2018_idioms)] mod aux_schema; diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index 07f52ecb4b..063639150b 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -1,4 +1,7 @@ -use crate::aux_schema::{get_channel_state, set_channel_state}; +use crate::aux_schema::{ + get_channel_state, get_xdm_processed_block_number, set_channel_state, + set_xdm_message_processed_at, BlockId, +}; use crate::gossip_worker::{ChannelUpdate, MessageData}; use crate::{ChainMsg, ChannelDetail}; use domain_block_preprocessor::stateless_runtime::StatelessRuntime; @@ -18,17 +21,21 @@ use sp_core::{Hasher, H256}; use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError}; use sp_domains::{DomainId, DomainsApi, RuntimeType}; use sp_messenger::messages::{ChainId, Channel, ChannelId}; -use sp_messenger::RelayerApi; +use sp_messenger::{MessengerApi, RelayerApi}; use sp_runtime::codec::Decode; use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header, NumberFor}; +use sp_runtime::{SaturatedConversion, Saturating}; use std::collections::BTreeMap; use std::sync::Arc; use subspace_runtime_primitives::{Balance, BlockNumber}; use thiserror::Error; const LOG_TARGET: &str = "domain_message_listener"; +/// Number of blocks an already submitted XDM is not accepted since last submission. +const XDM_ACCEPT_BLOCK_LIMIT: u32 = 5; type BlockOf = ::Block; +type HeaderOf = <::Block as BlockT>::Header; type ExtrinsicOf = <::Block as BlockT>::Extrinsic; type HashingFor = <::Header as Header>::Hashing; @@ -88,7 +95,6 @@ pub async fn start_cross_chain_message_listener< TxnListener, CClient, CBlock, - Block, Executor, SO, >( @@ -102,11 +108,11 @@ pub async fn start_cross_chain_message_listener< sync_oracle: SO, ) where TxPool: TransactionPool + 'static, - Client: ProvideRuntimeApi> + HeaderBackend>, + Client: ProvideRuntimeApi> + HeaderBackend> + AuxStore, CBlock: BlockT, - Block: BlockT, + Client::Api: MessengerApi, NumberFor, CBlock::Hash>, CClient: ProvideRuntimeApi + HeaderBackend + AuxStore, - CClient::Api: DomainsApi + CClient::Api: DomainsApi> + RelayerApi, NumberFor, CBlock::Hash>, TxnListener: Stream + Unpin, Executor: CodeExecutor + RuntimeVersionOf, @@ -153,15 +159,25 @@ pub async fn start_cross_chain_message_listener< } }; - handle_xdm_message(&client, &tx_pool, chain_id, ext).await; + if let Ok(valid) = + handle_xdm_message::<_, _, CBlock>(&client, &tx_pool, chain_id, ext).await + && !valid + { + if let Some(peer_id) = msg.maybe_peer { + network.report_peer(peer_id, crate::gossip_worker::rep::NOT_XDM); + } + continue; + } + } + MessageData::ChannelUpdate(channel_update) => { + handle_channel_update::<_, _, _, BlockOf>( + chain_id, + channel_update, + &consensus_client, + domain_executor.clone(), + &mut domain_storage_key_cache, + ) } - MessageData::ChannelUpdate(channel_update) => handle_channel_update::<_, _, _, Block>( - chain_id, - channel_update, - &consensus_client, - domain_executor.clone(), - &mut domain_storage_key_cache, - ), } } } @@ -460,43 +476,98 @@ where Ok(()) } -async fn handle_xdm_message( +fn can_allow_xdm_submission( + client: &Arc, + submitted_block_id: BlockId, + current_block_id: BlockId, +) -> bool +where + Client: HeaderBackend, + Block: BlockT, +{ + match client.hash(submitted_block_id.number).ok().flatten() { + // there is no block at this number, allow xdm submission + None => return true, + Some(hash) => { + if hash != submitted_block_id.hash { + // client re-org'ed, allow xdm submission + return true; + } + } + } + + let latest_block_number = current_block_id.number; + let block_limit: NumberFor = XDM_ACCEPT_BLOCK_LIMIT.saturated_into(); + submitted_block_id.number < latest_block_number.saturating_sub(block_limit) +} + +async fn handle_xdm_message( client: &Arc, tx_pool: &Arc, chain_id: ChainId, ext: ExtrinsicOf, -) where +) -> Result +where TxPool: TransactionPool + 'static, - Client: HeaderBackend>, + CBlock: BlockT, + Client: ProvideRuntimeApi> + HeaderBackend> + AuxStore, + Client::Api: MessengerApi, NumberFor, CBlock::Hash>, { - let at = client.info().best_hash; - let tx_hash = tx_pool.hash_of(&ext); + let block_id: BlockId> = client.info().into(); + let runtime_api = client.runtime_api(); + let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? { + // not a valid xdm, so return as invalid + None => return Ok(false), + Some(xdm_id) => xdm_id, + }; + + if let Some(submitted_block_id) = + get_xdm_processed_block_number::<_, BlockOf>(&**client, xdm_id)? + && !can_allow_xdm_submission(client, submitted_block_id.clone(), block_id.clone()) + { + tracing::debug!( + target: LOG_TARGET, + "Skipping XDM[{:?}] submission. At: {:?} and Now: {:?}", + xdm_id, + submitted_block_id, + block_id + ); + return Ok(true); + } + tracing::debug!( target: LOG_TARGET, - "Submitting extrinsic {:?} to tx pool for chain {:?} at block: {:?}", - tx_hash, + "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}", + xdm_id, chain_id, - at + block_id ); let tx_pool_res = tx_pool - .submit_one(at, TransactionSource::External, ext) + .submit_one(block_id.hash, TransactionSource::External, ext) .await; + let block_id: BlockId> = client.info().into(); if let Err(err) = tx_pool_res { tracing::error!( target: LOG_TARGET, - "Failed to submit extrinsic to tx pool for Chain {:?} with error: {:?}", + "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}", + xdm_id, chain_id, - err + err, + block_id ); } else { tracing::debug!( target: LOG_TARGET, - "Submitted extrinsic {:?} to tx pool for chain {:?} at {:?}", - tx_hash, + "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}", + xdm_id, chain_id, - at - ) + block_id + ); + + set_xdm_message_processed_at(&**client, xdm_id, block_id)?; } + + Ok(true) } diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index 67a2cf4282..f0733c738c 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -161,7 +161,7 @@ impl sp_inherents::InherentDataProvider for InherentDataProvider { } /// Represent a union of XDM types with their message ID -#[derive(Debug, Encode, Decode, TypeInfo)] +#[derive(Debug, Encode, Decode, TypeInfo, Copy, Clone)] pub enum XdmId { RelayMessage(MessageKey), RelayResponseMessage(MessageKey), diff --git a/domains/service/src/domain.rs b/domains/service/src/domain.rs index dea7b02b35..8ca2101b9c 100644 --- a/domains/service/src/domain.rs +++ b/domains/service/src/domain.rs @@ -581,25 +581,17 @@ where } // Start cross domain message listener for domain - let domain_listener = cross_domain_message_gossip::start_cross_chain_message_listener::< - _, - _, - _, - _, - _, - Block, - _, - _, - >( - ChainId::Domain(domain_id), - consensus_client.clone(), - client.clone(), - params.transaction_pool.clone(), - consensus_network, - domain_message_receiver, - code_executor.clone(), - domain_sync_oracle, - ); + let domain_listener = + cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>( + ChainId::Domain(domain_id), + consensus_client.clone(), + client.clone(), + params.transaction_pool.clone(), + consensus_network, + domain_message_receiver, + code_executor.clone(), + domain_sync_oracle, + ); spawn_essential.spawn_essential_blocking( "domain-message-listener", diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs index 5ab623bc57..8e0efe006a 100644 --- a/test/subspace-test-service/src/lib.rs +++ b/test/subspace-test-service/src/lib.rs @@ -518,25 +518,17 @@ impl MockConsensusNode { tracing_unbounded("consensus_message_channel", 100); // Start cross domain message listener for Consensus chain to receive messages from domains in the network - let consensus_listener = cross_domain_message_gossip::start_cross_chain_message_listener::< - _, - _, - _, - _, - _, - DomainBlock, - _, - _, - >( - ChainId::Consensus, - client.clone(), - client.clone(), - transaction_pool.clone(), - network_service.clone(), - consensus_msg_receiver, - domain_executor, - sync_service.clone(), - ); + let consensus_listener = + cross_domain_message_gossip::start_cross_chain_message_listener::<_, _, _, _, _, _, _>( + ChainId::Consensus, + client.clone(), + client.clone(), + transaction_pool.clone(), + network_service.clone(), + consensus_msg_receiver, + domain_executor, + sync_service.clone(), + ); task_manager .spawn_essential_handle() From 1aa07f03c84181de9c33d4ac50e37d44bc351dab Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Tue, 3 Dec 2024 12:35:57 +0530 Subject: [PATCH 3/7] add runtime api to get channel nonce --- crates/subspace-fake-runtime-api/src/lib.rs | 6 +++++- crates/subspace-runtime/src/lib.rs | 6 +++++- domains/pallets/messenger/src/lib.rs | 12 +++++++++++- domains/primitives/messenger/src/lib.rs | 15 ++++++++++++++- domains/runtime/auto-id/src/lib.rs | 6 +++++- domains/runtime/evm/src/lib.rs | 6 +++++- domains/test/runtime/auto-id/src/lib.rs | 6 +++++- domains/test/runtime/evm/src/lib.rs | 6 +++++- test/subspace-test-runtime/src/lib.rs | 6 +++++- 9 files changed, 60 insertions(+), 9 deletions(-) diff --git a/crates/subspace-fake-runtime-api/src/lib.rs b/crates/subspace-fake-runtime-api/src/lib.rs index 05539ce2fb..30a2b1bad9 100644 --- a/crates/subspace-fake-runtime-api/src/lib.rs +++ b/crates/subspace-fake-runtime-api/src/lib.rs @@ -32,7 +32,7 @@ use sp_domains_fraud_proof::storage_proof::FraudProofStorageKeyRequest; use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_runtime::traits::{Block as BlockT, NumberFor}; use sp_runtime::transaction_validity::{TransactionSource, TransactionValidity}; use sp_runtime::{ApplyExtrinsicResult, ExtrinsicInclusionMode}; @@ -366,6 +366,10 @@ sp_api::impl_runtime_apis! { fn xdm_id(_ext: &::Extrinsic) -> Option { unreachable!() } + + fn channel_nonce(_chain_id: ChainId, _channel_id: ChannelId) -> Option { + unreachable!() + } } impl sp_messenger::RelayerApi::Hash> for Runtime { diff --git a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index 3654183d44..7ba9312dff 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -83,7 +83,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::traits::{ @@ -1455,6 +1455,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi::Hash> for Runtime { diff --git a/domains/pallets/messenger/src/lib.rs b/domains/pallets/messenger/src/lib.rs index 183109e87d..4fa397b3c1 100644 --- a/domains/pallets/messenger/src/lib.rs +++ b/domains/pallets/messenger/src/lib.rs @@ -139,7 +139,7 @@ mod pallet { MessageWeightTag, Payload, ProtocolMessageRequest, RequestResponse, VersionedPayload, }; use sp_messenger::{ - DomainRegistration, InherentError, InherentType, OnXDMRewards, StorageKeys, + ChannelNonce, DomainRegistration, InherentError, InherentType, OnXDMRewards, StorageKeys, INHERENT_IDENTIFIER, }; use sp_runtime::traits::Zero; @@ -1350,6 +1350,16 @@ mod pallet { UpdatedChannels::::get() } + pub fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Channels::::get(chain_id, channel_id).map(|channel| { + let last_inbox_nonce = channel.next_inbox_nonce.checked_sub(U256::one()); + ChannelNonce { + relay_msg_nonce: last_inbox_nonce, + relay_response_msg_nonce: channel.latest_response_received_message_nonce, + } + }) + } + pub fn pre_dispatch_with_trusted_mmr_proof( call: &Call, ) -> Result<(), TransactionValidityError> { diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index f0733c738c..c633921f49 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -23,7 +23,7 @@ pub mod messages; #[cfg(not(feature = "std"))] extern crate alloc; -use crate::messages::MessageKey; +use crate::messages::{MessageKey, Nonce}; #[cfg(not(feature = "std"))] use alloc::collections::BTreeSet; #[cfg(not(feature = "std"))] @@ -167,6 +167,16 @@ pub enum XdmId { RelayResponseMessage(MessageKey), } +#[derive(Debug, Encode, Decode, TypeInfo, Copy, Clone)] +pub struct ChannelNonce { + /// Last processed relay message nonce. + /// Could be nne if there is not relay message yet. + pub relay_msg_nonce: Option, + /// Last processed relay response message nonce. + /// Could be None since there is no first response yet + pub relay_response_msg_nonce: Option, +} + sp_api::decl_runtime_apis! { /// Api useful for relayers to fetch messages and submit transactions. pub trait RelayerApi @@ -231,5 +241,8 @@ sp_api::decl_runtime_apis! { /// Returns XDM message ID fn xdm_id(ext: &Block::Extrinsic) -> Option; + + /// Get Channel nonce for given chain and channel id. + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option; } } diff --git a/domains/runtime/auto-id/src/lib.rs b/domains/runtime/auto-id/src/lib.rs index b1c3abe560..ade1200f0c 100644 --- a/domains/runtime/auto-id/src/lib.rs +++ b/domains/runtime/auto-id/src/lib.rs @@ -45,7 +45,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -922,6 +922,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/runtime/evm/src/lib.rs b/domains/runtime/evm/src/lib.rs index 91a4f9ec00..965573b918 100644 --- a/domains/runtime/evm/src/lib.rs +++ b/domains/runtime/evm/src/lib.rs @@ -58,7 +58,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -1333,6 +1333,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/test/runtime/auto-id/src/lib.rs b/domains/test/runtime/auto-id/src/lib.rs index 59cb9c05d0..0f77507876 100644 --- a/domains/test/runtime/auto-id/src/lib.rs +++ b/domains/test/runtime/auto-id/src/lib.rs @@ -45,7 +45,7 @@ use sp_messenger::endpoint::{Endpoint, EndpointHandler as EndpointHandlerT, Endp use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -913,6 +913,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/domains/test/runtime/evm/src/lib.rs b/domains/test/runtime/evm/src/lib.rs index 9bd7d22fee..26dce324a9 100644 --- a/domains/test/runtime/evm/src/lib.rs +++ b/domains/test/runtime/evm/src/lib.rs @@ -58,7 +58,7 @@ use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::generic::Era; @@ -1289,6 +1289,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi for Runtime { diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index 271590afae..383e0e9d17 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -73,7 +73,7 @@ use sp_messenger::messages::{ BlockMessagesWithStorageKey, ChainId, ChannelId, CrossDomainMessage, FeeModel, MessageId, MessageKey, }; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_messenger_host_functions::{get_storage_key, StorageKeyRequest}; use sp_mmr_primitives::EncodableOpaqueLeaf; use sp_runtime::traits::{ @@ -1516,6 +1516,10 @@ impl_runtime_apis! { _ => None, } } + + fn channel_nonce(chain_id: ChainId, channel_id: ChannelId) -> Option { + Messenger::channel_nonce(chain_id, channel_id) + } } impl sp_messenger::RelayerApi::Hash> for Runtime { From 0b892e2d0471477941bbf8e4cdfc71383dcc06a7 Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Tue, 3 Dec 2024 13:11:30 +0530 Subject: [PATCH 4/7] use latest channel nonce to check for stale xdms --- .../src/message_listener.rs | 43 ++++++++++++++++++- domains/primitives/messenger/src/lib.rs | 9 ++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index 063639150b..4aeac0136f 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -21,7 +21,7 @@ use sp_core::{Hasher, H256}; use sp_domains::proof_provider_and_verifier::{StorageProofVerifier, VerificationError}; use sp_domains::{DomainId, DomainsApi, RuntimeType}; use sp_messenger::messages::{ChainId, Channel, ChannelId}; -use sp_messenger::{MessengerApi, RelayerApi}; +use sp_messenger::{ChannelNonce, MessengerApi, RelayerApi, XdmId}; use sp_runtime::codec::Decode; use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header, NumberFor}; use sp_runtime::{SaturatedConversion, Saturating}; @@ -478,13 +478,43 @@ where fn can_allow_xdm_submission( client: &Arc, + xdm_id: XdmId, submitted_block_id: BlockId, current_block_id: BlockId, + maybe_channel_nonce: Option, ) -> bool where Client: HeaderBackend, Block: BlockT, { + if let Some(channel_nonce) = maybe_channel_nonce { + let maybe_nonces = match ( + xdm_id, + channel_nonce.relay_msg_nonce, + channel_nonce.relay_response_msg_nonce, + ) { + (XdmId::RelayMessage((_, _, nonce)), Some(channel_nonce), _) => { + Some((nonce, channel_nonce)) + } + (XdmId::RelayResponseMessage((_, _, nonce)), _, Some(channel_nonce)) => { + Some((nonce, channel_nonce)) + } + _ => None, + }; + + if let Some((xdm_nonce, channel_nonce)) = maybe_nonces + && (xdm_nonce <= channel_nonce) + { + tracing::debug!( + target: LOG_TARGET, + "Stale XDM submitted: XDM Nonce: {:?}, Channel Nonce: {:?}", + xdm_nonce, + channel_nonce + ); + return false; + } + } + match client.hash(submitted_block_id.number).ok().flatten() { // there is no block at this number, allow xdm submission None => return true, @@ -521,9 +551,18 @@ where Some(xdm_id) => xdm_id, }; + let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id(); + let maybe_channel_nonce = runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?; + if let Some(submitted_block_id) = get_xdm_processed_block_number::<_, BlockOf>(&**client, xdm_id)? - && !can_allow_xdm_submission(client, submitted_block_id.clone(), block_id.clone()) + && !can_allow_xdm_submission( + client, + xdm_id, + submitted_block_id.clone(), + block_id.clone(), + maybe_channel_nonce, + ) { tracing::debug!( target: LOG_TARGET, diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index c633921f49..eca98fd8c5 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -167,6 +167,15 @@ pub enum XdmId { RelayResponseMessage(MessageKey), } +impl XdmId { + pub fn get_chain_id_and_channel_id(&self) -> (ChainId, ChannelId) { + match self { + XdmId::RelayMessage(key) => (key.0, key.1), + XdmId::RelayResponseMessage(key) => (key.0, key.1), + } + } +} + #[derive(Debug, Encode, Decode, TypeInfo, Copy, Clone)] pub struct ChannelNonce { /// Last processed relay message nonce. From d7a2029688d89e6d51fa850c75b903a3eadc0f54 Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Tue, 3 Dec 2024 14:29:02 +0530 Subject: [PATCH 5/7] cleanup aux storage --- .../src/aux_schema.rs | 123 +++++++++++++++++- .../src/message_listener.rs | 10 +- 2 files changed, 127 insertions(+), 6 deletions(-) diff --git a/domains/client/cross-domain-message-gossip/src/aux_schema.rs b/domains/client/cross-domain-message-gossip/src/aux_schema.rs index 90cc993577..b2fa4ae0de 100644 --- a/domains/client/cross-domain-message-gossip/src/aux_schema.rs +++ b/domains/client/cross-domain-message-gossip/src/aux_schema.rs @@ -1,11 +1,12 @@ //! Schema for channel update storage. +use crate::message_listener::LOG_TARGET; use parity_scale_codec::{Decode, Encode}; use sc_client_api::backend::AuxStore; use sp_blockchain::{Error as ClientError, Info, Result as ClientResult}; use sp_core::H256; use sp_messenger::messages::{ChainId, ChannelId, ChannelState, Nonce}; -use sp_messenger::XdmId; +use sp_messenger::{ChannelNonce, XdmId}; use sp_runtime::traits::{Block as BlockT, NumberFor}; use subspace_runtime_primitives::BlockNumber; @@ -91,18 +92,50 @@ where mod xdm_keys { use parity_scale_codec::Encode; + use sp_domains::{ChainId, ChannelId}; + use sp_messenger::messages::MessageKey; use sp_messenger::XdmId; const XDM: &[u8] = b"xdm"; const XDM_RELAY: &[u8] = b"relay_msg"; const XDM_RELAY_RESPONSE: &[u8] = b"relay_msg_response"; + const XDM_LAST_CLEANUP_NONCE: &[u8] = b"xdm_last_cleanup_nonce"; pub(super) fn get_key_for_xdm_id(xdm_id: XdmId) -> Vec { match xdm_id { - XdmId::RelayMessage(id) => (XDM, XDM_RELAY, id).encode(), - XdmId::RelayResponseMessage(id) => (XDM, XDM_RELAY_RESPONSE, id).encode(), + XdmId::RelayMessage(id) => get_key_for_xdm_relay(id), + XdmId::RelayResponseMessage(id) => get_key_for_xdm_relay_response(id), } } + + pub(super) fn get_key_for_last_cleanup_relay_nonce( + chain_id: ChainId, + channel_id: ChannelId, + ) -> Vec { + (XDM, XDM_RELAY, XDM_LAST_CLEANUP_NONCE, chain_id, channel_id).encode() + } + + pub(super) fn get_key_for_last_cleanup_relay_response_nonce( + chain_id: ChainId, + channel_id: ChannelId, + ) -> Vec { + ( + XDM, + XDM_RELAY_RESPONSE, + XDM_LAST_CLEANUP_NONCE, + chain_id, + channel_id, + ) + .encode() + } + + pub(super) fn get_key_for_xdm_relay(id: MessageKey) -> Vec { + (XDM, XDM_RELAY, id).encode() + } + + pub(super) fn get_key_for_xdm_relay_response(id: MessageKey) -> Vec { + (XDM, XDM_RELAY_RESPONSE, id).encode() + } } #[derive(Debug, Encode, Decode, Clone)] @@ -145,3 +178,87 @@ where { load_decode(backend, xdm_keys::get_key_for_xdm_id(xdm_id).as_slice()) } + +/// Cleans up all the xdm storages until the latest nonces. +pub fn cleanup_chain_channel_storages( + backend: &Backend, + chain_id: ChainId, + channel_id: ChannelId, + channel_nonce: ChannelNonce, +) -> ClientResult<()> +where + Backend: AuxStore, +{ + let mut to_insert = vec![]; + let mut to_delete = vec![]; + if let Some(latest_relay_nonce) = channel_nonce.relay_msg_nonce { + let last_cleanup_relay_nonce_key = + xdm_keys::get_key_for_last_cleanup_relay_nonce(chain_id, channel_id); + let last_cleaned_up_nonce = + load_decode::<_, Nonce>(backend, last_cleanup_relay_nonce_key.as_slice())?; + + let mut from_nonce = match last_cleaned_up_nonce { + None => Nonce::zero(), + Some(last_nonce) => last_nonce.saturating_add(Nonce::one()), + }; + + tracing::debug!( + target: LOG_TARGET, + "Cleaning Relay xdm keys for {:?} channel: {:?} from: {:?} to: {:?}", + chain_id, + channel_id, + from_nonce, + latest_relay_nonce + ); + + while from_nonce <= latest_relay_nonce { + to_delete.push(xdm_keys::get_key_for_xdm_relay(( + chain_id, channel_id, from_nonce, + ))); + from_nonce = from_nonce.saturating_add(Nonce::one()); + } + + to_insert.push((last_cleanup_relay_nonce_key, latest_relay_nonce.encode())); + } + + if let Some(latest_relay_response_nonce) = channel_nonce.relay_response_msg_nonce { + let last_cleanup_relay_response_nonce_key = + xdm_keys::get_key_for_last_cleanup_relay_response_nonce(chain_id, channel_id); + let last_cleaned_up_nonce = + load_decode::<_, Nonce>(backend, last_cleanup_relay_response_nonce_key.as_slice())?; + + let mut from_nonce = match last_cleaned_up_nonce { + None => Nonce::zero(), + Some(last_nonce) => last_nonce.saturating_add(Nonce::one()), + }; + + tracing::debug!( + target: LOG_TARGET, + "Cleaning Relay response xdm keys for {:?} channel: {:?} from: {:?} to: {:?}", + chain_id, + channel_id, + from_nonce, + latest_relay_response_nonce + ); + + while from_nonce <= latest_relay_response_nonce { + to_delete.push(xdm_keys::get_key_for_xdm_relay_response(( + chain_id, channel_id, from_nonce, + ))); + from_nonce = from_nonce.saturating_add(Nonce::one()); + } + + to_insert.push(( + last_cleanup_relay_response_nonce_key, + latest_relay_response_nonce.encode(), + )); + } + + backend.insert_aux( + &to_insert + .iter() + .map(|(k, v)| (k.as_slice(), v.as_slice())) + .collect::>(), + &to_delete.iter().map(|k| k.as_slice()).collect::>(), + ) +} diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index 4aeac0136f..bc4e1d41b8 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -1,6 +1,6 @@ use crate::aux_schema::{ - get_channel_state, get_xdm_processed_block_number, set_channel_state, - set_xdm_message_processed_at, BlockId, + cleanup_chain_channel_storages, get_channel_state, get_xdm_processed_block_number, + set_channel_state, set_xdm_message_processed_at, BlockId, }; use crate::gossip_worker::{ChannelUpdate, MessageData}; use crate::{ChainMsg, ChannelDetail}; @@ -30,7 +30,7 @@ use std::sync::Arc; use subspace_runtime_primitives::{Balance, BlockNumber}; use thiserror::Error; -const LOG_TARGET: &str = "domain_message_listener"; +pub(crate) const LOG_TARGET: &str = "domain_message_listener"; /// Number of blocks an already submitted XDM is not accepted since last submission. const XDM_ACCEPT_BLOCK_LIMIT: u32 = 5; @@ -608,5 +608,9 @@ where set_xdm_message_processed_at(&**client, xdm_id, block_id)?; } + if let Some(channel_nonce) = maybe_channel_nonce { + cleanup_chain_channel_storages(&**client, src_chain_id, channel_id, channel_nonce)?; + } + Ok(true) } From 40b4342c1ea8bacc437ada3b8dd15e2b4e20d62b Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Tue, 3 Dec 2024 14:46:08 +0530 Subject: [PATCH 6/7] version the messenger_api and ensure the runtime apis are avaible for xdm submission checks --- .../src/message_listener.rs | 144 +++++++++++------- domains/primitives/messenger/src/lib.rs | 1 + 2 files changed, 90 insertions(+), 55 deletions(-) diff --git a/domains/client/cross-domain-message-gossip/src/message_listener.rs b/domains/client/cross-domain-message-gossip/src/message_listener.rs index bc4e1d41b8..2243990b25 100644 --- a/domains/client/cross-domain-message-gossip/src/message_listener.rs +++ b/domains/client/cross-domain-message-gossip/src/message_listener.rs @@ -11,7 +11,7 @@ use sc_client_api::AuxStore; use sc_executor::RuntimeVersionOf; use sc_network::NetworkPeers; use sc_transaction_pool_api::{TransactionPool, TransactionSource}; -use sp_api::{ApiError, ProvideRuntimeApi, StorageProof}; +use sp_api::{ApiError, ApiExt, ProvideRuntimeApi, StorageProof}; use sp_blockchain::HeaderBackend; use sp_consensus::SyncOracle; use sp_core::crypto::AccountId32; @@ -545,72 +545,106 @@ where { let block_id: BlockId> = client.info().into(); let runtime_api = client.runtime_api(); - let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? { - // not a valid xdm, so return as invalid - None => return Ok(false), - Some(xdm_id) => xdm_id, - }; + let api_version = runtime_api + .api_version::, NumberFor, CBlock::Hash>>( + block_id.hash, + )? + .unwrap_or(1); + + let api_available = api_version >= 2; + if api_available { + let xdm_id = match runtime_api.xdm_id(block_id.hash, &ext)? { + // not a valid xdm, so return as invalid + None => return Ok(false), + Some(xdm_id) => xdm_id, + }; - let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id(); - let maybe_channel_nonce = runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?; + let (src_chain_id, channel_id) = xdm_id.get_chain_id_and_channel_id(); + let maybe_channel_nonce = + runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?; + + if let Some(submitted_block_id) = + get_xdm_processed_block_number::<_, BlockOf>(&**client, xdm_id)? + && !can_allow_xdm_submission( + client, + xdm_id, + submitted_block_id.clone(), + block_id.clone(), + maybe_channel_nonce, + ) + { + tracing::debug!( + target: LOG_TARGET, + "Skipping XDM[{:?}] submission. At: {:?} and Now: {:?}", + xdm_id, + submitted_block_id, + block_id + ); + return Ok(true); + } - if let Some(submitted_block_id) = - get_xdm_processed_block_number::<_, BlockOf>(&**client, xdm_id)? - && !can_allow_xdm_submission( - client, - xdm_id, - submitted_block_id.clone(), - block_id.clone(), - maybe_channel_nonce, - ) - { tracing::debug!( target: LOG_TARGET, - "Skipping XDM[{:?}] submission. At: {:?} and Now: {:?}", + "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}", xdm_id, - submitted_block_id, + chain_id, block_id ); - return Ok(true); - } - tracing::debug!( - target: LOG_TARGET, - "Submitting XDM[{:?}] to tx pool for chain {:?} at block: {:?}", - xdm_id, - chain_id, - block_id - ); + let tx_pool_res = tx_pool + .submit_one(block_id.hash, TransactionSource::External, ext) + .await; - let tx_pool_res = tx_pool - .submit_one(block_id.hash, TransactionSource::External, ext) - .await; + let block_id: BlockId> = client.info().into(); + if let Err(err) = tx_pool_res { + tracing::error!( + target: LOG_TARGET, + "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}", + xdm_id, + chain_id, + err, + block_id + ); + } else { + tracing::debug!( + target: LOG_TARGET, + "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}", + xdm_id, + chain_id, + block_id + ); - let block_id: BlockId> = client.info().into(); - if let Err(err) = tx_pool_res { - tracing::error!( - target: LOG_TARGET, - "Failed to submit XDM[{:?}] to tx pool for Chain {:?} with error: {:?} at block: {:?}", - xdm_id, - chain_id, - err, - block_id - ); + set_xdm_message_processed_at(&**client, xdm_id, block_id)?; + } + + if let Some(channel_nonce) = maybe_channel_nonce { + cleanup_chain_channel_storages(&**client, src_chain_id, channel_id, channel_nonce)?; + } + + Ok(true) } else { - tracing::debug!( - target: LOG_TARGET, - "Submitted XDM[{:?}] to tx pool for chain {:?} at {:?}", - xdm_id, - chain_id, - block_id - ); + let tx_pool_res = tx_pool + .submit_one(block_id.hash, TransactionSource::External, ext) + .await; - set_xdm_message_processed_at(&**client, xdm_id, block_id)?; - } + let block_id: BlockId> = client.info().into(); + if let Err(err) = tx_pool_res { + tracing::error!( + target: LOG_TARGET, + "Failed to submit XDM to tx pool for Chain {:?} with error: {:?} at block: {:?}", + chain_id, + err, + block_id + ); + } else { + tracing::debug!( + target: LOG_TARGET, + "Submitted XDM to tx pool for chain {:?} at {:?}", + chain_id, + block_id + ); + } - if let Some(channel_nonce) = maybe_channel_nonce { - cleanup_chain_channel_storages(&**client, src_chain_id, channel_id, channel_nonce)?; + Ok(true) } - - Ok(true) } diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index eca98fd8c5..d73946908c 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -222,6 +222,7 @@ sp_api::decl_runtime_apis! { } /// Api to provide XDM extraction from Runtime Calls. + #[api_version(2)] pub trait MessengerApi where CNumber: Encode + Decode, From 16a23f807b7cb12a6dcd5f3547459dd42e9da529 Mon Sep 17 00:00:00 2001 From: vedhavyas Date: Wed, 4 Dec 2024 17:53:10 +0530 Subject: [PATCH 7/7] fix typos and grammer in the comments --- domains/primitives/messenger/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/domains/primitives/messenger/src/lib.rs b/domains/primitives/messenger/src/lib.rs index d73946908c..aab7688add 100644 --- a/domains/primitives/messenger/src/lib.rs +++ b/domains/primitives/messenger/src/lib.rs @@ -179,10 +179,10 @@ impl XdmId { #[derive(Debug, Encode, Decode, TypeInfo, Copy, Clone)] pub struct ChannelNonce { /// Last processed relay message nonce. - /// Could be nne if there is not relay message yet. + /// Could be None if there is no relay message yet. pub relay_msg_nonce: Option, /// Last processed relay response message nonce. - /// Could be None since there is no first response yet + /// Could be None if there is no first response yet pub relay_response_msg_nonce: Option, }