Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use transaction graph interface for future transactions #435

Merged
merged 2 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion client/rpc/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ sp-io = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0
sp-std = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-transaction-graph = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
frame-system = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
serde = { version = "1.0", features = ["derive"] }

moonbeam-rpc-primitives-txpool = { path = "../../../primitives/rpc/txpool" }
fc-rpc = { git = "https://github.com/purestake/frontier", branch = "moonbeam-v0.8" }
fc-rpc = { git = "https://github.com/purestake/frontier", branch = "moonbeam-v0.8" }
90 changes: 55 additions & 35 deletions client/rpc/txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,105 +21,125 @@ use jsonrpc_core::Result as RpcResult;
pub use moonbeam_rpc_core_txpool::{
GetT, Summary, Transaction, TransactionMap, TxPool as TxPoolT, TxPoolResult, TxPoolServer,
};
use sc_transaction_graph::{ChainApi, Pool};
use serde::Serialize;
use sha3::{Digest, Keccak256};
use sp_api::{BlockId, ProvideRuntimeApi};
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_runtime::traits::Block as BlockT;
use sp_transaction_pool::{InPoolTransaction, TransactionPool};
use sp_transaction_pool::InPoolTransaction;
use std::collections::HashMap;
use std::{marker::PhantomData, sync::Arc};

use moonbeam_rpc_primitives_txpool::TxPoolRuntimeApi;
use moonbeam_rpc_primitives_txpool::{TxPoolResponse, TxPoolRuntimeApi};

