Skip to content

Commit

Permalink
Improve UX with a new expensive_op function.
Browse files Browse the repository at this point in the history
As stated in the docs: Perform this expensive operation as a normal Rust
function, but if it takes more than a certain amount of time, display a message
to the user that you're still waiting for this operation to complete.

This function is more or less the same as one I wrote for raw-data reading ages
ago, but now I've made it generic to be used for other routines.
  • Loading branch information
cjordan committed Sep 26, 2023
1 parent 9ceaea6 commit 4b5f742
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 170 deletions.
5 changes: 4 additions & 1 deletion src/cli/di_calibrate/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,10 @@ impl DiCalParams {
// kinds.
let sl_type_specified = source_list_type.is_none();
let sl_type = source_list_type.and_then(|t| SourceListType::from_str(t.as_ref()).ok());
let (sl, sl_type) = match read_source_list_file(sl_pb, sl_type) {
let (sl, sl_type) = match crate::misc::expensive_op(
|| read_source_list_file(sl_pb, sl_type),
"Still reading source list file",
) {
Ok((sl, sl_type)) => (sl, sl_type),
Err(e) => return Err(DiCalArgsError::from(e)),
};
Expand Down
5 changes: 4 additions & 1 deletion src/cli/srclist/by_beam/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ fn by_beam<P: AsRef<Path>, S: AsRef<str>>(
) -> Result<(), SrclistError> {
// Read the input source list.
let input_type = input_type.and_then(|t| SourceListType::from_str(t).ok());
let (sl, sl_type) = read_source_list_file(input_path, input_type)?;
let (sl, sl_type) = crate::misc::expensive_op(
|| read_source_list_file(input_path, input_type),
"Still reading source list file",
)?;
if input_type.is_none() {
info!(
"Successfully read {} as a {}-style source list",
Expand Down
5 changes: 4 additions & 1 deletion src/cli/srclist/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ fn convert<P: AsRef<Path>, S: AsRef<str>>(
}

// Read the input source list.
let (sl, sl_type) = read_source_list_file(input_path, input_type)?;
let (sl, sl_type) = crate::misc::expensive_op(
|| read_source_list_file(input_path, input_type),
"Still reading source list file",
)?;
if input_type.is_none() {
info!(
"Successfully read {} as a {}-style source list",
Expand Down
5 changes: 4 additions & 1 deletion src/cli/srclist/shift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ fn shift<P: AsRef<Path>, S: AsRef<str>>(
let f = BufReader::new(File::open(source_shifts_file)?);
let source_shifts: BTreeMap<String, RaDec> =
serde_json::from_reader(f).map_err(WriteSourceListError::from)?;
let (sl, sl_type) = read_source_list_file(source_list_file, input_type)?;
let (sl, sl_type) = crate::misc::expensive_op(
|| read_source_list_file(source_list_file, input_type),
"Still reading source list file",
)?;
info!(
"Successfully read {} as a {}-style source list",
source_list_file.display(),
Expand Down
26 changes: 21 additions & 5 deletions src/cli/srclist/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,22 @@ fn verify<P: AsRef<Path>>(
let (sl, sl_type) = if let Some(input_type) = input_type {
let mut buf = std::io::BufReader::new(File::open(source_list)?);
let result = match input_type {
SourceListType::Hyperdrive => hyperdrive::source_list_from_yaml(&mut buf),
SourceListType::AO => ao::parse_source_list(&mut buf),
SourceListType::Rts => rts::parse_source_list(&mut buf),
SourceListType::Woden => woden::parse_source_list(&mut buf),
SourceListType::Hyperdrive => crate::misc::expensive_op(
|| hyperdrive::source_list_from_yaml(&mut buf),
"Still reading source list file",
),
SourceListType::AO => crate::misc::expensive_op(
|| ao::parse_source_list(&mut buf),
"Still reading source list file",
),
SourceListType::Rts => crate::misc::expensive_op(
|| rts::parse_source_list(&mut buf),
"Still reading source list file",
),
SourceListType::Woden => crate::misc::expensive_op(
|| woden::parse_source_list(&mut buf),
"Still reading source list file",
),
};
match result {
Ok(sl) => (sl, input_type),
Expand All @@ -81,7 +93,11 @@ fn verify<P: AsRef<Path>>(
}
}
} else {
match read_source_list_file(source_list, None) {
let source_list = source_list.as_ref();
match crate::misc::expensive_op(
|| read_source_list_file(source_list, None),
"Still reading source list file",
) {
Ok(sl) => sl,
Err(e) => {
info!("{}", e);
Expand Down
5 changes: 4 additions & 1 deletion src/cli/vis_utils/simulate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,10 @@ impl VisSimParams {
};
// Read the source list.
// TODO: Allow the user to specify a source list type.
let source_list = match read_source_list_file(sl_pb, None) {
let source_list: SourceList = match crate::misc::expensive_op(
|| read_source_list_file(sl_pb, None),
"Still reading source list file",
) {
Ok((sl, sl_type)) => {
debug!("Successfully parsed {}-style source list", sl_type);
sl
Expand Down
5 changes: 4 additions & 1 deletion src/cli/vis_utils/subtract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ fn vis_subtract(args: VisSubtractArgs, dry_run: bool) -> Result<(), VisSubtractE
let sl_type = source_list_type
.as_ref()
.and_then(|t| SourceListType::from_str(t.as_ref()).ok());
let (sl, _) = match read_source_list_file(pb, sl_type) {
let (sl, _) = match crate::misc::expensive_op(
|| read_source_list_file(pb, sl_type),
"Still reading source list file",
) {
Ok((sl, sl_type)) => (sl, sl_type),
Err(e) => return Err(VisSubtractError::from(e)),
};
Expand Down
23 changes: 20 additions & 3 deletions src/di_calibrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::{
context::Polarisations,
io::write::{write_vis, VisTimestep},
math::average_epoch,
misc::expensive_op,
model::{self, ModellerInfo},
solutions::CalibrationSolutions,
};
Expand Down Expand Up @@ -115,9 +116,25 @@ pub(crate) fn get_cal_vis(
}

debug!("Allocating memory for input data visibilities and model visibilities");
let mut vis_data_tfb: Array3<Jones<f32>> = fallible_allocator!(Jones::default())?;
let mut vis_model_tfb: Array3<Jones<f32>> = fallible_allocator!(Jones::default())?;
let mut vis_weights_tfb: Array3<f32> = fallible_allocator!(0.0)?;
let CalVis {
mut vis_data_tfb,
mut vis_model_tfb,
mut vis_weights_tfb,
pols: _,
} = expensive_op(
|| -> Result<_, DiCalibrateError> {
let vis_data_tfb: Array3<Jones<f32>> = fallible_allocator!(Jones::default())?;
let vis_model_tfb: Array3<Jones<f32>> = fallible_allocator!(Jones::default())?;
let vis_weights_tfb: Array3<f32> = fallible_allocator!(0.0)?;
Ok(CalVis {
vis_data_tfb,
vis_weights_tfb,
vis_model_tfb,
pols: Polarisations::default(),
})
},
"Still waiting to allocate visibility memory",
)?;

// Sky-modelling communication channel. Used to tell the model writer when
// visibilities have been generated and they're ready to be written.
Expand Down
167 changes: 89 additions & 78 deletions src/io/read/ms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,51 +351,57 @@ impl MsReader {
// ("unavailable tiles"). Iterate over the baselines (i.e. main table
// rows) until we've seen all available antennas.
let mut autocorrelations_present = false;
let (tile_map, unavailable_tiles): (HashMap<i32, usize>, Vec<usize>) = {
let antenna1: Vec<i32> = main_table.get_col_as_vec("ANTENNA1")?;
let antenna2: Vec<i32> = main_table.get_col_as_vec("ANTENNA2")?;

let mut present_tiles = HashSet::with_capacity(total_num_tiles);
for (&antenna1, &antenna2) in antenna1.iter().zip(antenna2.iter()) {
present_tiles.insert(antenna1);
present_tiles.insert(antenna2);

if !autocorrelations_present && antenna1 == antenna2 {
autocorrelations_present = true;
}
}
let (tile_map, unavailable_tiles): (HashMap<i32, usize>, Vec<usize>) =
crate::misc::expensive_op(
|| {
let mut main_table = read_table(&ms, None)?;
let antenna1: Vec<i32> = main_table.get_col_as_vec("ANTENNA1")?;
let antenna2: Vec<i32> = main_table.get_col_as_vec("ANTENNA2")?;

let mut present_tiles = HashSet::with_capacity(total_num_tiles);
for (&antenna1, &antenna2) in antenna1.iter().zip(antenna2.iter()) {
present_tiles.insert(antenna1);
present_tiles.insert(antenna2);

if !autocorrelations_present && antenna1 == antenna2 {
autocorrelations_present = true;
}
}

// Ensure there aren't more tiles here than in the names or XYZs
// (names and XYZs are checked to be the same above).
if present_tiles.len() > tile_xyzs.len() {
return Err(MsReadError::MismatchNumMainTableNumXyzs {
main: present_tiles.len(),
xyzs: tile_xyzs.len(),
});
}
// Ensure there aren't more tiles here than in the names or XYZs
// (names and XYZs are checked to be the same above).
if present_tiles.len() > tile_xyzs.len() {
return Err(MsReadError::MismatchNumMainTableNumXyzs {
main: present_tiles.len(),
xyzs: tile_xyzs.len(),
});
}

// Ensure all MS antenna indices are positive and none are bigger
// than the number of XYZs.
for &i in &present_tiles {
if i < 0 {
return Err(MsReadError::AntennaNumNegative(i));
}
if i as usize >= tile_xyzs.len() {
return Err(MsReadError::AntennaNumTooBig(i));
}
}
// Ensure all MS antenna indices are positive and none are bigger
// than the number of XYZs.
for &i in &present_tiles {
if i < 0 {
return Err(MsReadError::AntennaNumNegative(i));
}
if i as usize >= tile_xyzs.len() {
return Err(MsReadError::AntennaNumTooBig(i));
}
}

let mut tile_map = HashMap::with_capacity(present_tiles.len());
let mut unavailable_tiles = Vec::with_capacity(total_num_tiles - present_tiles.len());
for i_tile in 0..total_num_tiles {
if let Some(v) = present_tiles.get(&(i_tile as i32)) {
tile_map.insert(*v, i_tile);
} else {
unavailable_tiles.push(i_tile);
}
}
(tile_map, unavailable_tiles)
};
let mut tile_map = HashMap::with_capacity(present_tiles.len());
let mut unavailable_tiles =
Vec::with_capacity(total_num_tiles - present_tiles.len());
for i_tile in 0..total_num_tiles {
if let Some(v) = present_tiles.get(&(i_tile as i32)) {
tile_map.insert(*v, i_tile);
} else {
unavailable_tiles.push(i_tile);
}
}
Ok::<_, MsReadError>((tile_map, unavailable_tiles))
},
"Still waiting to determine MS antenna metadata",
)?;
debug!("Autocorrelations present: {autocorrelations_present}");
debug!("Unavailable tiles: {unavailable_tiles:?}");

Expand All @@ -418,47 +424,52 @@ impl MsReader {
// all flagged, and (by default) we are not interested in using any of
// those data. We work out the first and last good timesteps by
// inspecting the flags at each timestep.
let unflagged_timesteps: Vec<usize> = {
// The first and last good timestep indices.
let mut first: Option<usize> = None;
let mut last: Option<usize> = None;

trace!("Searching for unflagged timesteps in the MS");
for i_step in 0..(main_table.n_rows() as usize) / step {
trace!("Reading timestep {i_step}");
let mut all_rows_for_step_flagged = true;
for i_row in 0..step {
let vis_flags: Vec<bool> =
main_table.get_cell_as_vec("FLAG", (i_step * step + i_row) as u64)?;
let all_flagged = vis_flags.into_iter().all(|f| f);
if !all_flagged {
all_rows_for_step_flagged = false;
if first.is_none() {
first = Some(i_step);
debug!("First good timestep: {i_step}");
let unflagged_timesteps: Vec<usize> = crate::misc::expensive_op(
|| {
// The first and last good timestep indices.
let mut first: Option<usize> = None;
let mut last: Option<usize> = None;

trace!("Searching for unflagged timesteps in the MS");
let mut main_table = read_table(&ms, None)?;
for i_step in 0..(main_table.n_rows() as usize) / step {
trace!("Reading timestep {i_step}");
let mut all_rows_for_step_flagged = true;
for i_row in 0..step {
let vis_flags: Vec<bool> =
main_table.get_cell_as_vec("FLAG", (i_step * step + i_row) as u64)?;
let all_flagged = vis_flags.into_iter().all(|f| f);
if !all_flagged {
all_rows_for_step_flagged = false;
if first.is_none() {
first = Some(i_step);
debug!("First good timestep: {i_step}");
}
break;
}
}
if all_rows_for_step_flagged && first.is_some() {
last = Some(i_step);
debug!("Last good timestep: {}", i_step - 1);
break;
}
}
if all_rows_for_step_flagged && first.is_some() {
last = Some(i_step);
debug!("Last good timestep: {}", i_step - 1);
break;
}
}

// Did the indices get set correctly?
match (first, last) {
(Some(f), Some(l)) => f..l,
// If there weren't any flags at the end of the MS, then the
// last timestep is fine.
(Some(f), None) => f..main_table.n_rows() as usize / step,
// All timesteps are flagged. The user can still use the MS, but
// they must specify some amount of flagged timesteps.
_ => 0..0,
}
}
.collect();
// Did the indices get set correctly?
let timesteps = match (first, last) {
(Some(f), Some(l)) => f..l,
// If there weren't any flags at the end of the MS, then the
// last timestep is fine.
(Some(f), None) => f..main_table.n_rows() as usize / step,
// All timesteps are flagged. The user can still use the MS, but
// they must specify some amount of flagged timesteps.
_ => 0..0,
}
.collect();
Ok::<_, TableError>(timesteps)
},
"Still waiting to determine MS timesteps",
)?;

// Neither Birli nor cotter utilise the "FLAG_ROW" column of the antenna
// table. This is the best (only?) way to unambiguously identify flagged
Expand Down
Loading

0 comments on commit 4b5f742

Please sign in to comment.