Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

RPC Notifier Signal when Setup Complete #27481

Merged
merged 2 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ impl Validator {
block_commitment_cache.clone(),
optimistically_confirmed_bank.clone(),
&config.pubsub_config,
None,

This comment was marked as resolved.

));

let max_slots = Arc::new(MaxSlots::default());
Expand Down
27 changes: 23 additions & 4 deletions rpc/src/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ impl RpcSubscriptions {
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default(),
None,
)
}

Expand All @@ -573,14 +574,13 @@ impl RpcSubscriptions {
let blockstore = Blockstore::open(&ledger_path).unwrap();
let blockstore = Arc::new(blockstore);

Self::new_with_config(
Self::new_for_tests_with_blockstore(
exit,
max_complete_transaction_status_slot,
blockstore,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default_for_tests(),
)
}

Expand All @@ -592,15 +592,30 @@ impl RpcSubscriptions {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
) -> Self {
Self::new_with_config(
let rpc_notifier_ready = Arc::new(AtomicBool::new(false));

let rpc_subscriptions = Self::new_with_config(
exit,
max_complete_transaction_status_slot,
blockstore,
bank_forks,
block_commitment_cache,
optimistically_confirmed_bank,
&PubSubConfig::default_for_tests(),
)
Some(rpc_notifier_ready.clone()),
);

// Ensure RPC notifier is ready to receive notifications before proceeding
let start_time = Instant::now();
loop {
if rpc_notifier_ready.load(Ordering::Relaxed) {
break;
} else if (Instant::now() - start_time).as_millis() > 5000 {
panic!("RPC notifier thread setup took too long");
}
}

rpc_subscriptions
}

pub fn new_with_config(
Expand All @@ -611,6 +626,7 @@ impl RpcSubscriptions {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
config: &PubSubConfig,
rpc_notifier_ready: Option<Arc<AtomicBool>>,
) -> Self {
let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();

Expand Down Expand Up @@ -640,6 +656,9 @@ impl RpcSubscriptions {
.build()
.unwrap();
pool.install(|| {
if let Some(rpc_notifier_ready) = rpc_notifier_ready {

This comment was marked as resolved.

rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);

This comment was marked as resolved.

}
Self::process_notifications(
exit_clone,
max_complete_transaction_status_slot,
Expand Down