Skip to content

Commit

Permalink
✅ measurement set output for calibrate vis
Browse files Browse the repository at this point in the history
  • Loading branch information
Dev McElhinney committed Apr 5, 2022
1 parent 646bc5e commit 29caf9f
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 77 deletions.
163 changes: 111 additions & 52 deletions src/calibrate/di/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ pub use code::calibrate_timeblocks;
use code::*;

use hifitime::{Duration, Unit};
use itertools::{izip, Itertools};
use itertools::izip;
use log::{debug, info, log_enabled, trace, Level::Debug};
use marlu::{
math::cross_correlation_baseline_to_tiles, Jones, UvfitsWriter, VisContext, VisWritable,
math::cross_correlation_baseline_to_tiles, Jones, MeasurementSetWriter,
ObsContext as MarluObsContext, UvfitsWriter, VisContext, VisWritable,
};
use ndarray::prelude::*;
use rayon::prelude::*;

use super::{params::CalibrateParams, solutions::CalibrationSolutions, CalibrateError};
use crate::data_formats::VisOutputType;
use mwa_hyperdrive_common::{hifitime, itertools, log, marlu, ndarray, rayon};
use mwa_hyperdrive_common::{
hifitime::{self, Epoch},
itertools, log, marlu, ndarray,
num_traits::Zero,
rayon,
};

