Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make offchain tx pool creation reusable #14230

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 12 additions & 43 deletions client/api/src/execution_extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,16 @@
//! strategy for the runtime calls and provide the right `Externalities`
//! extensions to support APIs for particular execution context & capabilities.

use codec::Decode;
use parking_lot::RwLock;
use sc_transaction_pool_api::OffchainSubmitTransaction;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_core::{
offchain::{self, OffchainDbExt, OffchainWorkerExt, TransactionPoolExt},
offchain::{self, OffchainDbExt, OffchainWorkerExt},
traits::{ReadRuntimeVersion, ReadRuntimeVersionExt},
ExecutionContext,
};
use sp_externalities::{Extension, Extensions};
use sp_keystore::{KeystoreExt, KeystorePtr};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor},
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
pub use sp_state_machine::ExecutionStrategy;
use sp_state_machine::{DefaultHandler, ExecutionManager};
use std::{
Expand Down Expand Up @@ -168,11 +164,7 @@ pub struct ExecutionExtensions<Block: BlockT> {
offchain_db: Option<Box<dyn DbExternalitiesFactory>>,
// FIXME: these three are only RwLock because of https://github.com/paritytech/substrate/issues/4587
// remove when fixed.
// To break retain cycle between `Client` and `TransactionPool` we require this
// extension to be a `Weak` reference.
// That's also the reason why it's being registered lazily instead of
// during initialization.
transaction_pool: RwLock<Option<Weak<dyn OffchainSubmitTransaction<Block>>>>,
transaction_pool_factory: RwLock<Option<OffchainTransactionPoolFactory<Block>>>,
extensions_factory: RwLock<Box<dyn ExtensionsFactory<Block>>>,
statement_store: RwLock<Option<Weak<dyn sp_statement_store::StatementStore>>>,
read_runtime_version: Arc<dyn ReadRuntimeVersion>,
Expand All @@ -194,7 +186,7 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
keystore,
offchain_db,
extensions_factory: RwLock::new(extensions_factory),
transaction_pool,
transaction_pool_factory: transaction_pool,
statement_store,
read_runtime_version,
}
Expand All @@ -211,11 +203,11 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
}

/// Register transaction pool extension.
pub fn register_transaction_pool<T>(&self, pool: &Arc<T>)
where
T: OffchainSubmitTransaction<Block> + 'static,
{
*self.transaction_pool.write() = Some(Arc::downgrade(pool) as _);
pub fn register_transaction_pool_factory(
&self,
factory: OffchainTransactionPoolFactory<Block>,
) {
*self.transaction_pool_factory.write() = Some(factory);
}

/// Register statement store extension.
Expand Down Expand Up @@ -245,11 +237,8 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
}

if capabilities.contains(offchain::Capabilities::TRANSACTION_POOL) {
if let Some(pool) = self.transaction_pool.read().as_ref().and_then(|x| x.upgrade()) {
extensions.register(TransactionPoolExt(Box::new(TransactionPoolAdapter {
at: BlockId::Hash(block_hash),
pool,
}) as _));
if let Some(pool) = self.transaction_pool_factory.read().as_ref() {
extensions.register(pool.offchain_transaction_pool(block_hash));
}
}

Expand Down Expand Up @@ -303,23 +292,3 @@ impl<Block: BlockT> ExecutionExtensions<Block> {
(manager, self.extensions(block_hash, block_number, context))
}
}

/// A wrapper type to pass `BlockId` to the actual transaction pool.
struct TransactionPoolAdapter<Block: BlockT> {
at: BlockId<Block>,
pool: Arc<dyn OffchainSubmitTransaction<Block>>,
}

