Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom tx pool #2196

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions node/service/src/custom_tx_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use futures::Future;
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{collections::HashMap, pin::Pin, sync::Arc};

pub struct CustomPool<I> {
inner_pool: Arc<I>,
}

impl<I> CustomPool<I> {
pub fn new(inner_pool: Arc<I>) -> Self {
Self { inner_pool }
}
}

impl<I> TransactionPool for CustomPool<I>
where
I: TransactionPool,
{
type Block = I::Block;
type Hash = I::Hash;
type InPoolTransaction = I::InPoolTransaction;
type Error = I::Error;

fn submit_at(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
self.inner_pool.submit_at(at, source, xts)
}

fn submit_one(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
self.inner_pool.submit_one(at, source, xt)
}

fn submit_and_watch(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
self.inner_pool.submit_and_watch(at, source, xt)
}

fn remove_invalid(&self, _: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
// Don't do anything on purpose.
Vec::new()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this! We tested something like this by forking the txpool crates here: #2186

It didn't work even with all collators running the code. (Was going to report back on the issue today).

Why? Here's my guess:

  1. Transaction C dependent on B dependent on A
  2. Txs A, B, C sent to the pool
  3. Tx A gets into a fork
  4. Txs B and C now stay in the pool (good!)
  5. Fork with Tx A goes away
  6. Tx A never gets re-added to the pool
  7. Txs B and C continue to be stuck

Might not be 100% correct in what is happening, but that's what it looked like from the tx pools.

We ended up using a slightly modified build of the omni-node with the fatxpool and that... might be working. Still verifying 100%.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6. Tx A never gets re-added to the pool

If the fork is not the best chain, it will get re-added to the pool. This is also the case with the old pool.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That did not appear to be true in tests, but perhaps something else was causing that issue.

}

fn status(&self) -> PoolStatus {
self.inner_pool.status()
}

fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
self.inner_pool.import_notification_stream()
}

fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.inner_pool.hash_of(xt)
}

fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
self.inner_pool.on_broadcasted(propagations)
}

fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
self.inner_pool.ready_transaction(hash)
}

fn ready_at(
&self,
at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
self.inner_pool.ready_at(at)
}

fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
self.inner_pool.ready()
}

fn futures(&self) -> Vec<Self::InPoolTransaction> {
self.inner_pool.futures()
}
}

1 change: 1 addition & 0 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
pub mod block_sealing;
pub mod chain_spec;
pub mod common;
mod custom_tx_pool;
pub mod rpc;
pub mod service;
2 changes: 1 addition & 1 deletion node/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ fn start_consensus(
let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
task_manager.spawn_handle(),
client.clone(),
transaction_pool,
std::sync::Arc::new(crate::custom_tx_pool::CustomPool::new(transaction_pool)),
prometheus_registry,
telemetry.clone(),
);
Expand Down