/// Do all the steps required for direction-independent calibration; read the
/// input data, generate a model against it, and write the solutions out.
Expand Down Expand Up @@ -144,21 +150,23 @@ pub(crate) fn di_calibrate(

// TODO(dev): support and test autos
if params.using_autos {
panic!("not supperted yet... or are they?");
panic!("writing auto outputs for calibrated vis not supported");
}

// TODO(dev): support and test time averaging for calibrated vis
if params.output_vis_time_average_factor > 1 {
panic!("time averaging for calibrated vis not supported");
}

let ant_pairs: Vec<(usize, usize)> = params.get_ant_pairs();
let int_time: Duration = Duration::from_f64(obs_context.time_res.unwrap(), Unit::Second);

// TODO(dev): support sparse timesteps by chunking over time
for (&past, &future) in params.timesteps.iter().tuple_windows() {
assert!(future > past);
assert!(future - past == 1, "assuming contiguous timesteps");
}
let start_timestamp = obs_context.timestamps[params.timesteps[0]];

// XXX(Dev): VisContext does not support sparse timesteps, but in this case it doesn't matter
let vis_ctx = VisContext {
num_sel_timesteps: params.timesteps.len(),
start_timestamp: obs_context.timestamps[params.timesteps[0]],
start_timestamp,
int_time,
num_sel_chans: obs_context.fine_chan_freqs.len(),
start_freq_hz: obs_context.fine_chan_freqs[0] as f64,
Expand All @@ -169,13 +177,10 @@ pub(crate) fn di_calibrate(
num_vis_pols: 4,
};

// pad and transpose the data
// TODO(dev): unify unpacking
let obs_name = obs_context.obsid.map(|o| format!("MWA obsid {}", o));

// out data is [time, freq, baseline], in data is [time, baseline, freq]
// shape of entire output [time, freq, baseline]. in data is [time, baseline, freq]
let out_shape = vis_ctx.sel_dims();
let mut out_data = Array3::zeros(out_shape);
let mut out_weights = Array3::from_elem(out_shape, -0.0);

assert_eq!(vis_weights.dim(), vis_data.dim());
// time
Expand All @@ -188,23 +193,90 @@ pub(crate) fn di_calibrate(
out_shape.1
);

// re-use output arrays each timestep chunk
let out_shape_timestep = (1, out_shape.1, out_shape.2);
let mut tmp_out_data = Array3::from_elem(out_shape_timestep, Jones::zero());
let mut tmp_out_weights = Array3::from_elem(out_shape_timestep, -0.0);

// create a VisWritable for each output vis filename
let mut out_writers: Vec<(VisOutputType, Box<dyn VisWritable>)> = vec![];
for (vis_type, file) in params.output_vis_filenames.iter() {
match vis_type {
VisOutputType::Uvfits => {
trace!(" - to uvfits {}", file.display());

let writer = UvfitsWriter::from_marlu(
&file,
&vis_ctx,
Some(params.array_position),
obs_context.phase_centre,
obs_name.clone(),
)?;

out_writers.push((VisOutputType::Uvfits, Box::new(writer)));
}
VisOutputType::MeasurementSet => {
trace!(" - to measurement set {}", file.display());
let writer = MeasurementSetWriter::new(
&file,
obs_context.phase_centre,
Some(params.array_position),
);

let sched_start_timestamp = match obs_context.obsid {
Some(gpst) => Epoch::from_gpst_seconds(gpst as f64),
None => start_timestamp,
};
let sched_duration = *obs_context.timestamps.last() - sched_start_timestamp;

let marlu_obs_ctx = MarluObsContext {
sched_start_timestamp,
sched_duration,
name: obs_name.clone(),
phase_centre: obs_context.phase_centre,
pointing_centre: obs_context.pointing_centre,
array_pos: params.array_position,
ant_positions_enh: obs_context
.tile_xyzs
.iter()
.map(|xyz| xyz.to_enh(params.array_position.latitude_rad))
.collect(),
ant_names: obs_context.tile_names.iter().cloned().collect(),
// TODO(dev): is there any value in adding this metadata via hyperdrive obs context?
field_name: None,
project_id: None,
observer: None,
};

writer.initialize(&vis_ctx, &marlu_obs_ctx)?;
out_writers.push((VisOutputType::MeasurementSet, Box::new(writer)));
}
};
}

// zip over time axis;
for (mut out_data, mut out_weights, vis_data, vis_weights) in izip!(
out_data.outer_iter_mut(),
out_weights.outer_iter_mut(),
for (&timestep, vis_data, vis_weights) in izip!(
params.timesteps.iter(),
vis_data.outer_iter(),
vis_weights.outer_iter(),
) {
let chunk_vis_ctx = VisContext {
start_timestamp: obs_context.timestamps[timestep],
..vis_ctx.clone()
};
tmp_out_data.fill(Jones::zero());
tmp_out_weights.fill(-0.0);

// zip over baseline axis
for (mut out_data, mut out_weights, vis_data, vis_weights) in izip!(
out_data.axis_iter_mut(Axis(1)),
out_weights.axis_iter_mut(Axis(1)),
for (mut tmp_out_data, mut tmp_out_weights, vis_data, vis_weights) in izip!(
tmp_out_data.axis_iter_mut(Axis(1)),
tmp_out_weights.axis_iter_mut(Axis(1)),
vis_data.axis_iter(Axis(0)),
vis_weights.axis_iter(Axis(0))
) {
// merge frequency axis
for ((_, out_jones, out_weight), in_jones, in_weight) in izip!(
izip!(0.., out_data.iter_mut(), out_weights.iter_mut(),)
izip!(0.., tmp_out_data.iter_mut(), tmp_out_weights.iter_mut(),)
.filter(|(chan_idx, _, _)| !params.flagged_fine_chans.contains(chan_idx)),
vis_data.iter(),
vis_weights.iter()
Expand All @@ -213,39 +285,26 @@ pub(crate) fn di_calibrate(
*out_weight = *in_weight;
}
}
}

let obs_name = obs_context.obsid.map(|o| format!("MWA obsid {}", o));

for (vis_type, file) in &params.output_vis_filenames {
match vis_type {
// TODO: Make this an obs_context method?
VisOutputType::Uvfits => {
trace!("Writing to output uvfits");

let mut writer = UvfitsWriter::from_marlu(
&file,
&vis_ctx,
Some(params.array_position),
obs_context.phase_centre,
obs_name.clone(),
)?;

writer.write_vis_marlu(
out_data.view(),
out_weights.view(),
&vis_ctx,
&obs_context.tile_xyzs,
false,
)?;
for (_, writer) in out_writers.iter_mut() {
writer.write_vis_marlu(
tmp_out_data.view(),
tmp_out_weights.view(),
&chunk_vis_ctx,
&obs_context.tile_xyzs,
false,
)?;
}
}

writer.write_uvfits_antenna_table(
&obs_context.tile_names,
&obs_context.tile_xyzs,
)?;
} // TODO(dev): Other formats
// finalize writing uvfits
for (vis_type, writer) in out_writers.into_iter() {
if matches!(vis_type, VisOutputType::Uvfits) {
let uvfits_writer =
unsafe { Box::from_raw(Box::into_raw(writer) as *mut UvfitsWriter) };
uvfits_writer
.write_uvfits_antenna_table(&obs_context.tile_names, &obs_context.tile_xyzs)?;
}
info!("Calibrated visibilities written to {}", file.display());
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/calibrate/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

//! Error type for all calibration-related errors.

use marlu::io::error::{IOError as MarluIOError, UvfitsWriteError};
use marlu::io::error::{IOError as MarluIOError, MeasurementSetWriteError, UvfitsWriteError};
use mwalib::fitsio;
use thiserror::Error;

Expand Down Expand Up @@ -56,6 +56,9 @@ pub enum CalibrateError {
#[error("Error when writing uvfits: {0}")]
UviftsWrite(#[from] UvfitsWriteError),

#[error("Error when writing measurement set: {0}")]
MeasurementSetWrite(#[from] MeasurementSetWriteError),

#[error("Error when using Marlu for IO: {0}")]
MarluIO(#[from] MarluIOError),

Expand Down
2 changes: 1 addition & 1 deletion src/calibrate/params/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use filenames::InputDataTypes;
use helpers::*;

use std::collections::{HashMap, HashSet};
use std::fs::OpenOptions;
use std::fs::{self, OpenOptions};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::FromStr;
Expand Down
10 changes: 5 additions & 5 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use mwa_hyperdrive_common::{hifitime, marlu, ndarray, vec1};
///
/// Tile information is ordered according to the "Antenna" column in HDU 1 of
/// the observation's metafits file.
pub(crate) struct ObsContext {
pub struct ObsContext {
/// The observation ID, which is also the observation's scheduled start GPS
/// time (but shouldn't be used for this purpose).
pub(crate) obsid: Option<u32>,
pub obsid: Option<u32>,

/// The unique timestamps in the observation. These are stored as `hifitime`
/// [Epoch] structs to help keep the code flexible. These include timestamps
/// that are deemed "flagged" by the observation.
pub(crate) timestamps: Vec1<Epoch>,
pub timestamps: Vec1<Epoch>,

/// The *available* timestep indices of the input data. This does not
/// necessarily start at 0, and is not necessarily regular (e.g. a valid
Expand All @@ -37,7 +37,7 @@ pub(crate) struct ObsContext {
/// data that also isn't regular; naively reading in a dataset with 2
/// timesteps that are separated by more than the time resolution of the
/// data would give misleading results.
pub(crate) all_timesteps: Vec1<usize>,
pub all_timesteps: Vec1<usize>,

/// The timestep indices of the input data that aren't totally flagged.
///
Expand Down Expand Up @@ -107,7 +107,7 @@ pub(crate) struct ObsContext {
///
/// These are kept as ints to help some otherwise error-prone calculations
/// using floats. By using ints, we assume there is no sub-Hz structure.
pub(crate) fine_chan_freqs: Vec1<u64>,
pub fine_chan_freqs: Vec1<u64>,

/// The flagged fine channels for each baseline in the supplied data. Zero
/// indexed.
Expand Down
10 changes: 6 additions & 4 deletions src/data_formats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ mod uvfits;

pub(crate) use error::ReadInputDataError;
pub use metafits::*;
pub(crate) use ms::{MsReadError, MS};
pub use ms::{MsReadError, MS};
pub(crate) use raw::{RawDataReader, RawReadError};
pub(crate) use uvfits::{UvfitsReadError, UvfitsReader};
pub use uvfits::{UvfitsReadError, UvfitsReader};

use std::collections::{HashMap, HashSet};

Expand All @@ -27,7 +27,7 @@ use crate::context::ObsContext;
use mwa_hyperdrive_common::{marlu, ndarray, vec1};

#[derive(Debug)]
pub(crate) enum VisInputType {
pub enum VisInputType {
Raw,
MeasurementSet,
Uvfits,
Expand All @@ -37,9 +37,11 @@ pub(crate) enum VisInputType {
pub(crate) enum VisOutputType {
#[strum(serialize = "uvfits")]
Uvfits,
#[strum(serialize = "ms")]
MeasurementSet,
}

pub(crate) trait InputData: Sync + Send {
pub trait InputData: Sync + Send {
fn get_obs_context(&self) -> &ObsContext;

fn get_input_data_type(&self) -> VisInputType;
Expand Down
4 changes: 2 additions & 2 deletions src/data_formats/ms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{context::ObsContext, data_formats::metafits, time::round_hundredths_
use mwa_hyperdrive_beam::Delays;
use mwa_hyperdrive_common::{hifitime, log, marlu, mwalib, ndarray};

pub(crate) struct MS {
pub struct MS {
/// Input data metadata.
obs_context: ObsContext,

Expand All @@ -59,7 +59,7 @@ impl MS {
/// The measurement set is expected to be formatted in the way that
/// cotter/Birli write measurement sets.
// TODO: Handle multiple measurement sets.
pub(crate) fn new<P: AsRef<Path>, P2: AsRef<Path>>(
pub fn new<P: AsRef<Path>, P2: AsRef<Path>>(
ms: P,
metafits: Option<P2>,
dipole_delays: &mut Delays,
Expand Down
4 changes: 2 additions & 2 deletions src/data_formats/uvfits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod read;
#[cfg(test)]
mod tests;

pub(crate) use error::*;
pub(crate) use read::*;
pub use error::*;
pub use read::*;

use hifitime::Epoch;

Expand Down
4 changes: 2 additions & 2 deletions src/data_formats/uvfits/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
use mwa_hyperdrive_beam::Delays;
use mwa_hyperdrive_common::{log, marlu, mwalib, ndarray};

pub(crate) struct UvfitsReader {
pub struct UvfitsReader {
/// Observation metadata.
pub(super) obs_context: ObsContext,

Expand All @@ -48,7 +48,7 @@ impl UvfitsReader {
///
/// The measurement set is expected to be formatted in the way that
/// cotter/Birli write measurement sets.
pub(crate) fn new<P: AsRef<Path>, P2: AsRef<Path>>(
pub fn new<P: AsRef<Path>, P2: AsRef<Path>>(
uvfits: P,
metafits: Option<P2>,
dipole_delays: &mut Delays,
Expand Down
Loading

0 comments on commit 29caf9f

Please sign in to comment.