Skip to content

Commit

Permalink
Rollup merge of rust-lang#137525 - tgross35:test-float-parse-less-par…
Browse files Browse the repository at this point in the history
…allelization, r=Mark-Simulacrum

Simplify parallelization in test-float-parse

Currently, test case generators are launched in parallel and their test cases also run in parallel, all within the same pool. I originally implemented this with the assumption that there would be an advantage in parallelizing the generators themselves, but this turns out to not really have any benefit.

Simplify things by running generators in series while keeping their test cases parallelized. This makes the code easier to follow, and there is no longer a need for MPSC or multiprogress bars. Additionally, the UI output can be made cleaner.
  • Loading branch information
tgross35 authored Mar 2, 2025
2 parents fd50842 + 7603e01 commit 0386e93
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 226 deletions.
3 changes: 1 addition & 2 deletions src/etc/test-float-parse/src/gen/exhaustive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ impl<F: Float> Generator<F> for Exhaustive<F>
where
RangeInclusive<F::Int>: Iterator<Item = F::Int>,
{
const NAME: &'static str = "exhaustive";
const SHORT_NAME: &'static str = "exhaustive";

type WriteCtx = F;

fn total_tests() -> u64 {
F::Int::MAX.try_into().unwrap_or(u64::MAX)
1u64.checked_shl(F::Int::BITS).expect("More than u64::MAX tests")
}

fn new() -> Self {
Expand Down
1 change: 0 additions & 1 deletion src/etc/test-float-parse/src/gen/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl<F: Float> Generator<F> for Fuzz<F>
where
Standard: Distribution<<F as Float>::Int>,
{
const NAME: &'static str = "fuzz";
const SHORT_NAME: &'static str = "fuzz";

type WriteCtx = F;
Expand Down
1 change: 0 additions & 1 deletion src/etc/test-float-parse/src/gen/sparse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl<F: Float> Generator<F> for FewOnesInt<F>
where
<F::Int as TryFrom<u128>>::Error: std::fmt::Debug,
{
const NAME: &'static str = "few ones int";
const SHORT_NAME: &'static str = "few ones int";

type WriteCtx = F::Int;
Expand Down
221 changes: 53 additions & 168 deletions src/etc/test-float-parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ mod traits;
mod ui;
mod validate;

use std::any::{TypeId, type_name};
use std::any::type_name;
use std::cmp::min;
use std::ops::RangeInclusive;
use std::process::ExitCode;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{OnceLock, mpsc};
use std::{fmt, time};

use indicatif::{MultiProgress, ProgressBar};
use rand::distributions::{Distribution, Standard};
use rayon::prelude::*;
use time::{Duration, Instant};
use traits::{Float, Generator, Int};
use validate::CheckError;

/// Test generators.
mod gen {
Expand Down Expand Up @@ -43,7 +43,7 @@ const HUGE_TEST_CUTOFF: u64 = 5_000_000;
/// Seed for tests that use a deterministic RNG.
const SEED: [u8; 32] = *b"3.141592653589793238462643383279";

/// Global configuration
/// Global configuration.
#[derive(Debug)]
pub struct Config {
pub timeout: Duration,
Expand Down Expand Up @@ -104,9 +104,9 @@ pub fn run(cfg: Config, include: &[String], exclude: &[String]) -> ExitCode {
println!("Skipping test '{exc}'");
}

println!("launching");
println!("Launching all");
let elapsed = launch_tests(&mut tests, &cfg);
ui::finish(&tests, elapsed, &cfg)
ui::finish_all(&tests, elapsed, &cfg)
}

/// Enumerate tests to run but don't actually run them.
Expand Down Expand Up @@ -160,18 +160,18 @@ where
#[derive(Debug)]
pub struct TestInfo {
pub name: String,
/// Tests are identified by the type ID of `(F, G)` (tuple of the float and generator type).
/// This gives an easy way to associate messages with tests.
id: TypeId,
float_name: &'static str,
float_bits: u32,
gen_name: &'static str,
/// Name for display in the progress bar.
short_name: String,
/// Pad the short name to a common width for progress bar use.
short_name_padded: String,
total_tests: u64,
/// Function to launch this test.
launch: fn(&mpsc::Sender<Msg>, &TestInfo, &Config),
launch: fn(&TestInfo, &Config),
/// Progress bar to be updated.
pb: Option<ProgressBar>,
progress: Option<ui::Progress>,
/// Once completed, this will be set.
completed: OnceLock<Completed>,
}
Expand All @@ -187,121 +187,37 @@ impl TestInfo {
let f_name = type_name::<F>();
let gen_name = G::NAME;
let gen_short_name = G::SHORT_NAME;
let name = format!("{f_name} {gen_name}");
let short_name = format!("{f_name} {gen_short_name}");
let short_name_padded = format!("{short_name:18}");

let info = TestInfo {
id: TypeId::of::<(F, G)>(),
float_name: f_name,
float_bits: F::BITS,
gen_name,
pb: None,
name: format!("{f_name} {gen_name}"),
short_name: format!("{f_name} {gen_short_name}"),
progress: None,
name,
short_name_padded,
short_name,
launch: test_runner::<F, G>,
total_tests: G::total_tests(),
completed: OnceLock::new(),
};
v.push(info);
}

/// Pad the short name to a common width for progress bar use.
fn short_name_padded(&self) -> String {
format!("{:18}", self.short_name)
}

/// Create a progress bar for this test within a multiprogress bar.
fn register_pb(&mut self, mp: &MultiProgress, drop_bars: &mut Vec<ProgressBar>) {
self.pb = Some(ui::create_pb(mp, self.total_tests, &self.short_name_padded(), drop_bars));
}

/// When the test is finished, update progress bar messages and finalize.
fn finalize_pb(&self, c: &Completed) {
let pb = self.pb.as_ref().unwrap();
ui::finalize_pb(pb, &self.short_name_padded(), c);
}

/// True if this should be run after all others.
fn is_huge_test(&self) -> bool {
self.total_tests >= HUGE_TEST_CUTOFF
}
}

/// A message sent from test runner threads to the UI/log thread.
#[derive(Clone, Debug)]
struct Msg {
id: TypeId,
update: Update,
}

impl Msg {
/// Wrap an `Update` into a message for the specified type. We use the `TypeId` of `(F, G)` to
/// identify which test a message in the channel came from.
fn new<F: Float, G: Generator<F>>(u: Update) -> Self {
Self { id: TypeId::of::<(F, G)>(), update: u }
}

/// Get the matching test from a list. Panics if not found.
fn find_test<'a>(&self, tests: &'a [TestInfo]) -> &'a TestInfo {
tests.iter().find(|t| t.id == self.id).unwrap()
}

/// Update UI as needed for a single message received from the test runners.
fn handle(self, tests: &[TestInfo], mp: &MultiProgress) {
let test = self.find_test(tests);
let pb = test.pb.as_ref().unwrap();

match self.update {
Update::Started => {
mp.println(format!("Testing '{}'", test.name)).unwrap();
}
Update::Progress { executed, failures } => {
pb.set_message(format! {"{failures}"});
pb.set_position(executed);
}
Update::Failure { fail, input, float_res } => {
mp.println(format!(
"Failure in '{}': {fail}. parsing '{input}'. Parsed as: {float_res}",
test.name
))
.unwrap();
}
Update::Completed(c) => {
test.finalize_pb(&c);

let prefix = match c.result {
Ok(FinishedAll) => "Completed tests for",
Err(EarlyExit::Timeout) => "Timed out",
Err(EarlyExit::MaxFailures) => "Max failures reached for",
};

mp.println(format!(
"{prefix} generator '{}' in {:?}. {} tests run, {} failures",
test.name, c.elapsed, c.executed, c.failures
))
.unwrap();
test.completed.set(c).unwrap();
}
};
/// When the test is finished, update progress bar messages and finalize.
fn complete(&self, c: Completed) {
self.progress.as_ref().unwrap().complete(&c, 0);
self.completed.set(c).unwrap();
}
}

/// Status sent with a message.
#[derive(Clone, Debug)]
enum Update {
/// Starting a new test runner.
Started,
/// Completed a out of b tests.
Progress { executed: u64, failures: u64 },
/// Received a failed test.
Failure {
fail: CheckFailure,
/// String for which parsing was attempted.
input: Box<str>,
/// The parsed & decomposed `FloatRes`, already stringified so we don't need generics here.
float_res: Box<str>,
},
/// Exited with an unexpected condition.
Completed(Completed),
}

/// Result of an input did not parsing successfully.
#[derive(Clone, Debug)]
enum CheckFailure {
Expand Down Expand Up @@ -329,6 +245,10 @@ enum CheckFailure {
/// two representable values.
incorrect_midpoint_rounding: bool,
},
/// String did not parse successfully.
ParsingFailed(Box<str>),
/// A panic was caught.
Panic(Box<str>),
}

impl fmt::Display for CheckFailure {
Expand Down Expand Up @@ -363,6 +283,8 @@ impl fmt::Display for CheckFailure {
}
Ok(())
}
CheckFailure::ParsingFailed(e) => write!(f, "parsing failed: {e}"),
CheckFailure::Panic(e) => write!(f, "function panicked: {e}"),
}
}
}
Expand Down Expand Up @@ -398,71 +320,34 @@ enum EarlyExit {
/// This launches a main thread that receives messages and handlees UI updates, and uses the
/// rest of the thread pool to execute the tests.
fn launch_tests(tests: &mut [TestInfo], cfg: &Config) -> Duration {
// Run shorter tests first
tests.sort_unstable_by_key(|test| test.total_tests);
// Run shorter tests and smaller float types first.
tests.sort_unstable_by_key(|test| (test.total_tests, test.float_bits));

for test in tests.iter() {
println!("Launching test '{}'", test.name);
}

// Configure progress bars
let mut all_progress_bars = Vec::new();
let mp = MultiProgress::new();
mp.set_move_cursor(true);
for test in tests.iter_mut() {
test.register_pb(&mp, &mut all_progress_bars);
}

ui::set_panic_hook(all_progress_bars);

let (tx, rx) = mpsc::channel::<Msg>();
let start = Instant::now();

rayon::scope(|scope| {
// Thread that updates the UI
scope.spawn(|_scope| {
let rx = rx; // move rx

loop {
if tests.iter().all(|t| t.completed.get().is_some()) {
break;
}

let msg = rx.recv().unwrap();
msg.handle(tests, &mp);
}

// All tests completed; finish things up
drop(mp);
assert_eq!(rx.try_recv().unwrap_err(), mpsc::TryRecvError::Empty);
});

// Don't let the thread pool be starved by huge tests. Run faster tests first in parallel,
// then parallelize only within the rest of the tests.
let (huge_tests, normal_tests): (Vec<_>, Vec<_>) =
tests.iter().partition(|t| t.is_huge_test());

// Run the actual tests
normal_tests.par_iter().for_each(|test| ((test.launch)(&tx, test, cfg)));

huge_tests.par_iter().for_each(|test| ((test.launch)(&tx, test, cfg)));
});
for test in tests.iter_mut() {
test.progress = Some(ui::Progress::new(test, &mut all_progress_bars));
ui::set_panic_hook(&all_progress_bars);
((test.launch)(test, cfg));
}

start.elapsed()
}

/// Test runer for a single generator.
///
/// This calls the generator's iterator multiple times (in parallel) and validates each output.
fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestInfo, cfg: &Config) {
tx.send(Msg::new::<F, G>(Update::Started)).unwrap();

let total = G::total_tests();
fn test_runner<F: Float, G: Generator<F>>(test: &TestInfo, cfg: &Config) {
let gen = G::new();
let executed = AtomicU64::new(0);
let failures = AtomicU64::new(0);

let checks_per_update = min(total, 1000);
let checks_per_update = min(test.total_tests, 1000);
let started = Instant::now();

// Function to execute for a single test iteration.
Expand All @@ -474,7 +359,12 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn
match validate::validate::<F>(buf) {
Ok(()) => (),
Err(e) => {
tx.send(Msg::new::<F, G>(e)).unwrap();
let CheckError { fail, input, float_res } = e;
test.progress.as_ref().unwrap().println(&format!(
"Failure in '{}': {fail}. parsing '{input}'. Parsed as: {float_res}",
test.name
));

let f = failures.fetch_add(1, Ordering::Relaxed);
// End early if the limit is exceeded.
if f >= cfg.max_failures {
Expand All @@ -486,9 +376,7 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn
// Send periodic updates
if executed % checks_per_update == 0 {
let failures = failures.load(Ordering::Relaxed);

tx.send(Msg::new::<F, G>(Update::Progress { executed, failures })).unwrap();

test.progress.as_ref().unwrap().update(executed, failures);
if started.elapsed() > cfg.timeout {
return Err(EarlyExit::Timeout);
}
Expand All @@ -499,28 +387,25 @@ fn test_runner<F: Float, G: Generator<F>>(tx: &mpsc::Sender<Msg>, _info: &TestIn

// Run the test iterations in parallel. Each thread gets a string buffer to write
// its check values to.
let res = gen.par_bridge().try_for_each_init(|| String::with_capacity(100), check_one);
let res = gen.par_bridge().try_for_each_init(String::new, check_one);

let elapsed = started.elapsed();
let executed = executed.into_inner();
let failures = failures.into_inner();

// Warn about bad estimates if relevant.
let warning = if executed != total && res.is_ok() {
let msg = format!("executed tests != estimated ({executed} != {total}) for {}", G::NAME);
let warning = if executed != test.total_tests && res.is_ok() {
let msg = format!(
"executed tests != estimated ({executed} != {}) for {}",
test.total_tests,
G::NAME
);

Some(msg.into())
} else {
None
};

let result = res.map(|()| FinishedAll);
tx.send(Msg::new::<F, G>(Update::Completed(Completed {
executed,
failures,
result,
warning,
elapsed,
})))
.unwrap();
test.complete(Completed { executed, failures, result, warning, elapsed });
}
Loading

0 comments on commit 0386e93

Please sign in to comment.