Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify parallelization in test-float-parse #137525

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/etc/test-float-parse/src/gen/exhaustive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ impl<F: Float> Generator<F> for Exhaustive<F>
where
RangeInclusive<F::Int>: Iterator<Item = F::Int>,
{
const NAME: &'static str = "exhaustive";
const SHORT_NAME: &'static str = "exhaustive";

type WriteCtx = F;

fn total_tests() -> u64 {
F::Int::MAX.try_into().unwrap_or(u64::MAX)
1u64.checked_shl(F::Int::BITS).expect("More than u64::MAX tests")
}

fn new() -> Self {
Expand Down
1 change: 0 additions & 1 deletion src/etc/test-float-parse/src/gen/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl<F: Float> Generator<F> for Fuzz<F>
where
Standard: Distribution<<F as Float>::Int>,
{
const NAME: &'static str = "fuzz";
const SHORT_NAME: &'static str = "fuzz";

type WriteCtx = F;
Expand Down
1 change: 0 additions & 1 deletion src/etc/test-float-parse/src/gen/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl<F: Float> Generator<F> for FewOnesInt<F>
where
<F::Int as TryFrom<u128>>::Error: std::fmt::Debug,
{
const NAME: &'static str = "few ones int";
const SHORT_NAME: &'static str = "few ones int";

type WriteCtx = F::Int;
Expand Down
221 changes: 53 additions & 168 deletions src/etc/test-float-parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ mod traits;
mod ui;
mod validate;

use std::any::{TypeId, type_name};
use std::any::type_name;
use std::cmp::min;
use std::ops::RangeInclusive;
use std::process::ExitCode;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{OnceLock, mpsc};
use std::{fmt, time};

use indicatif::{MultiProgress, ProgressBar};
use rand::distributions::{Distribution, Standard};
use rayon::prelude::*;
use time::{Duration, Instant};
use traits::{Float, Generator, Int};
use validate::CheckError;

/// Test generators.
mod gen {
Expand Down Expand Up @@ -43,7 +43,7 @@ const HUGE_TEST_CUTOFF: u64 = 5_000_000;
/// Seed for tests that use a deterministic RNG.
const SEED: [u8; 32] = *b"3.141592653589793238462643383279";

/// Global configuration
/// Global configuration.
#[derive(Debug)]
pub struct Config {
pub timeout: Duration,
Expand Down Expand Up @@ -104,9 +104,9 @@ pub fn run(cfg: Config, include: &[String], exclude: &[String]) -> ExitCode {
println!("Skipping test '{exc}'");
}

println!("launching");
println!("Launching all");
let elapsed = launch_tests(&mut tests, &cfg);
ui::finish(&tests, elapsed, &cfg)
ui::finish_all(&tests, elapsed, &cfg)
}

/// Enumerate tests to run but don't actually run them.
Expand Down Expand Up @@ -160,18 +160,18 @@ where
#[derive(Debug)]
pub struct TestInfo {
pub name: String,
/// Tests are identified by the type ID of `(F, G)` (tuple of the float and generator type).
/// This gives an easy way to associate messages with tests.
id: TypeId,
float_name: &'static str,
float_bits: u32,
gen_name: &'static str,
/// Name for display in the progress bar.
short_name: String,
/// Pad the short name to a common width for progress bar use.
short_name_padded: String,
total_tests: u64,
/// Function to launch this test.
launch: fn(&mpsc::Sender<Msg>, &TestInfo, &Config),
launch: fn(&TestInfo, &Config),
/// Progress bar to be updated.
pb: Option<ProgressBar>,
progress: Option<ui::Progress>,
/// Once completed, this will be set.
completed: OnceLock<Completed>,
}
Expand All @@ -187,121 +187,37 @@ impl TestInfo {
let f_name = type_name::<F>();
let gen_name = G::NAME;
let gen_short_name = G::SHORT_NAME;
let name = format!("{f_name} {gen_name}");
let short_name = format!("{f_name} {gen_short_name}");
let short_name_padded = format!("{short_name:18}");

let info = TestInfo {
id: TypeId::of::<(F, G)>(),
float_name: f_name,
float_bits: F::BITS,
gen_name,
pb: None,
name: format!("{f_name} {gen_name}"),
short_name: format!("{f_name} {gen_short_name}"),
progress: None,
name,
short_name_padded,
short_name,
launch: test_runner::<F, G>,
total_tests: G::total_tests(),
completed: OnceLock::new(),
};
v.push(info);
}

/// Pad the short name to a common width for progress bar use.
fn short_name_padded(&self) -> String {
format!("{:18}", self.short_name)
}

/// Create a progress bar for this test within a multiprogress bar.
fn register_pb(&mut self, mp: &MultiProgress, drop_bars: &mut Vec<ProgressBar>) {
self.pb = Some(ui::create_pb(mp, self.total_tests, &self.short_name_padded(), drop_bars));
}

/// When the test is finished, update progress bar messages and finalize.
fn finalize_pb(&self, c: &Completed) {
let pb = self.pb.as_ref().unwrap();
ui::finalize_pb(pb, &self.short_name_padded(), c);
}

/// True if this should be run after all others.
fn is_huge_test(&self) -> bool {
self.total_tests >= HUGE_TEST_CUTOFF
}
}

/// A message sent from test runner threads to the UI/log thread.
#[derive(Clone, Debug)]
struct Msg {
id: TypeId,
update: Update,
}

impl Msg {
/// Wrap an `Update` into a message for the specified type. We use the `TypeId` of `(F, G)` to
/// identify which test a message in the channel came from.
fn new<F: Float, G: Generator<F>>(u: Update) -> Self {
Self { id: TypeId::of::<(F, G)>(), update: u }
}

/// Get the matching test from a list. Panics if not found.
fn find_test<'a>(&self, tests: &'a [TestInfo]) -> &'a TestInfo {
tests.iter().find(|t| t.id == self.id).unwrap()
}

/// Update UI as needed for a single message received from the test runners.
fn handle(self, tests: &[TestInfo], mp: &MultiProgress) {
let test = self.find_test(tests);
let pb = test.pb.as_ref().unwrap();

match self.update {
Update::Started => {
mp.println(format!("Testing '{}'", test.name)).unwrap();
}
Update::Progress { executed, failures } => {
pb.set_message(format! {"{failures}"});
pb.set_position(executed);
}
Update::Failure { fail, input, float_res } => {
mp.println(format!(
"Failure in '{}': {fail}. parsing '{input}'. Parsed as: {float_res}",
test.name
))
.unwrap();
}
Update::Completed(c) => {
test.finalize_pb(&c);

let prefix = match c.result {
Ok(FinishedAll) => "Completed tests for",
Err(EarlyExit::Timeout) => "Timed out",
Err(EarlyExit::MaxFailures) => "Max failures reached for",
};

mp.println(format!(
"{prefix} generator '{}' in {:?}. {} tests run, {} failures",
test.name, c.elapsed, c.executed, c.failures
))
.unwrap();
test.completed.set(c).unwrap();
}
};
/// When the test is finished, update progress bar messages and finalize.
fn complete(&self, c: Completed) {
self.progress.as_ref().unwrap().complete(&c, 0);
self.completed.set(c).unwrap();
}
}

/// Status sent with a message.
#[derive(Clone, Debug)]
enum Update {
/// Starting a new test runner.
Started,
/// Completed a out of b tests.
Progress { executed: u64, failures: u64 },
/// Received a failed test.
Failure {
fail: CheckFailure,
/// String for which parsing was attempted.
input: Box<str>,
/// The parsed & decomposed `FloatRes`, already stringified so we don't need generics here.
float_res: Box<str>,
},
/// Exited with an unexpected condition.
Completed(Completed),
}

/// Result of an input did not parsing successfully.
#[derive(Clone, Debug)]
enum CheckFailure {
Expand Down Expand Up @@ -329,6 +245,10 @@ enum CheckFailure {
/// two representable values.
incorrect_midpoint_rounding: bool,
},
/// String did not parse successfully.
ParsingFailed(Box<str>),
/// A panic was caught.
Panic(Box<str>),
}

impl fmt::Display for CheckFailure {
Expand Down Expand Up @@ -363,6 +283,8 @@ impl fmt::Display for CheckFailure {
}
Ok(())
}
CheckFailure::ParsingFailed(e) => write!(f, "parsing failed: {e}"),
CheckFailure::Panic(e) => write!(f, "function panicked: {e}"),
}
}
}
Expand Down Expand Up @@ -398,71 +320,34 @@ enum EarlyExit {
/// This launches a main thread that receives messages and handlees UI updates, and uses the
/// rest of the thread pool to execute the tests.
fn launch_tests(tests: &mut [TestInfo], cfg: &Config) -> Duration {
// Run shorter tests first
tests.sort_unstable_by_key(|test| test.total_tests);
// Run shorter tests and smaller float types first.
tests.sort_unstable_by_key(|test| (test.total_tests, test.float_bits));

for test in tests.iter() {
println!("Launching test '{}'", test.name);
}

// Configure progress bars
let mut all_progress_bars = Vec::new();
let mp = MultiProgress::new();
mp.set_move_cursor(true);
for test in tests.iter_mut() {
test.register_pb(&mp, &mut all_progress_bars);
}

ui::set_panic_hook(all_progress_bars);

let (tx, rx) = mpsc::channel::<Msg>();
let start = Instant::now();

rayon::scope(|scope| {
// Thread that updates the UI
scope.spawn(|_scope| {
let rx = rx; // move rx

loop {
if tests.iter().all(|t| t.completed.get().is_some()) {
break;
}

let msg = rx.recv().unwrap();
msg.handle(tests, &mp);
}

// All tests completed; finish things up
drop(mp);
assert_eq!(rx.try_recv().unwrap_err(), mpsc::TryRecvError::Empty);
});

// Don't let the thread pool be starved by huge tests. Run faster tests first in parallel,
// then parallelize only within the rest of the tests.
let (huge_tests, normal_tests): (Vec<_>, Vec<_>) =
tests.iter().partition(|t| t.is_huge_test());

// Run the actual tests
normal_tests.par_iter().for_each(|test| ((test.launch)(&tx, test, cfg)));

huge_tests.par_iter().for_each(|test| ((test.launch)(&tx, test, cfg)));
});
for test in tests.iter_mut() {
test.progress = Some(ui::Progress::new(test, &mut all_progress_bars));
ui::set_panic_hook(&all_progress_bars);
((test.launch)(test, cfg));
}

start.elapsed()
}

/// Test runer for a single generator.
///
/// This calls the generator's iterator multiple times (in parallel) and validates each output.
fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestInfo, cfg: &Config) {
tx.send(Msg::new::<F, G>(Update::Started)).unwrap();

let total = G::total_tests();
fn test_runner<F: Float, G: Generator<F>>(test: &TestInfo, cfg: &Config) {
let gen = G::new();
let executed = AtomicU64::new(0);
let failures = AtomicU64::new(0);

let checks_per_update = min(total, 1000);
let checks_per_update = min(test.total_tests, 1000);
let started = Instant::now();

// Function to execute for a single test iteration.
Expand All @@ -474,7 +359,12 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn
match validate::validate::<F>(buf) {
Ok(()) => (),
Err(e) => {
tx.send(Msg::new::<F, G>(e)).unwrap();
let CheckError { fail, input, float_res } = e;
test.progress.as_ref().unwrap().println(&format!(
"Failure in '{}': {fail}. parsing '{input}'. Parsed as: {float_res}",
test.name
));

let f = failures.fetch_add(1, Ordering::Relaxed);
// End early if the limit is exceeded.
if f >= cfg.max_failures {
Expand All @@ -486,9 +376,7 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn
// Send periodic updates
if executed % checks_per_update == 0 {
let failures = failures.load(Ordering::Relaxed);

tx.send(Msg::new::<F, G>(Update::Progress { executed, failures })).unwrap();

test.progress.as_ref().unwrap().update(executed, failures);
if started.elapsed() > cfg.timeout {
return Err(EarlyExit::Timeout);
}
Expand All @@ -499,28 +387,25 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn

// Run the test iterations in parallel. Each thread gets a string buffer to write
// its check values to.
let res = gen.par_bridge().try_for_each_init(|| String::with_capacity(100), check_one);
let res = gen.par_bridge().try_for_each_init(String::new, check_one);

let elapsed = started.elapsed();
let executed = executed.into_inner();
let failures = failures.into_inner();

// Warn about bad estimates if relevant.
let warning = if executed != total && res.is_ok() {
let msg = format!("executed tests != estimated ({executed} != {total}) for {}", G::NAME);
let warning = if executed != test.total_tests && res.is_ok() {
let msg = format!(
"executed tests != estimated ({executed} != {}) for {}",
test.total_tests,
G::NAME
);

Some(msg.into())
} else {
None
};

let result = res.map(|()| FinishedAll);
tx.send(Msg::new::<F, G>(Update::Completed(Completed {
executed,
failures,
result,
warning,
elapsed,
})))
.unwrap();
test.complete(Completed { executed, failures, result, warning, elapsed });
}
Loading
Loading