rpc: add getSlotLeaders method (#16057) (#16079)
(cherry picked from commit e7fd7d4)

Co-authored-by: Justin Starry <justin@solana.com>
mergify[bot] and jstarry authored Mar 23, 2021
1 parent 9d37a33 commit c3c4991
Showing 8 changed files with 239 additions and 28 deletions.
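For orientation, a minimal sketch of the wire format the new method speaks, mirroring the request bodies used in the tests further down. The slot values and leader pubkeys are illustrative placeholders, not real output.

use serde_json::json;

fn main() {
    // getSlotLeaders takes [start_slot, limit]; the server caps limit at
    // MAX_GET_SLOT_LEADERS (5000) and returns one base-58 identity pubkey
    // string per slot, starting at start_slot.
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getSlotLeaders",
        "params": [100, 3]
    });

    // Shape of a successful reply (placeholder pubkey values):
    let reply = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "result": ["Leader111...", "Leader111...", "Leader222..."]
    });
    assert_eq!(reply["result"].as_array().unwrap().len(), 3);
    println!("{}", request);
}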
19 changes: 19 additions & 0 deletions client/src/rpc_client.rs
@@ -40,6 +40,7 @@ use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{
cmp::min,
net::SocketAddr,
str::FromStr,
sync::RwLock,
thread::sleep,
time::{Duration, Instant},
@@ -405,6 +406,24 @@ impl RpcClient {
)
}

pub fn get_slot_leaders(&self, start_slot: Slot, limit: u64) -> ClientResult<Vec<Pubkey>> {
self.send(RpcRequest::GetSlotLeaders, json!([start_slot, limit]))
.and_then(|slot_leaders: Vec<String>| {
slot_leaders
.iter()
.map(|slot_leader| {
Pubkey::from_str(slot_leader).map_err(|err| {
ClientErrorKind::Custom(format!(
"pubkey deserialization failed: {}",
err
))
.into()
})
})
.collect()
})
}

pub fn supply(&self) -> RpcResult<RpcSupply> {
self.supply_with_commitment(self.commitment_config)
}
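A client-side usage sketch of the method added above; the endpoint URL and slot values are illustrative assumptions, not part of this change.

use solana_client::rpc_client::RpcClient;

fn main() {
    // Hypothetical endpoint; any node serving this RPC version would do.
    let client = RpcClient::new("https://api.devnet.solana.com".to_string());
    let start_slot = client.get_slot().expect("failed to fetch current slot");
    // Fetch the leader identities for the next ten slots; the server rejects
    // any limit above MAX_GET_SLOT_LEADERS (5000).
    match client.get_slot_leaders(start_slot, 10) {
        Ok(leaders) => {
            for (offset, leader) in leaders.iter().enumerate() {
                println!("slot {}: {}", start_slot + offset as u64, leader);
            }
        }
        Err(err) => eprintln!("getSlotLeaders failed: {}", err),
    }
}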
3 changes: 3 additions & 0 deletions client/src/rpc_request.rs
@@ -39,6 +39,7 @@ pub enum RpcRequest {
GetSignatureStatuses,
GetSlot,
GetSlotLeader,
GetSlotLeaders,
GetStorageTurn,
GetStorageTurnRate,
GetSlotsPerSegment,
@@ -96,6 +97,7 @@ impl fmt::Display for RpcRequest {
RpcRequest::GetSignatureStatuses => "getSignatureStatuses",
RpcRequest::GetSlot => "getSlot",
RpcRequest::GetSlotLeader => "getSlotLeader",
RpcRequest::GetSlotLeaders => "getSlotLeaders",
RpcRequest::GetStorageTurn => "getStorageTurn",
RpcRequest::GetStorageTurnRate => "getStorageTurnRate",
RpcRequest::GetSlotsPerSegment => "getSlotsPerSegment",
@@ -128,6 +130,7 @@ pub const MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT: usize = 1_000;
pub const MAX_MULTIPLE_ACCOUNTS: usize = 100;
pub const NUM_LARGEST_ACCOUNTS: usize = 20;
pub const MAX_GET_PROGRAM_ACCOUNT_FILTERS: usize = 4;
pub const MAX_GET_SLOT_LEADERS: usize = 5000;

// Validators that are this number of slots behind are considered delinquent
pub const DELINQUENT_VALIDATOR_SLOT_DISTANCE: u64 = 128;
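A small sketch of how the pieces in this file fit together: the Display impl above supplies the JSON-RPC method name, and the new constant bounds the limit a server will accept.

use solana_client::rpc_request::{RpcRequest, MAX_GET_SLOT_LEADERS};

fn main() {
    // The enum variant stringifies to the wire method name via Display.
    assert_eq!(RpcRequest::GetSlotLeaders.to_string(), "getSlotLeaders");
    // Requests asking for more than this many leaders are rejected server-side.
    assert_eq!(MAX_GET_SLOT_LEADERS, 5000);
}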
161 changes: 136 additions & 25 deletions core/src/rpc.rs
@@ -30,13 +30,17 @@ use solana_client::{
TokenAccountsFilter, DELINQUENT_VALIDATOR_SLOT_DISTANCE, MAX_GET_CONFIRMED_BLOCKS_RANGE,
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT,
MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS_SLOT_RANGE, MAX_GET_PROGRAM_ACCOUNT_FILTERS,
MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, MAX_MULTIPLE_ACCOUNTS, NUM_LARGEST_ACCOUNTS,
MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, MAX_GET_SLOT_LEADERS, MAX_MULTIPLE_ACCOUNTS,
NUM_LARGEST_ACCOUNTS,
},
rpc_response::Response as RpcResponse,
rpc_response::*,
};
use solana_faucet::faucet::request_airdrop_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path};
use solana_ledger::{
blockstore::Blockstore, blockstore_db::BlockstoreError, get_tmp_ledger_path,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_metrics::inc_new_counter_info;
use solana_perf::packet::PACKET_DATA_SIZE;
use solana_runtime::{
@@ -139,6 +143,7 @@ pub struct JsonRpcRequestProcessor {
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
}
impl Metadata for JsonRpcRequestProcessor {}

@@ -223,6 +228,7 @@ impl JsonRpcRequestProcessor {
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
largest_accounts_cache: Arc<RwLock<LargestAccountsCache>>,
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
) -> (Self, Receiver<TransactionInfo>) {
let (sender, receiver) = channel();
(
@@ -242,6 +248,7 @@
optimistically_confirmed_bank,
largest_accounts_cache,
max_slots,
leader_schedule_cache,
},
receiver,
)
@@ -283,6 +290,7 @@ impl JsonRpcRequestProcessor {
})),
largest_accounts_cache: Arc::new(RwLock::new(LargestAccountsCache::new(30))),
max_slots: Arc::new(MaxSlots::default()),
leader_schedule_cache: Arc::new(LeaderScheduleCache::new_from_bank(bank)),
}
}

@@ -1971,29 +1979,14 @@ pub mod rpc_minimal {

debug!("get_leader_schedule rpc request received: {:?}", slot);

Ok(
solana_ledger::leader_schedule_utils::leader_schedule(epoch, &bank).map(
|leader_schedule| {
let mut leader_schedule_by_identity = HashMap::new();

for (slot_index, identity_pubkey) in
leader_schedule.get_slot_leaders().iter().enumerate()
{
leader_schedule_by_identity
.entry(identity_pubkey)
.or_insert_with(Vec::new)
.push(slot_index);
}

leader_schedule_by_identity
.into_iter()
.map(|(identity_pubkey, slot_indices)| {
(identity_pubkey.to_string(), slot_indices)
})
.collect()
},
),
)
Ok(meta
.leader_schedule_cache
.get_epoch_leader_schedule(epoch)
.map(|leader_schedule| {
solana_ledger::leader_schedule_utils::leader_schedule_by_identity(
leader_schedule.get_slot_leaders().iter().enumerate(),
)
}))
}
}
}
@@ -2194,6 +2187,14 @@
commitment: Option<CommitmentConfig>,
) -> Result<String>;

#[rpc(meta, name = "getSlotLeaders")]
fn get_slot_leaders(
&self,
meta: Self::Metadata,
start_slot: Slot,
limit: u64,
) -> Result<Vec<String>>;

#[rpc(meta, name = "minimumLedgerSlot")]
fn minimum_ledger_slot(&self, meta: Self::Metadata) -> Result<Slot>;

@@ -2801,6 +2802,56 @@
Ok(meta.get_slot_leader(commitment))
}

fn get_slot_leaders(
&self,
meta: Self::Metadata,
start_slot: Slot,
limit: u64,
) -> Result<Vec<String>> {
debug!(
"get_slot_leaders rpc request received (start: {} limit: {})",
start_slot, limit
);

let limit = limit as usize;
if limit > MAX_GET_SLOT_LEADERS {
return Err(Error::invalid_params(format!(
"Invalid limit; max {}",
MAX_GET_SLOT_LEADERS
)));
}

let bank = meta.bank(None);
let (mut epoch, mut slot_index) =
bank.epoch_schedule().get_epoch_and_slot_index(start_slot);

let mut slot_leaders = Vec::with_capacity(limit);
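// Walk the cached leader schedules epoch by epoch, starting at
// start_slot's offset within its epoch, until `limit` leaders have been
// collected or an epoch's schedule is missing from the cache.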
while slot_leaders.len() < limit {
if let Some(leader_schedule) =
meta.leader_schedule_cache.get_epoch_leader_schedule(epoch)
{
slot_leaders.extend(
leader_schedule
.get_slot_leaders()
.iter()
.skip(slot_index as usize)
.take(limit.saturating_sub(slot_leaders.len()))
.map(|pubkey| pubkey.to_string()),
);
} else {
return Err(Error::invalid_params(format!(
"Invalid slot range: leader schedule for epoch {} is unavailable",
epoch
)));
}

epoch += 1;
slot_index = 0;
}

Ok(slot_leaders)
}

fn minimum_ledger_slot(&self, meta: Self::Metadata) -> Result<Slot> {
debug!("minimum_ledger_slot rpc request received");
meta.minimum_ledger_slot()
@@ -3314,6 +3365,7 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
max_slots,
Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);

@@ -3819,6 +3871,62 @@
assert_eq!(schedule, None);
}

#[test]
fn test_rpc_get_slot_leaders() {
let bob_pubkey = solana_sdk::pubkey::new_rand();
let RpcHandler { io, meta, bank, .. } = start_rpc_handler_with_tx(&bob_pubkey);

// Test that slot leaders will be returned across epochs
let query_start = 0;
let query_limit = 2 * bank.epoch_schedule().slots_per_epoch;

let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#,
query_start, query_limit
);
let rep = io.handle_request_sync(&req, meta.clone());
let res: Response = serde_json::from_str(&rep.expect("actual response"))
.expect("actual response deserialization");

let slot_leaders: Vec<String> = if let Response::Single(res) = res {
if let Output::Success(res) = res {
serde_json::from_value(res.result).unwrap()
} else {
panic!("Expected success for {} but received: {:?}", req, res);
}
} else {
panic!("Expected single response");
};

assert_eq!(slot_leaders.len(), query_limit as usize);

// Test that invalid limit returns an error
let query_start = 0;
let query_limit = 5001;

let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#,
query_start, query_limit
);
let rep = io.handle_request_sync(&req, meta.clone());
let res: Value = serde_json::from_str(&rep.expect("actual response"))
.expect("actual response deserialization");
assert!(res.get("error").is_some());

// Test that invalid epoch returns an error
let query_start = 2 * bank.epoch_schedule().slots_per_epoch;
let query_limit = 10;

let req = format!(
r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeaders", "params": [{}, {}]}}"#,
query_start, query_limit
);
let rep = io.handle_request_sync(&req, meta);
let res: Value = serde_json::from_str(&rep.expect("actual response"))
.expect("actual response deserialization");
assert!(res.get("error").is_some());
}

#[test]
fn test_rpc_get_account_info() {
let bob_pubkey = solana_sdk::pubkey::new_rand();
@@ -4723,6 +4831,7 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);

@@ -4998,6 +5107,7 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(
@@ -6287,6 +6397,7 @@ pub mod tests {
optimistically_confirmed_bank.clone(),
Arc::new(RwLock::new(LargestAccountsCache::new(30))),
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
);

let mut io = MetaIoHandler::default();
5 changes: 4 additions & 1 deletion core/src/rpc_service.rs
@@ -18,7 +18,7 @@ use jsonrpc_http_server::{
};
use regex::Regex;
use solana_client::rpc_cache::LargestAccountsCache;
use solana_ledger::blockstore::Blockstore;
use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
use solana_metrics::inc_new_counter_info;
use solana_runtime::{
bank_forks::{BankForks, SnapshotConfig},
@@ -275,6 +275,7 @@ impl JsonRpcService {
send_transaction_retry_ms: u64,
send_transaction_leader_forward_count: u64,
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
@@ -354,6 +355,7 @@
optimistically_confirmed_bank,
largest_accounts_cache,
max_slots,
leader_schedule_cache,
);

let leader_info =
@@ -518,6 +520,7 @@ mod tests {
1000,
1,
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
);
let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
1 change: 1 addition & 0 deletions core/src/validator.rs
@@ -541,6 +541,7 @@ impl Validator {
config.send_transaction_retry_ms,
config.send_transaction_leader_forward_count,
max_slots.clone(),
leader_schedule_cache.clone(),
)),
if config.rpc_config.minimal_api {
None
(The remaining 3 changed files are not shown here.)