pub struct TxPool<B: BlockT, C, P> {
pub struct TxPool<B: BlockT, C, A: ChainApi> {
client: Arc<C>,
pool: Arc<P>,
graph: Arc<Pool<A>>,
_marker: PhantomData<B>,
}

impl<B, C, P> TxPool<B, C, P>
impl<B, C, A> TxPool<B, C, A>
where
C: ProvideRuntimeApi<B>,
C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B> + 'static,
C: Send + Sync + 'static,
B: BlockT<Hash = H256> + Send + Sync + 'static,
P: TransactionPool<Block = B> + Send + Sync + 'static,
A: ChainApi<Block = B> + 'static,
C::Api: TxPoolRuntimeApi<B>,
{
fn map_build<T>(&self) -> RpcResult<TransactionMap<T>>
/// Use the transaction graph interface to get the extrinsics currently in the ready and future
/// queues.
fn map_build<T>(&self) -> RpcResult<TxPoolResult<TransactionMap<T>>>
where
T: GetT,
T: GetT + Serialize,
{
let txs: Vec<_> = self
.pool
// Collect transactions in the ready validated pool.
let txs_ready = self
.graph
.validated_pool()
.ready()
.map(|in_pool_tx| in_pool_tx.data().clone())
.collect();

// Collect transactions in the future validated pool.
let txs_future = self
.graph
.validated_pool()
.futures()
.iter()
.map(|(_hash, extrinsic)| extrinsic.clone())
.collect();

// Use the runtime to match the (here) opaque extrinsics against ethereum transactions.
let best_block: BlockId<B> = BlockId::Hash(self.client.info().best_hash);
let ethereum_txns = self
let ethereum_txns: TxPoolResponse = self
.client
.runtime_api()
.extrinsic_filter(&best_block, txs)
.extrinsic_filter(&best_block, txs_ready, txs_future)
.map_err(|err| {
internal_err(format!("fetch runtime extrinsic filter failed: {:?}", err))
})?;
let mut out = TransactionMap::<T>::new();
for txn in ethereum_txns.iter() {
// Build the T response.
let mut pending = TransactionMap::<T>::new();
for txn in ethereum_txns.ready.iter() {
let transaction_message = TransactionMessage::from(txn.clone());
let hash = transaction_message.hash();
let from_address = match public_key(txn) {
Ok(pk) => H160::from(H256::from_slice(Keccak256::digest(&pk).as_slice())),
Err(_e) => H160::default(),
};
out.entry(from_address)
pending
.entry(from_address)
.or_insert_with(HashMap::new)
.insert(txn.nonce, T::get(hash, from_address, txn));
}
Ok(out)
let mut queued = TransactionMap::<T>::new();
for txn in ethereum_txns.future.iter() {
let transaction_message = TransactionMessage::from(txn.clone());
let hash = transaction_message.hash();
let from_address = match public_key(txn) {
Ok(pk) => H160::from(H256::from_slice(Keccak256::digest(&pk).as_slice())),
Err(_e) => H160::default(),
};
queued
.entry(from_address)
.or_insert_with(HashMap::new)
.insert(txn.nonce, T::get(hash, from_address, txn));
}
Ok(TxPoolResult { pending, queued })
}
}

impl<B: BlockT, C, P> TxPool<B, C, P> {
pub fn new(client: Arc<C>, pool: Arc<P>) -> Self {
impl<B: BlockT, C, A: ChainApi> TxPool<B, C, A> {
pub fn new(client: Arc<C>, graph: Arc<Pool<A>>) -> Self {
Self {
client,
pool,
graph,
_marker: PhantomData,
}
}
}

impl<B, C, P> TxPoolT for TxPool<B, C, P>
impl<B, C, A> TxPoolT for TxPool<B, C, A>
where
C: ProvideRuntimeApi<B>,
C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
C: Send + Sync + 'static,
B: BlockT<Hash = H256> + Send + Sync + 'static,
P: TransactionPool<Block = B> + Send + Sync + 'static,
A: ChainApi<Block = B> + 'static,
C::Api: TxPoolRuntimeApi<B>,
{
fn content(&self) -> RpcResult<TxPoolResult<TransactionMap<Transaction>>> {
let pending = self.map_build::<Transaction>()?;
Ok(TxPoolResult {
pending,
// Future queue not yet supported. We need to do something like:
// - Use InpoolTransaction::requires() to get the TransactionTag bytes.
// - Somehow decode and identify the tag to either add it to the future or pending pool.
queued: HashMap::new(),
})
self.map_build::<Transaction>()
}

fn inspect(&self) -> RpcResult<TxPoolResult<TransactionMap<Summary>>> {
let pending = self.map_build::<Summary>()?;
Ok(TxPoolResult {
pending,
queued: HashMap::new(),
})
self.map_build::<Summary>()
}

fn status(&self) -> RpcResult<TxPoolResult<U256>> {
let status = self.pool.status();
let status = self.graph.validated_pool().status();
Ok(TxPoolResult {
pending: U256::from(status.ready),
queued: U256::from(status.future),
Expand Down
1 change: 1 addition & 0 deletions node/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "polka
sc-service = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-transaction-graph = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sp-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2" }
Expand Down
2 changes: 2 additions & 0 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ where
let deps = rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
graph: pool.pool().clone(),
deny_unsafe,
is_authority: collator,
network: network.clone(),
Expand Down Expand Up @@ -936,6 +937,7 @@ pub fn new_dev(
let deps = rpc::FullDeps {
client: client.clone(),
pool: pool.clone(),
graph: pool.pool().clone(),
deny_unsafe,
is_authority: collator,
network: network.clone(),
Expand Down
15 changes: 10 additions & 5 deletions node/service/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use sc_consensus_manual_seal::rpc::{EngineCommand, ManualSeal, ManualSealApi};
use sc_network::NetworkService;
use sc_rpc::SubscriptionTaskExecutor;
use sc_rpc_api::DenyUnsafe;
use sc_transaction_graph::{ChainApi, Pool};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{
Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
Expand All @@ -51,11 +52,13 @@ use std::collections::BTreeMap;
use substrate_frame_rpc_system::{FullSystem, SystemApi};

/// Full client dependencies.
pub struct FullDeps<C, P, BE> {
pub struct FullDeps<C, P, A: ChainApi, BE> {
/// The client instance to use.
pub client: Arc<C>,
/// Transaction pool instance.
pub pool: Arc<P>,
/// Graph pool instance.
pub graph: Arc<Pool<A>>,
/// Whether to deny unsafe calls
pub deny_unsafe: DenyUnsafe,
/// The Node authority flag
Expand Down Expand Up @@ -86,8 +89,8 @@ pub struct FullDeps<C, P, BE> {
pub transaction_converter: TransactionConverters,
}
/// Instantiate all Full RPC extensions.
pub fn create_full<C, P, BE>(
deps: FullDeps<C, P, BE>,
pub fn create_full<C, P, BE, A>(
deps: FullDeps<C, P, A, BE>,
subscription_task_executor: SubscriptionTaskExecutor,
) -> jsonrpc_core::IoHandler<sc_rpc::Metadata>
where
Expand All @@ -98,13 +101,15 @@ where
C: BlockchainEvents<Block>,
C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError> + 'static,
C: Send + Sync + 'static,
A: ChainApi<Block = Block> + 'static,
C::Api: RuntimeApiCollection<StateBackend = BE::State>,
P: TransactionPool<Block = Block> + 'static,
{
let mut io = jsonrpc_core::IoHandler::default();
let FullDeps {
client,
pool,
graph,
deny_unsafe,
is_authority,
network,
Expand Down Expand Up @@ -176,7 +181,7 @@ where
)));
io.extend_with(Web3ApiServer::to_delegate(Web3Api::new(client.clone())));
io.extend_with(EthPubSubApiServer::to_delegate(EthPubSubApi::new(
pool.clone(),
pool,
client.clone(),
network,
SubscriptionManager::<HexEncodedIdProvider>::with_id_provider(
Expand All @@ -188,7 +193,7 @@ where
if ethapi_cmd.contains(&EthApiCmd::Txpool) {
io.extend_with(TxPoolServer::to_delegate(TxPool::new(
Arc::clone(&client),
pool,
graph,
)));
}

Expand Down
1 change: 1 addition & 0 deletions primitives/rpc/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license = 'GPL-3.0-only'
repository = 'https://github.com/PureStake/moonbeam/'

[dependencies]
codec = { package = "parity-scale-codec", version = "2.0.0", default-features = false }
ethereum = { version = "0.7.1", default-features = false, features = ["with-codec"] }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2", default-features = false }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.2", default-features = false }
Expand Down
12 changes: 11 additions & 1 deletion primitives/rpc/txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@
#![allow(clippy::unnecessary_mut_passed)]
#![allow(clippy::too_many_arguments)]

use codec::{Decode, Encode};
use ethereum::Transaction;
use sp_runtime::traits::Block as BlockT;
use sp_std::vec::Vec;

#[derive(Eq, PartialEq, Clone, Encode, Decode, sp_runtime::RuntimeDebug)]
pub struct TxPoolResponse {
pub ready: Vec<Transaction>,
pub future: Vec<Transaction>,
}

sp_api::decl_runtime_apis! {
pub trait TxPoolRuntimeApi {
fn extrinsic_filter(xt: Vec<<Block as BlockT>::Extrinsic>) -> Vec<Transaction>;
fn extrinsic_filter(
xt_ready: Vec<<Block as BlockT>::Extrinsic>,
xt_future: Vec<<Block as BlockT>::Extrinsic>,
) -> TxPoolResponse;
}
}
22 changes: 15 additions & 7 deletions runtime/moonbase/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use moonbeam_core_primitives::{
Signature,
};
use moonbeam_extensions_evm::runner::stack::TraceRunner as TraceRunnerT;
use moonbeam_rpc_primitives_txpool::TxPoolResponse;
use pallet_balances::NegativeImbalance;
use pallet_ethereum::Call::transact;
use pallet_ethereum::{Transaction as EthereumTransaction, TransactionAction};
Expand Down Expand Up @@ -114,7 +115,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("moonbase"),
impl_name: create_runtime_str!("moonbase"),
authoring_version: 3,
spec_version: 37,
spec_version: 38,
impl_version: 1,
apis: RUNTIME_API_VERSIONS,
transaction_version: 2,
Expand Down Expand Up @@ -1101,12 +1102,19 @@ impl_runtime_apis! {

impl moonbeam_rpc_primitives_txpool::TxPoolRuntimeApi<Block> for Runtime {
fn extrinsic_filter(
xts: Vec<<Block as BlockT>::Extrinsic>
) -> Vec<pallet_ethereum::Transaction> {
xts.into_iter().filter_map(|xt| match xt.function {
Call::Ethereum(transact(t)) => Some(t),
_ => None
}).collect()
xts_ready: Vec<<Block as BlockT>::Extrinsic>,
xts_future: Vec<<Block as BlockT>::Extrinsic>
) -> TxPoolResponse {
TxPoolResponse {
ready: xts_ready.into_iter().filter_map(|xt| match xt.function {
Call::Ethereum(transact(t)) => Some(t),
_ => None
}).collect(),
future: xts_future.into_iter().filter_map(|xt| match xt.function {
Call::Ethereum(transact(t)) => Some(t),
_ => None
}).collect(),
}
}
}

Expand Down
Loading