Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Fix race in RPC subscriptions test (#9142) (#9145)
Browse files Browse the repository at this point in the history
automerge
  • Loading branch information
mergify[bot] authored Mar 28, 2020
1 parent 3ae6e0b commit 1027b06
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 35 deletions.
7 changes: 5 additions & 2 deletions core/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,11 @@ where
let root = if root.len() == 1 { root[0] } else { 0 };
if desired_slot.len() == 1 {
let slot = desired_slot[0];
let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone();
let results = bank_method(&desired_bank, hashmap_key);
let results = {
let bank_forks = bank_forks.read().unwrap();
let desired_bank = bank_forks.get(slot).unwrap();
bank_method(&desired_bank, hashmap_key)
};
for result in filter_results(results, root) {
notifier.notify(
Response {
Expand Down
75 changes: 42 additions & 33 deletions core/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
pubkey::Pubkey,
signature::Signer,
system_transaction,
transaction::{self, Transaction},
};
Expand All @@ -24,7 +25,6 @@ use std::{
fs::remove_dir_all,
net::UdpSocket,
sync::mpsc::channel,
sync::{Arc, Mutex},
thread::sleep,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -210,25 +210,27 @@ fn test_rpc_subscriptions() {
..
} = TestValidator::run();

// Create transaction signatures to subscribe to
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions: Vec<Transaction> = (0..100)
transactions_socket.connect(leader_data.tpu).unwrap();

// Create transaction signatures to subscribe to
let transactions: Vec<Transaction> = (0..1000)
.map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash))
.collect();
let mut signature_set: HashSet<String> = transactions
.iter()
.map(|tx| tx.signatures[0].to_string())
.collect();

// Track when subscriptions are ready
let (ready_sender, ready_receiver) = channel::<()>();
// Track when status notifications are received
let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();

// Create the pub sub runtime
let mut rt = Runtime::new().unwrap();
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);

let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();
let status_sender = Arc::new(Mutex::new(status_sender));
let (sent_sender, sent_receiver) = channel::<()>();
let sent_sender = Arc::new(Mutex::new(sent_sender));

// Subscribe to all signatures
rt.spawn({
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
Expand All @@ -237,18 +239,12 @@ fn test_rpc_subscriptions() {
.and_then(move |client| {
for sig in signature_set {
let status_sender = status_sender.clone();
let sent_sender = sent_sender.clone();
tokio::spawn(
client
.signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| {
sent_sender.lock().unwrap().send(()).unwrap();
sig_stream.for_each(move |result| {
status_sender
.lock()
.unwrap()
.send((sig.clone(), result))
.unwrap();
status_sender.send((sig.clone(), result)).unwrap();
future::ok(())
})
})
Expand All @@ -257,37 +253,50 @@ fn test_rpc_subscriptions() {
}),
);
}
tokio::spawn(
client
.slot_subscribe()
.and_then(move |slot_stream| {
slot_stream.for_each(move |_| {
ready_sender.send(()).unwrap();
future::ok(())
})
})
.map_err(|err| {
eprintln!("slot sub err: {:#?}", err);
}),
);
future::ok(())
})
.map_err(|_| ())
});

// Wait for signature subscriptions
let deadline = Instant::now() + Duration::from_secs(2);
(0..transactions.len()).for_each(|_| {
sent_receiver
.recv_timeout(deadline.saturating_duration_since(Instant::now()))
.unwrap();
});
ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap();

let rpc_client = RpcClient::new_socket(leader_data.rpc);
let mut transaction_count = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent())
.unwrap();
let mut mint_balance = rpc_client
.get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent())
.unwrap()
.value;
assert!(mint_balance >= transactions.len() as u64);

// Send all transactions to tpu socket for processing
transactions.iter().for_each(|tx| {
transactions_socket
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu)
.send(&bincode::serialize(&tx).unwrap())
.unwrap();
});

// Track mint balance to know when transactions have completed
let now = Instant::now();
let expected_transaction_count = transaction_count + transactions.len() as u64;
while transaction_count < expected_transaction_count && now.elapsed() < Duration::from_secs(5) {
transaction_count = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent())
.unwrap();
sleep(Duration::from_millis(200));
let expected_mint_balance = mint_balance - transactions.len() as u64;
while mint_balance != expected_mint_balance && now.elapsed() < Duration::from_secs(5) {
mint_balance = rpc_client
.get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent())
.unwrap()
.value;
sleep(Duration::from_millis(100));
}

// Wait for all signature subscriptions
Expand All @@ -300,12 +309,12 @@ fn test_rpc_subscriptions() {
assert!(signature_set.remove(&sig));
}
Err(_err) => {
eprintln!(
assert!(
false,
"recv_timeout, {}/{} signatures remaining",
signature_set.len(),
transactions.len()
);
assert!(false)
}
}
}
Expand Down

0 comments on commit 1027b06

Please sign in to comment.