
refactor should_tick code
jeffwashington committed Mar 22, 2021
1 parent 8739fd0 commit a54782a
Showing 1 changed file with 138 additions and 94 deletions.
232 changes: 138 additions & 94 deletions core/src/poh_service.rs
@@ -1,6 +1,7 @@
 //! The `poh_service` module implements a service that records the passing of
 //! "ticks", a measure of time in the PoH stream
 use crate::poh_recorder::{PohRecorder, Record};
+use solana_ledger::poh::Poh;
 use solana_measure::measure::Measure;
 use solana_sdk::poh_config::PohConfig;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -24,6 +25,54 @@ pub const DEFAULT_PINNED_CPU_CORE: usize = 0;
 
 const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000;
 
+#[derive(Debug)]
+struct PohTiming {
+    num_ticks: u64,
+    num_hashes: u64,
+    total_sleep_us: u64,
+    total_lock_time_ns: u64,
+    total_hash_time_ns: u64,
+    total_tick_time_ns: u64,
+    last_metric: Instant,
+}
+
+impl PohTiming {
+    fn new() -> Self {
+        Self {
+            num_ticks: 0,
+            num_hashes: 0,
+            total_sleep_us: 0,
+            total_lock_time_ns: 0,
+            total_hash_time_ns: 0,
+            total_tick_time_ns: 0,
+            last_metric: Instant::now(),
+        }
+    }
+    fn report(&mut self, ticks_per_slot: u64) {
+        if self.last_metric.elapsed().as_millis() > 1000 {
+            let elapsed_us = self.last_metric.elapsed().as_micros() as u64;
+            let us_per_slot = (elapsed_us * ticks_per_slot) / self.num_ticks;
+            datapoint_info!(
+                "poh-service",
+                ("ticks", self.num_ticks as i64, i64),
+                ("hashes", self.num_hashes as i64, i64),
+                ("elapsed_us", us_per_slot, i64),
+                ("total_sleep_us", self.total_sleep_us, i64),
+                ("total_tick_time_us", self.total_tick_time_ns / 1000, i64),
+                ("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
+                ("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
+            );
+            self.total_sleep_us = 0;
+            self.num_ticks = 0;
+            self.num_hashes = 0;
+            self.total_tick_time_ns = 0;
+            self.total_lock_time_ns = 0;
+            self.total_hash_time_ns = 0;
+            self.last_metric = Instant::now();
+        }
+    }
+}
+
 impl PohService {
     pub fn new(
         poh_recorder: Arc<Mutex<PohRecorder>>,
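
Aside: the PohTiming struct added above is the usual hot-loop metrics pattern: bump plain counters on every iteration, and let report flush them at most once per second (us_per_slot extrapolates the measured interval to a full slot as elapsed_us * ticks_per_slot / num_ticks). Below is a minimal, self-contained sketch of the same pattern, not the actual service code: LoopTiming is an invented name, and println! stands in for the datapoint_info! metrics macro.

use std::time::{Duration, Instant};

// Sketch of the PohTiming pattern: counters live in a struct, the hot loop
// bumps them, and `report` flushes roughly once per second.
// `println!` stands in for the real `datapoint_info!` metrics macro.
struct LoopTiming {
    num_ticks: u64,
    num_hashes: u64,
    last_report: Instant,
}

impl LoopTiming {
    fn new() -> Self {
        Self {
            num_ticks: 0,
            num_hashes: 0,
            last_report: Instant::now(),
        }
    }

    fn report(&mut self) {
        // Cheap check on every call; only flush once the interval has elapsed.
        if self.last_report.elapsed().as_millis() > 1000 {
            println!("ticks={} hashes={}", self.num_ticks, self.num_hashes);
            // Reset so each report covers only the last interval.
            self.num_ticks = 0;
            self.num_hashes = 0;
            self.last_report = Instant::now();
        }
    }
}

fn main() {
    let mut timing = LoopTiming::new();
    for _ in 0..300 {
        timing.num_hashes += 64; // stand-in for a batch of hashes
        timing.num_ticks += 1;
        timing.report();
        std::thread::sleep(Duration::from_millis(10));
    }
}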
@@ -146,6 +195,80 @@ impl PohService {
         }
     }
 
+    fn record_or_hash(
+        next_record: &mut Option<Record>,
+        poh_recorder: &Arc<Mutex<PohRecorder>>,
+        timing: &mut PohTiming,
+        record_receiver: &Receiver<Record>,
+        hashes_per_batch: u64,
+        poh: &Arc<Mutex<Poh>>,
+    ) -> bool {
+        let mut next = None;
+        std::mem::swap(&mut next, next_record);
+        match next {
+            Some(mut record) => {
+                // received message to record
+                // so, record for as long as we have queued up record requests
+                let mut lock_time = Measure::start("lock");
+                let mut poh_recorder_l = poh_recorder.lock().unwrap();
+                lock_time.stop();
+                timing.total_lock_time_ns += lock_time.as_ns();
+                loop {
+                    let res = poh_recorder_l.record(
+                        record.slot,
+                        record.mixin,
+                        std::mem::take(&mut record.transactions),
+                    );
+                    let _ = record.sender.send(res); // what do we do on failure here? Ignore for now.
+                    timing.num_hashes += 1; // note: may have also ticked inside record
+
+                    let get_again = record_receiver.try_recv();
+                    match get_again {
+                        Ok(mut record2) => {
+                            // we already have second request to record, so record again while we still have the mutex
+                            std::mem::swap(&mut record2, &mut record);
+                        }
+                        Err(_) => {
+                            break;
+                        }
+                    }
+                }
+                // PohRecorder.record would have ticked if it needed to, so should_tick will be false
+            }
+            None => {
+                // did not receive instructions to record, so hash until we notice we've been asked to record (or we need to tick) and then remember what to record
+                let mut lock_time = Measure::start("lock");
+                let mut poh_l = poh.lock().unwrap();
+                lock_time.stop();
+                timing.total_lock_time_ns += lock_time.as_ns();
+                let mut should_tick;
+                loop {
+                    timing.num_hashes += hashes_per_batch;
+                    let mut hash_time = Measure::start("hash");
+                    should_tick = poh_l.hash(hashes_per_batch);
+                    hash_time.stop();
+                    timing.total_hash_time_ns += hash_time.as_ns();
+                    if should_tick {
+                        return true; // nothing else can be done. tick required.
+                    }
+                    // check to see if a record request has been sent
+                    let get_again = record_receiver.try_recv();
+                    match get_again {
+                        Ok(record) => {
+                            // remember the record we just received as the next record to occur
+                            *next_record = Some(record);
+                            break;
+                        }
+                        Err(_) => {
+                            continue;
+                        }
+                    }
+                }
+            }
+        };
+        false // should_tick = false for all code that reaches here
+    }
+
     fn tick_producer(
         poh_recorder: Arc<Mutex<PohRecorder>>,
         poh_exit: &AtomicBool,
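
Aside: record_or_hash above takes the PohRecorder lock once and then drains every queued Record request with try_recv before releasing it, so a burst of requests pays for a single lock acquisition. Below is a standalone sketch of that drain-while-locked shape, not the service itself: drain is an invented helper, u64 stands in for Record, and println! stands in for the record(..) call.

use std::sync::mpsc::{channel, Receiver};

// Sketch of the drain loop in `record_or_hash`: service the first request,
// then keep pulling with `try_recv` while the queue is non-empty, so the
// (expensive) lock is taken once per burst rather than once per request.
fn drain(first: u64, receiver: &Receiver<u64>) {
    let mut current = first;
    loop {
        println!("recording {}", current); // stand-in for poh_recorder_l.record(..)
        match receiver.try_recv() {
            Ok(next) => current = next, // another request is already queued
            Err(_) => break,            // queue empty (or closed): stop draining
        }
    }
}

fn main() {
    let (sender, receiver) = channel();
    for i in 1..4 {
        sender.send(i).unwrap();
    }
    drain(0, &receiver); // records 0, then drains 1, 2, 3 under one "lock"
}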
Expand All @@ -156,119 +279,40 @@ impl PohService {
) {
let poh = poh_recorder.lock().unwrap().poh.clone();
let mut now = Instant::now();
let mut last_metric = Instant::now();
let mut num_ticks = 0;
let mut num_hashes = 0;
let mut total_sleep_us = 0;
let mut total_lock_time_ns = 0;
let mut total_hash_time_ns = 0;
let mut total_tick_time_ns = 0;
let mut try_again_mixin = None;
let mut timing = PohTiming::new();
let mut next_record = None;
loop {
let should_tick = {
let mixin = if let Some(record) = try_again_mixin {
try_again_mixin = None;
Ok(record)
} else {
record_receiver.try_recv()
};
if let Ok(mut record) = mixin {
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
loop {
let res = poh_recorder_l.record(
record.slot,
record.mixin,
std::mem::take(&mut record.transactions),
);
let _ = record.sender.send(res); // what do we do on failure here? Ignore for now.
num_hashes += 1; // note: may have also ticked inside record

let get_again = record_receiver.try_recv();
match get_again {
Ok(mut record2) => {
std::mem::swap(&mut record2, &mut record);
}
Err(_) => {
break;
}
}
}
false // record will tick if it needs to
} else {
let mut lock_time = Measure::start("lock");
let mut poh_l = poh.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
let mut r;
loop {
num_hashes += hashes_per_batch;
let mut hash_time = Measure::start("hash");
r = poh_l.hash(hashes_per_batch);
hash_time.stop();
total_hash_time_ns += hash_time.as_ns();
if r {
break;
}
let get_again = record_receiver.try_recv();
match get_again {
Ok(inside) => {
try_again_mixin = Some(inside);
break;
}
Err(_) => {
continue;
}
}
}
r
}
};
let should_tick = Self::record_or_hash(
&mut next_record,
&poh_recorder,
&mut timing,
&record_receiver,
hashes_per_batch,
&poh,
);
if should_tick {
// Lock PohRecorder only for the final hash...
{
let mut lock_time = Measure::start("lock");
let mut poh_recorder_l = poh_recorder.lock().unwrap();
lock_time.stop();
total_lock_time_ns += lock_time.as_ns();
timing.total_lock_time_ns += lock_time.as_ns();
let mut tick_time = Measure::start("tick");
poh_recorder_l.tick();
tick_time.stop();
total_tick_time_ns += tick_time.as_ns();
timing.total_tick_time_ns += tick_time.as_ns();
}
num_ticks += 1;
timing.num_ticks += 1;
let elapsed_ns = now.elapsed().as_nanos() as u64;
// sleep is not accurate enough to get a predictable time.
// Kernel can not schedule the thread for a while.
while (now.elapsed().as_nanos() as u64) < target_tick_ns {
std::hint::spin_loop();
}
total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
timing.total_sleep_us += (now.elapsed().as_nanos() as u64 - elapsed_ns) / 1000;
now = Instant::now();

if last_metric.elapsed().as_millis() > 1000 {
let elapsed_us = last_metric.elapsed().as_micros() as u64;
let us_per_slot = (elapsed_us * ticks_per_slot) / num_ticks;
datapoint_info!(
"poh-service",
("ticks", num_ticks as i64, i64),
("hashes", num_hashes as i64, i64),
("elapsed_us", us_per_slot, i64),
("total_sleep_us", total_sleep_us, i64),
("total_tick_time_us", total_tick_time_ns / 1000, i64),
("total_lock_time_us", total_lock_time_ns / 1000, i64),
("total_hash_time_us", total_hash_time_ns / 1000, i64),
);
total_sleep_us = 0;
num_ticks = 0;
num_hashes = 0;
total_tick_time_ns = 0;
total_lock_time_ns = 0;
total_hash_time_ns = 0;
last_metric = Instant::now();
}
timing.report(ticks_per_slot);
if poh_exit.load(Ordering::Relaxed) {
break;
}
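
Aside: the spin loop in tick_producer trades CPU for timing precision: thread::sleep could overshoot the tick deadline whenever the kernel declines to reschedule the thread, so the service busy-waits to the deadline instead (it also pins itself to a core, see DEFAULT_PINNED_CPU_CORE, which keeps the spinning contained). A minimal sketch of that pacing under an assumed 1 ms target; pace is an invented helper, not part of the service:

use std::time::Instant;

// Sketch of the busy-wait pacing in `tick_producer`: spin until the target
// deadline instead of sleeping, because the kernel may not reschedule a
// sleeping thread in time for a sub-millisecond deadline.
fn pace(start: Instant, target_tick_ns: u64) {
    while (start.elapsed().as_nanos() as u64) < target_tick_ns {
        std::hint::spin_loop(); // hint to the CPU that we are spin-waiting
    }
}

fn main() {
    let start = Instant::now();
    pace(start, 1_000_000); // wait ~1 ms with sub-millisecond precision
    println!("waited {:?}", start.elapsed());
}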
