Skip to content

Commit

Permalink
Merge pull request #4258 from chenyukang/yukang-fix-rbf-parallel-issue
Browse files Browse the repository at this point in the history
Fix concurrency issue for RBF
  • Loading branch information
quake authored Dec 11, 2023
2 parents a00ba13 + c0e7795 commit ddd1309
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 96 deletions.
22 changes: 18 additions & 4 deletions sync/src/relayer/tests/reconstruct_block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::helper::{build_chain, new_transaction};
use crate::relayer::packed::{CellInput, OutPoint};
use crate::relayer::ReconstructionResult;
use crate::StatusCode;
use ckb_tx_pool::{PlugTarget, TxEntry};
Expand All @@ -8,7 +9,6 @@ use ckb_types::{
packed::{self, CompactBlockBuilder},
};
use std::collections::HashSet;

// There are more test cases in block_transactions_process and compact_block_process.rs
#[test]
fn test_missing_txs() {
Expand Down Expand Up @@ -64,9 +64,23 @@ fn test_missing_txs() {
#[test]
fn test_reconstruct_transactions_and_uncles() {
let (relayer, always_success_out_point) = build_chain(5);
let prepare: Vec<TransactionView> = (0..20)
.map(|i| new_transaction(&relayer, i, &always_success_out_point))
.collect();
let parent = new_transaction(&relayer, 0, &always_success_out_point);

// create a chain of transactions as prepare
let mut prepare = vec![parent];
while prepare.len() <= 20 {
let parent = prepare.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
prepare.push(child);
}
let uncle = BlockBuilder::default().build();

let block = BlockBuilder::default()
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(RbfContainInvalidCells),
Box::new(RbfRejectReplaceProposed),
Box::new(RbfReplaceProposedSuccess),
Box::new(RbfConcurrency),
Box::new(CompactBlockEmpty),
Box::new(CompactBlockEmptyParentUnknown),
Box::new(CompactBlockPrefilled),
Expand Down
10 changes: 8 additions & 2 deletions test/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct Node {
consensus: Consensus,
p2p_listen: String,
rpc_client: RpcClient,
rpc_listen: String,

node_id: Option<String>, // initialize when starts node
guard: Option<ProcessGuard>, // initialize when starts node
Expand Down Expand Up @@ -134,8 +135,8 @@ impl Node {
};

let p2p_listen = app_config.network.listen_addresses[0].to_string();
let rpc_address = app_config.rpc.listen_address;
let rpc_client = RpcClient::new(&format!("http://{rpc_address}/"));
let rpc_listen = format!("http://{}/", app_config.rpc.listen_address);
let rpc_client = RpcClient::new(&rpc_listen);
let consensus = {
// Ensure the data path is available because chain_spec.build_consensus() needs to access the
// system-cell data.
Expand All @@ -154,6 +155,7 @@ impl Node {
consensus,
p2p_listen,
rpc_client,
rpc_listen,
node_id: None,
guard: None,
}
Expand Down Expand Up @@ -184,6 +186,10 @@ impl Node {
self.p2p_listen.clone()
}

pub fn rpc_listen(&self) -> String {
self.rpc_listen.clone()
}

pub fn p2p_address(&self) -> String {
format!("{}/p2p/{}", self.p2p_listen(), self.node_id())
}
Expand Down
3 changes: 2 additions & 1 deletion test/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ impl RpcClient {
}
}

jsonrpc!(pub struct Inner {
jsonrpc!(
pub struct Inner {
pub fn get_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_fork_block(&self, _hash: H256) -> Option<BlockView>;
pub fn get_block_by_number(&self, _number: BlockNumber) -> Option<BlockView>;
Expand Down
112 changes: 80 additions & 32 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::{utils::wait_until, Node, Spec};
use crate::{
rpc::RpcClient,
util::{cell::gen_spendable, transaction::always_success_transactions},
utils::wait_until,
Node, Spec,
};
use ckb_jsonrpc_types::Status;
use ckb_logger::info;
use ckb_types::{
Expand Down Expand Up @@ -441,40 +446,23 @@ impl Spec for RbfContainInvalidCells {

node0.mine_until_out_bootstrap_period();

// build txs chain
let tx0 = node0.new_transaction_spend_tip_cellbase();
let mut txs = vec![tx0];
let max_count = 5;
while txs.len() <= max_count {
let parent = txs.last().unwrap();
let child = parent
.as_advanced_builder()
.set_inputs(vec![{
CellInput::new_builder()
.previous_output(OutPoint::new(parent.hash(), 0))
.build()
}])
.set_outputs(vec![parent.output(0).unwrap()])
.build();
txs.push(child);
}
assert_eq!(txs.len(), max_count + 1);
// send Tx chain
for tx in txs[..=max_count - 1].iter() {
let cells = gen_spendable(node0, 3);
let txs = always_success_transactions(node0, &cells);
for tx in txs.iter() {
let ret = node0.rpc_client().send_transaction_result(tx.data().into());
assert!(ret.is_ok());
}

let clone_tx = txs[2].clone();

let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[1].hash(), 0))
.build();

// Set tx2 fee to a higher value
let output2 = CellOutputBuilder::default()
.capacity(capacity_bytes!(70).pack())
.build();

// build a cell from conflicts txs's output
let cell = CellDep::new_builder()
.out_point(OutPoint::new(txs[2].hash(), 0))
.build();
let tx2 = clone_tx
.as_advanced_builder()
.set_inputs(vec![{
Expand All @@ -490,11 +478,6 @@ impl Spec for RbfContainInvalidCells {
.rpc_client()
.send_transaction_result(tx2.data().into());
assert!(res.is_err(), "tx2 should be rejected");
assert!(res
.err()
.unwrap()
.to_string()
.contains("new Tx contains cell deps from conflicts"));
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
Expand Down Expand Up @@ -687,7 +670,7 @@ impl Spec for RbfReplaceProposedSuccess {
let tx2_status = node0.rpc_client().get_transaction(tx2.hash()).tx_status;
assert_eq!(tx2_status.status, Status::Pending);

// submit a black block
// submit a blank block
let example = node0.new_block(None, None, None);
let blank_block = example
.as_advanced_builder()
Expand Down Expand Up @@ -730,3 +713,68 @@ impl Spec for RbfReplaceProposedSuccess {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

pub struct RbfConcurrency;
impl Spec for RbfConcurrency {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];

node0.mine_until_out_bootstrap_period();
node0.new_block_with_blocking(|template| template.number.value() != 13);
let tx_hash_0 = node0.generate_transaction();
info!("Generate 4 txs with same input");
let tx1 = node0.new_transaction(tx_hash_0.clone());

let mut conflicts = vec![tx1];
// tx1 capacity is 100, set other txs to higer fee
let fees = vec![
capacity_bytes!(83),
capacity_bytes!(82),
capacity_bytes!(81),
capacity_bytes!(80),
];
for fee in fees.iter() {
let tx2_temp = node0.new_transaction(tx_hash_0.clone());
let output = CellOutputBuilder::default().capacity(fee.pack()).build();

let tx2 = tx2_temp
.as_advanced_builder()
.set_outputs(vec![output])
.build();
conflicts.push(tx2);
}

// make 5 threads to set_transaction concurrently
let mut handles = vec![];
for tx in &conflicts {
let cur_tx = tx.clone();
let rpc_address = node0.rpc_listen();
let handle = std::thread::spawn(move || {
let rpc_client = RpcClient::new(&rpc_address);
let _ = rpc_client.send_transaction_result(cur_tx.data().into());
});
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}

let status: Vec<_> = conflicts
.iter()
.map(|tx| {
let res = node0.rpc_client().get_transaction(tx.hash());
res.tx_status.status
})
.collect();

// the last tx should be in Pending(with the highest fee), others should be in Rejected
assert_eq!(status[4], Status::Pending);
for s in status.iter().take(4) {
assert_eq!(*s, Status::Rejected);
}
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}
14 changes: 4 additions & 10 deletions tx-pool/src/chunk_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ impl ChunkProcess {
let tx_hash = tx.hash();

let (ret, snapshot) = self.service.pre_check(&tx).await;
let (tip_hash, rtx, status, fee, tx_size, conflicts) =
try_or_return_with_snapshot!(ret, snapshot);
let (tip_hash, rtx, status, fee, tx_size) = try_or_return_with_snapshot!(ret, snapshot);

let cached = self.service.fetch_tx_verify_cache(&tx_hash).await;

Expand All @@ -244,10 +243,8 @@ impl ChunkProcess {
let completed = try_or_return_with_snapshot!(ret, snapshot);

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) = self
.service
.submit_entry(tip_hash, entry, status, conflicts)
.await;
let (ret, submit_snapshot) =
self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, submit_snapshot);
self.service
.after_process(tx, remote, &submit_snapshot, &Ok(completed))
Expand Down Expand Up @@ -325,10 +322,7 @@ impl ChunkProcess {
}

let entry = TxEntry::new(rtx, completed.cycles, fee, tx_size);
let (ret, submit_snapshot) = self
.service
.submit_entry(tip_hash, entry, status, conflicts)
.await;
let (ret, submit_snapshot) = self.service.submit_entry(tip_hash, entry, status).await;
try_or_return_with_snapshot!(ret, snapshot);

self.service.notify_block_assembler(status).await;
Expand Down
27 changes: 24 additions & 3 deletions tx-pool/src/component/edges.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use ckb_types::packed::{Byte32, OutPoint, ProposalShortId};
use ckb_types::{
core::tx_pool::Reject,
packed::{Byte32, OutPoint, ProposalShortId},
};
use std::collections::{hash_map::Entry, HashMap, HashSet};

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -27,8 +30,26 @@ impl Edges {
self.deps.len()
}

pub(crate) fn insert_input(&mut self, out_point: OutPoint, txid: ProposalShortId) {
self.inputs.insert(out_point, txid);
pub(crate) fn insert_input(
&mut self,
out_point: OutPoint,
txid: ProposalShortId,
) -> Result<(), Reject> {
// inputs is occupied means double speanding happened here
match self.inputs.entry(out_point.clone()) {
Entry::Occupied(occupied) => {
let msg =
format!(
"txpool unexpected double-spending out_point: {:?} old_tx: {:?} new_tx: {:?}",
out_point, occupied.get(), txid
);
Err(Reject::RBFRejected(msg))
}
Entry::Vacant(vacant) => {
vacant.insert(txid);
Ok(())
}
}
}

pub(crate) fn remove_input(&mut self, out_point: &OutPoint) -> Option<ProposalShortId> {
Expand Down
7 changes: 4 additions & 3 deletions tx-pool/src/component/pool_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ impl PoolMap {
}
trace!("pool_map.add_{:?} {}", status, entry.transaction().hash());
self.check_and_record_ancestors(&mut entry)?;
self.record_entry_edges(&entry)?;
self.insert_entry(&entry, status);
self.record_entry_edges(&entry);
self.record_entry_descendants(&entry);
self.track_entry_statics();
Ok(true)
Expand Down Expand Up @@ -389,7 +389,7 @@ impl PoolMap {
}
}

fn record_entry_edges(&mut self, entry: &TxEntry) {
fn record_entry_edges(&mut self, entry: &TxEntry) -> Result<(), Reject> {
let tx_short_id: ProposalShortId = entry.proposal_short_id();
let header_deps = entry.transaction().header_deps();
let related_dep_out_points: Vec<_> = entry.related_dep_out_points().cloned().collect();
Expand All @@ -398,7 +398,7 @@ impl PoolMap {
// if input reference a in-pool output, connect it
// otherwise, record input for conflict check
for i in inputs {
self.edges.insert_input(i.to_owned(), tx_short_id.clone());
self.edges.insert_input(i.to_owned(), tx_short_id.clone())?;
}

// record dep-txid
Expand All @@ -411,6 +411,7 @@ impl PoolMap {
.header_deps
.insert(tx_short_id, header_deps.into_iter().collect());
}
Ok(())
}

fn record_entry_descendants(&mut self, entry: &TxEntry) {
Expand Down
Loading

0 comments on commit ddd1309

Please sign in to comment.