impl<Block: BlockT> offchain::TransactionPool for TransactionPoolAdapter<Block> {
fn submit_transaction(&mut self, data: Vec<u8>) -> Result<(), ()> {
let xt = match Block::Extrinsic::decode(&mut &*data) {
Ok(xt) => xt,
Err(e) => {
log::warn!("Unable to decode extrinsic: {:?}: {}", data, e);
return Err(())
},
};

self.pool.submit_at(&self.at, xt)
}
}
35 changes: 7 additions & 28 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,15 @@ mod tests {
use sc_block_builder::BlockBuilderProvider as _;
use sc_client_api::Backend as _;
use sc_network::{config::MultiaddrWithPeerId, types::ProtocolName, ReputationChange};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::{InPoolTransaction, TransactionPool};
use sp_consensus::BlockOrigin;
use sp_runtime::generic::BlockId;
use std::{collections::HashSet, sync::Arc};
use substrate_test_runtime_client::{
runtime::{
substrate_test_pallet::pallet::Call as PalletCall, Block, ExtrinsicBuilder, RuntimeCall,
substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall,
},
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClient, TestClientBuilderExt,
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt,
};

struct TestNetwork();
Expand Down Expand Up @@ -337,34 +336,14 @@ mod tests {
}
}

struct TestPool(Arc<BasicPool<FullChainApi<TestClient, Block>, Block>>);

impl sc_transaction_pool_api::OffchainSubmitTransaction<Block> for TestPool {
fn submit_at(
&self,
at: &BlockId<Block>,
extrinsic: <Block as traits::Block>::Extrinsic,
) -> Result<(), ()> {
let source = sc_transaction_pool_api::TransactionSource::Local;
futures::executor::block_on(self.0.submit_one(&at, source, extrinsic))
.map(|_| ())
.map_err(|_| ())
}
}

#[test]
fn should_call_into_runtime_and_produce_extrinsic() {
sp_tracing::try_init_simple();

let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
let pool = TestPool(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner,
client.clone(),
));
let pool =
BasicPool::new_full(Default::default(), true.into(), None, spawner, client.clone());
let network = Arc::new(TestNetwork());
let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();

Expand All @@ -373,9 +352,9 @@ mod tests {
futures::executor::block_on(offchain.on_block_imported(&header, network, false));

// then
assert_eq!(pool.0.status().ready, 1);
assert_eq!(pool.status().ready, 1);
assert!(matches!(
pool.0.ready().next().unwrap().data().function,
pool.ready().next().unwrap().data().function,
RuntimeCall::SubstrateTest(PalletCall::storage_change { .. })
));
}
Expand Down
2 changes: 2 additions & 0 deletions client/transaction-pool/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ description = "Transaction pool client facing API."

[dependencies]
async-trait = "0.1.57"
codec = { package = "parity-scale-codec", version = "3.2.2" }
futures = "0.3.21"
log = "0.4.17"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1.0.30"
sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" }
sp-core = { version = "8.0.0", default-features = false, path = "../../../primitives/core" }
sp-runtime = { version = "8.0.0", default-features = false, path = "../../../primitives/runtime" }

[dev-dependencies]
Expand Down
76 changes: 65 additions & 11 deletions client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ pub mod error;
use async_trait::async_trait;
use futures::{Future, Stream};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sp_core::offchain::TransactionPoolExt;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Member, NumberFor},
};
use std::{collections::HashMap, hash::Hash, pin::Pin, sync::Arc};
use std::{
collections::HashMap,
hash::Hash,
pin::Pin,
sync::{Arc, Weak},
};

const LOG_TARGET: &str = "txpool::api";

Expand Down Expand Up @@ -329,29 +335,27 @@ pub trait LocalTransactionPool: Send + Sync {
/// `TransactionSource::Local`.
fn submit_local(
&self,
at: &BlockId<Self::Block>,
at: <Self::Block as BlockT>::Hash,
xt: LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error>;
}

/// An abstraction for transaction pool.
/// An abstraction for [`LocalTransactionPool`]
///
/// This trait is used by offchain calls to be able to submit transactions.
/// The main use case is for offchain workers, to feed back the results of computations,
/// but since the transaction pool access is a separate `ExternalitiesExtension` it can
/// be also used in context of other offchain calls. For one may generate and submit
/// a transaction for some misbehavior reports (say equivocation).
pub trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
/// We want to use a transaction pool in [`OffchainTransactionPoolFactory`] in a `Arc` without
/// bleeding the associated types besides the `Block`. Thus, this abstraction here exists to achieve
/// the wrapping in a `Arc`.
trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
/// Submit transaction.
///
/// The transaction will end up in the pool and be propagated to others.
fn submit_at(&self, at: &BlockId<Block>, extrinsic: Block::Extrinsic) -> Result<(), ()>;
fn submit_at(&self, at: Block::Hash, extrinsic: Block::Extrinsic) -> Result<(), ()>;
}

impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
fn submit_at(
&self,
at: &BlockId<TPool::Block>,
at: <TPool::Block as BlockT>::Hash,
extrinsic: <TPool::Block as BlockT>::Extrinsic,
) -> Result<(), ()> {
log::debug!(
Expand All @@ -372,6 +376,56 @@ impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TP
}
}

/// Factory for creating [`TransactionPoolExt`]s.
///
/// This provides an easy way for creating [`TransactionPoolExt`] extensions for registering them in
/// the wasm execution environment to send transactions from an offchain call to the runtime.
#[derive(Clone)]
pub struct OffchainTransactionPoolFactory<Block: BlockT> {
// To break retain cycle between `Client` and `TransactionPool` we require this
// extension to be a `Weak` reference.
pool: Weak<dyn OffchainSubmitTransaction<Block>>,
}

impl<Block: BlockT> OffchainTransactionPoolFactory<Block> {
/// Creates a new instance using the given `tx_pool`.
pub fn new<T: LocalTransactionPool<Block = Block> + 'static>(tx_pool: &Arc<T>) -> Self {
Self { pool: Arc::downgrade(tx_pool) as Weak<_> }
}

/// Returns an instance of [`TransactionPoolExt`] bound to the given `block_hash`.
///
/// Transactions that are being submitted by this instance will be submitted with `block_hash`
/// as context for validation.
pub fn offchain_transaction_pool(&self, block_hash: Block::Hash) -> TransactionPoolExt {
TransactionPoolExt::new(OffchainTransactionPool { pool: self.pool.clone(), block_hash })
}
}

/// Wraps a `pool` and `block_hash` to implement [`sp_core::offchain::TransactionPool`].
struct OffchainTransactionPool<Block: BlockT> {
block_hash: Block::Hash,
pool: Weak<dyn OffchainSubmitTransaction<Block>>,
}

impl<Block: BlockT> sp_core::offchain::TransactionPool for OffchainTransactionPool<Block> {
fn submit_transaction(&mut self, extrinsic: Vec<u8>) -> Result<(), ()> {
let extrinsic = match codec::Decode::decode(&mut &extrinsic[..]) {
Ok(t) => t,
Err(e) => {
log::error!(
target: LOG_TARGET,
"Failed to decode extrinsic in `OffchainTransactionPool::submit_transaction`: {e:?}"
);

return Err(())
},
};

self.pool.upgrade().ok_or(())?.submit_at(self.block_hash, extrinsic)
}
}

/// Wrapper functions to keep the API backwards compatible over the wire for the old RPC spec.
mod v1_compatible {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
Expand Down
18 changes: 12 additions & 6 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ use std::{
use graph::{ExtrinsicHash, IsValidator};
use sc_transaction_pool_api::{
error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
PoolFuture, PoolStatus, ReadyTransactions, TransactionFor, TransactionPool, TransactionSource,
TransactionStatusStreamFor, TxHash,
OffchainTransactionPoolFactory, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
Expand Down Expand Up @@ -397,7 +397,9 @@ where
));

// make transaction pool available for off-chain runtime calls.
client.execution_extensions().register_transaction_pool(&pool);
client
.execution_extensions()
.register_transaction_pool_factory(OffchainTransactionPoolFactory::new(&pool));

pool
}
Expand All @@ -421,7 +423,7 @@ where

fn submit_local(
&self,
at: &BlockId<Self::Block>,
at: Block::Hash,
xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
use sp_runtime::{
Expand All @@ -430,7 +432,11 @@ where

let validity = self
.api
.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
.validate_transaction_blocking(
&BlockId::hash(at),
TransactionSource::Local,
xt.clone(),
)?
.map_err(|e| {
Self::Error::Pool(match e {
TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
Expand All @@ -441,7 +447,7 @@ where
let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
let block_number = self
.api
.block_id_to_number(at)?
.block_id_to_number(&BlockId::hash(at))?
.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;

let validated = ValidatedTransaction::valid_at(
Expand Down