From 4277564d9d357e9852de44832e472ceec617ee8b Mon Sep 17 00:00:00 2001 From: Brennan Watt Date: Tue, 30 Aug 2022 12:25:55 -0700 Subject: [PATCH 1/2] Add delay for RPC notification setup --- rpc/src/rpc_subscriptions.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 27390494b9c98a..36f7364acdc763 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -573,7 +573,7 @@ impl RpcSubscriptions { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); - Self::new_with_config( + let rpc_subscriptions = Self::new_with_config( exit, max_complete_transaction_status_slot, blockstore, @@ -581,7 +581,10 @@ impl RpcSubscriptions { block_commitment_cache, optimistically_confirmed_bank, &PubSubConfig::default_for_tests(), - ) + ); + // Ensure RPC notifier is ready to receive notifications before proceeding + std::thread::sleep(Duration::from_millis(100)); + rpc_subscriptions } pub fn new_for_tests_with_blockstore( From ab4813f74d9e22bf46ef24b4d031f4d997b2eeb0 Mon Sep 17 00:00:00 2001 From: Brennan Watt Date: Thu, 1 Sep 2022 14:19:58 -0700 Subject: [PATCH 2/2] RPC notifier signal when ready --- core/src/validator.rs | 1 + rpc/src/rpc_subscriptions.rs | 32 ++++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index a60bbb39169030..3c9322152b55ab 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -722,6 +722,7 @@ impl Validator { block_commitment_cache.clone(), optimistically_confirmed_bank.clone(), &config.pubsub_config, + None, )); let max_slots = Arc::new(MaxSlots::default()); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 36f7364acdc763..937508f044bad1 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -559,6 +559,7 @@ impl RpcSubscriptions { block_commitment_cache, optimistically_confirmed_bank, &PubSubConfig::default(), + None, ) } @@ -573,18 +574,14 @@ impl RpcSubscriptions { let blockstore = Blockstore::open(&ledger_path).unwrap(); let blockstore = Arc::new(blockstore); - let rpc_subscriptions = Self::new_with_config( + Self::new_for_tests_with_blockstore( exit, max_complete_transaction_status_slot, blockstore, bank_forks, block_commitment_cache, optimistically_confirmed_bank, - &PubSubConfig::default_for_tests(), - ); - // Ensure RPC notifier is ready to receive notifications before proceeding - std::thread::sleep(Duration::from_millis(100)); - rpc_subscriptions + ) } pub fn new_for_tests_with_blockstore( @@ -595,7 +592,9 @@ impl RpcSubscriptions { block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, ) -> Self { - Self::new_with_config( + let rpc_notifier_ready = Arc::new(AtomicBool::new(false)); + + let rpc_subscriptions = Self::new_with_config( exit, max_complete_transaction_status_slot, blockstore, @@ -603,7 +602,20 @@ impl RpcSubscriptions { block_commitment_cache, optimistically_confirmed_bank, &PubSubConfig::default_for_tests(), - ) + Some(rpc_notifier_ready.clone()), + ); + + // Ensure RPC notifier is ready to receive notifications before proceeding + let start_time = Instant::now(); + loop { + if rpc_notifier_ready.load(Ordering::Relaxed) { + break; + } else if (Instant::now() - start_time).as_millis() > 5000 { + panic!("RPC notifier thread setup took too long"); + } + } + + rpc_subscriptions } pub fn new_with_config( @@ -614,6 +626,7 @@ impl RpcSubscriptions { block_commitment_cache: Arc>, optimistically_confirmed_bank: Arc>, config: &PubSubConfig, + rpc_notifier_ready: Option>, ) -> Self { let (notification_sender, notification_receiver) = crossbeam_channel::unbounded(); @@ -643,6 +656,9 @@ impl RpcSubscriptions { .build() .unwrap(); pool.install(|| { + if let Some(rpc_notifier_ready) = rpc_notifier_ready { + rpc_notifier_ready.fetch_or(true, Ordering::Relaxed); + } Self::process_notifications( exit_clone, max_complete_transaction_status_slot,