diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index 5a5652fed..955a1ad33 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -34,11 +34,6 @@ jobs:
         uses: KyleMayes/install-llvm-action@v1
         with:
           version: "10.0"
-      - name: cache poetry install
-        uses: actions/cache@v2
-        with:
-          path: ~/.local
-          key: poetry-1.5.1-0
       - uses: snok/install-poetry@v1
         with:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27d83e2be..8d86dd369 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 #### Added
+- Added support for compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
 - Added links to Data Central DAS and the Fink Broker to the source page [#697](https://github.com/askap-vast/vast-pipeline/pull/697/)
 - Added `n_new_sources` column to run model to store the number of new sources in a pipeline run [#676](https://github.com/askap-vast/vast-pipeline/pull/676).
 - Added `MAX_CUTOUT_IMAGES` to the pipeline settings to limit the number of postage stamps displayed on the source detail page [#658](https://github.com/askap-vast/vast-pipeline/pull/658).
@@ -41,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 #### Changed
+- Updated all FITS loading to use a wrapper that can handle compressed FITS files [#694](https://github.com/askap-vast/vast-pipeline/pull/694)
 - Downgrade ci-docs to python 3.8 [#702](https://github.com/askap-vast/vast-pipeline/pull/702)
 - Update Gr1N poetry to v8, force python 3.8.10 [#701](https://github.com/askap-vast/vast-pipeline/pull/701)
 - Updated path to test data in github actions and docs [#699](https://github.com/askap-vast/vast-pipeline/pull/699)
@@ -112,7 +114,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 #### List of PRs
-- [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8
+- [#694](https://github.com/askap-vast/vast-pipeline/pull/694): feat: Handle compressed fits files.
+- [#702](https://github.com/askap-vast/vast-pipeline/pull/702): fix: Downgrade ci-docs to python 3.8.
 - [#701](https://github.com/askap-vast/vast-pipeline/pull/701): fix: Update Gr1N poetry to v8, force python 3.8.10.
 - [#699](https://github.com/askap-vast/vast-pipeline/pull/699): docs, feat: Add new regression data download URL and updates to Github Actions.
 - [#697](https://github.com/askap-vast/vast-pipeline/pull/697/): feat: Added links to Data Central DAS and the Fink Broker to the source page.
diff --git a/docs/using/runconfig.md b/docs/using/runconfig.md
index 867379f23..864f3bdc3 100644
--- a/docs/using/runconfig.md
+++ b/docs/using/runconfig.md
@@ -151,8 +151,7 @@ Boolean. Astropy warnings are suppressed in the logging output if set to `True`.
 **`inputs.image`**
 Line entries or epoch headed entries.
-The full paths to the image FITS files to be processed.
-Epoch mode is activated by including an extra key value with the epoch name, see the example below for a demonstration.
+The full paths to the image FITS files to be processed - these can be regular FITS files, or FITS files that use a [`CompImageHDU`](https://docs.astropy.org/en/stable/io/fits/api/images.html#astropy.io.fits.CompImageHDU). The pipeline can in principle also read [`.fits.fz`](https://heasarc.gsfc.nasa.gov/fitsio/fpack/) files, although these are not officially supported.
 Epoch mode is activated by including an extra key value with the epoch name, see the example below for a demonstration. Refer to [this section](../design/association.md#epoch-based-association) of the documentation for more information on epoch based association.
diff --git a/vast_pipeline/image/main.py b/vast_pipeline/image/main.py
index 4ecbb6823..2a8edaf5b 100644
--- a/vast_pipeline/image/main.py
+++ b/vast_pipeline/image/main.py
@@ -19,6 +19,7 @@
 from vast_pipeline import models
 from vast_pipeline.survey.translators import tr_selavy
+from vast_pipeline.image.utils import open_fits
 logger = logging.getLogger(__name__)
@@ -33,6 +34,7 @@ class Image(object):
         path (str): The system path to the image.
     """
+
     def __init__(self, path: str) -> None:
         """
         Initiliase an image object.
@@ -80,7 +82,7 @@ class FitsImage(Image):
     entire_image = True
-    def __init__(self, path: str, hdu_index: int=0) -> None:
+    def __init__(self, path: str, hdu_index: int = 0) -> None:
         """
         Initialise a FitsImage object.
@@ -107,7 +109,7 @@ def __init__(self, path: str, hdu_index: int=0) -> None:
     def __get_header(self, hdu_index: int) -> fits.Header:
         """
-        Retrieves the header from teh FITS image.
+        Retrieves the header from the FITS image.
         Args:
             hdu_index:
@@ -116,13 +118,14 @@ def __get_header(self, hdu_index: int) -> fits.Header:
         Returns:
             The FITS header as an astropy.io.fits.Header object.
         """
+
         try:
-            with fits.open(self.path) as hdulist:
+            with open_fits(self.path) as hdulist:
                 hdu = hdulist[hdu_index]
         except Exception:
             raise IOError((
-                'Could not read this FITS file: '
-                f'{os.path.basename(self.path)}'
+                'Could not read FITS file: '
+                f'{self.path}'
             ))
         return hdu.header.copy()
@@ -223,7 +226,8 @@ def __get_radius_pixels(
             The radius of the image in pixels.
         """
         if self.entire_image:
-            # a large circle that *should* include the whole image (and then some)
+            # a large circle that *should* include the whole image
+            # (and then some)
             diameter = np.hypot(header[fits_naxis1], header[fits_naxis2])
         else:
             # We simply place the largest circle we can in the centre.
@@ -244,10 +248,11 @@ def __get_frequency(self, header: fits.Header) -> None:
         self.freq_eff = None
         self.freq_bw = None
         try:
-            if ('ctype3' in header) and (header['ctype3'] in ('FREQ', 'VOPT')):
+            freq_keys = ('FREQ', 'VOPT')
+            if ('ctype3' in header) and (header['ctype3'] in freq_keys):
                 self.freq_eff = header['crval3']
                 self.freq_bw = header['cdelt3'] if 'cdelt3' in header else 0.0
-            elif ('ctype4' in header) and (header['ctype4'] in ('FREQ', 'VOPT')):
+            elif ('ctype4' in header) and (header['ctype4'] in freq_keys):
                 self.freq_eff = header['crval4']
                 self.freq_bw = header['cdelt4'] if 'cdelt4' in header else 0.0
             else:
@@ -271,6 +276,7 @@ class SelavyImage(FitsImage):
             associated with the image.
         config (Dict): The image configuration settings.
     """
+
     def __init__(
         self,
         path: str,
@@ -313,7 +319,8 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame:
             Dataframe containing the cleaned and processed Selavy components.
""" # TODO: improve with loading only the cols we need and set datatype - if self.selavy_path.endswith(".xml") or self.selavy_path.endswith(".vot"): + if self.selavy_path.endswith( + ".xml") or self.selavy_path.endswith(".vot"): df = Table.read( self.selavy_path, format="votable", use_names_over_ids=True ).to_pandas() @@ -460,12 +467,12 @@ def read_selavy(self, dj_image: models.Image) -> pd.DataFrame: .agg('sum') ) - df['flux_int_isl_ratio'] = ( + df['flux_int_isl_ratio'] = ( df['flux_int'].values / island_flux_totals.loc[df['island_id']]['flux_int'].values ) - df['flux_peak_isl_ratio'] = ( + df['flux_peak_isl_ratio'] = ( df['flux_peak'].values / island_flux_totals.loc[df['island_id']]['flux_peak'].values ) diff --git a/vast_pipeline/image/utils.py b/vast_pipeline/image/utils.py index 4b85d5389..152017532 100644 --- a/vast_pipeline/image/utils.py +++ b/vast_pipeline/image/utils.py @@ -7,7 +7,9 @@ import numpy as np import pandas as pd -from typing import Tuple +from typing import Tuple, Union, Optional +from pathlib import Path +from astropy.io import fits logger = logging.getLogger(__name__) @@ -81,7 +83,7 @@ def calc_error_radius(ra, ra_err, dec, dec_err) -> float: np.deg2rad(i), dec_1, np.deg2rad(j) - )) for i,j in zip(ra_offsets, dec_offsets) + )) for i, j in zip(ra_offsets, dec_offsets) ] seps = np.column_stack(seps) @@ -190,7 +192,7 @@ def calc_condon_flux_errors( (1. + (theta_B / major)**2)**alpha_maj2 * (1. + (theta_b / minor)**2)**alpha_min2 * snr**2) - rho_sq3 = ((major * minor / (4.* theta_B * theta_b)) * + rho_sq3 = ((major * minor / (4. * theta_B * theta_b)) * (1. + (theta_B / major)**2)**alpha_maj3 * (1. + (theta_b / minor)**2)**alpha_min3 * snr**2) @@ -210,9 +212,9 @@ def calc_condon_flux_errors( # ra and dec errors errorra = np.sqrt((error_par_major * np.sin(theta))**2 + - (error_par_minor * np.cos(theta))**2) + (error_par_minor * np.cos(theta))**2) errordec = np.sqrt((error_par_major * np.cos(theta))**2 + - (error_par_minor * np.sin(theta))**2) + (error_par_minor * np.sin(theta))**2) errormajor = np.sqrt(2) * major / rho1 errorminor = np.sqrt(2) * minor / rho2 @@ -238,11 +240,40 @@ def calc_condon_flux_errors( help1 = (errormajor / major)**2 help2 = (errorminor / minor)**2 help3 = theta_B * theta_b / (major * minor) - errorflux = np.abs(flux_int) * np.sqrt(errorpeaksq / flux_peak**2 + help3 * (help1 + help2)) + help4 = np.sqrt(errorpeaksq / flux_peak**2 + help3 * (help1 + help2)) + errorflux = np.abs(flux_int) * help4 # need to return flux_peak if used. return errorpeak, errorflux, errormajor, errorminor, errortheta, errorra, errordec except Exception as e: - logger.debug("Error in the calculation of Condon errors for a source", exc_info=True) + logger.debug( + "Error in the calculation of Condon errors for a source", + exc_info=True) return 0., 0., 0., 0., 0., 0., 0. + + +def open_fits(fits_path: Union[str, Path], memmap: Optional[bool] = True): + """ + This function opens both compressed and uncompressed fits files. + + Args: + fits_path: Path to the fits file + memmap: Open the fits file with mmap. 
+
+    Returns:
+        HDUList loaded from the fits file
+    """
+
+    if isinstance(fits_path, Path):
+        fits_path = str(fits_path)
+
+    hdul = fits.open(fits_path, memmap=memmap)
+
+    # This is a messy way to check, but I can't think of a better one
+    if len(hdul) == 1:
+        return hdul
+    elif type(hdul[1]) == fits.hdu.compressed.CompImageHDU:
+        return fits.HDUList(hdul[1:])
+    else:
+        return hdul
diff --git a/vast_pipeline/pipeline/forced_extraction.py b/vast_pipeline/pipeline/forced_extraction.py
index 33bb9e04d..eef7a3ed8 100644
--- a/vast_pipeline/pipeline/forced_extraction.py
+++ b/vast_pipeline/pipeline/forced_extraction.py
@@ -13,13 +13,14 @@
 from django.conf import settings
 from django.db import transaction
 from pyarrow.parquet import read_schema
-from typing import Any, List, Tuple, Dict
+from typing import Any, List, Tuple, Dict, Optional
 from vast_pipeline.models import Image, Measurement, Run
 from vast_pipeline.pipeline.loading import make_upload_measurements
 from forced_phot import ForcedPhot
 from ..utils.utils import StopWatch
+from vast_pipeline.image.utils import open_fits
 logger = logging.getLogger(__name__)
@@ -68,7 +69,8 @@ def get_data_from_parquet(
     Args:
         file_and_image_id:
-            a tuple containing the path of the measurements parquet file and the image ID.
+            a tuple containing the path of the measurements parquet file and
+            the image ID.
         p_run_path:
             Pipeline run path to get forced parquet in case of add mode.
         add_mode:
@@ -104,6 +106,30 @@ def get_data_from_parquet(
     return {'prefix': prefix, 'max_id': max_id, 'id': image_id}
+def _forcedphot_preload(image: str,
+                        background: str,
+                        noise: str,
+                        memmap: Optional[bool] = False
+                        ):
+    """
+    Load the relevant image, background and noisemap files.
+
+    Args:
+        image: a string with the path of the image file
+        background: a string with the path of the background map
+        noise: a string with the path of the noise map
+
+    Returns:
+        A tuple containing the HDU lists
+    """
+
+    image_hdul = open_fits(image, memmap=memmap)
+    background_hdul = open_fits(background, memmap=memmap)
+    noise_hdul = open_fits(noise, memmap=memmap)
+
+    return image_hdul, background_hdul, noise_hdul
+
+
 def extract_from_image(
     df: pd.DataFrame,
     image: str,
@@ -147,8 +173,16 @@ def extract_from_image(
         df['wavg_dec'].values,
         unit=(u.deg, u.deg)
     )
+    # load the image, background and noisemaps into memory
+    # a dedicated function may seem unnecessary, but will be useful if we
+    # split the load to a separate thread.
+    forcedphot_input = _forcedphot_preload(image,
+                                           background,
+                                           noise,
+                                           memmap=False
+                                           )
+    FP = ForcedPhot(*forcedphot_input)
-    FP = ForcedPhot(image, background, noise)
     flux, flux_err, chisq, DOF, cluster_id = FP.measure(
         P_islands,
         cluster_threshold=cluster_threshold,
@@ -166,7 +200,7 @@ def finalise_forced_dfs(
     df: pd.DataFrame, prefix: str, max_id: int, beam_bmaj: float,
     beam_bmin: float, beam_bpa: float, id: int, datetime: datetime.datetime,
     image: str
-    ) -> pd.DataFrame:
+) -> pd.DataFrame:
     """
     Compute populate leftover columns for the dataframe with forced
     photometry data given the input parameters
@@ -223,7 +257,7 @@ def parallel_extraction(
     df: pd.DataFrame, df_images: pd.DataFrame, df_sources: pd.DataFrame,
     min_sigma: float, edge_buffer: float, cluster_threshold: float,
     allow_nan: bool, add_mode: bool, p_run_path: str
-    ) -> pd.DataFrame:
+) -> pd.DataFrame:
     """
     Parallelize forced extraction with Dask
@@ -260,7 +294,7 @@ def parallel_extraction(
     """
     # explode the lists in 'img_diff' column (this will make a copy of the df)
     out = (
-        df.rename(columns={'img_diff':'image', 'source':'source_tmp_id'})
+        df.rename(columns={'img_diff': 'image', 'source': 'source_tmp_id'})
         # merge the rms_min column from df_images
         .merge(
             df_images[['rms_min']],
@@ -285,8 +319,8 @@ def parallel_extraction(
     out['max_snr'] = out['flux_peak'].values / out['image_rms_min'].values
     out = out[out['max_snr'] > min_sigma].reset_index(drop=True)
     logger.debug("Min forced sigma dropped %i sources",
-        predrop_shape - out.shape[0]
-    )
+                 predrop_shape - out.shape[0]
+                 )
     # drop some columns that are no longer needed and the df should look like
     # out
@@ -309,7 +343,8 @@ def parallel_extraction(
     # create a list of dictionaries with image file paths and dataframes
     # with data related to each images
     def image_data_func(image_name: str) -> Dict[str, Any]:
-        nonlocal out  # `out` refers to the `out` declared in nearest enclosing scope
+        # `out` refers to the `out` declared in nearest enclosing scope
+        nonlocal out
         return {
             'image_id': df_images.at[image_name, 'id'],
             'image': df_images.at[image_name, 'path'],
@@ -384,7 +419,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
         pd.concat(intermediate_df, axis=0, sort=False)
         .rename(
             columns={
-                'wavg_ra':'ra', 'wavg_dec':'dec', 'image_name': 'image'
+                'wavg_ra': 'ra', 'wavg_dec': 'dec', 'image_name': 'image'
             }
         )
     )
@@ -393,7 +428,7 @@ def image_data_func(image_name: str) -> Dict[str, Any]:
 def write_group_to_parquet(
-    df: pd.DataFrame, fname: str, add_mode: bool) -> None:
+        df: pd.DataFrame, fname: str, add_mode: bool) -> None:
     '''
     Write a dataframe correpondent to a single group/image to a parquet file.
@@ -420,7 +455,7 @@ def write_group_to_parquet(
 def parallel_write_parquet(
-    df: pd.DataFrame, run_path: str, add_mode: bool = False) -> None:
+        df: pd.DataFrame, run_path: str, add_mode: bool = False) -> None:
     '''
     Parallelize writing parquet files for forced measurements.
@@ -436,9 +471,10 @@ def parallel_write_parquet(
         None
     '''
     images = df['image'].unique().tolist()
-    get_fname = lambda n: os.path.join(
+
+    def get_fname(n): return os.path.join(
         run_path,
-        'forced_measurements_' + n.replace('.','_') + '.parquet'
+        'forced_measurements_' + n.replace('.', '_') + '.parquet'
     )
     dfs = list(map(lambda x: (df[df['image'] == x], get_fname(x)), images))
     n_cpu = cpu_count() - 1
diff --git a/vast_pipeline/pipeline/new_sources.py b/vast_pipeline/pipeline/new_sources.py
index c5b751147..d30f490dd 100644
--- a/vast_pipeline/pipeline/new_sources.py
+++ b/vast_pipeline/pipeline/new_sources.py
@@ -6,7 +6,6 @@
 from psutil import cpu_count
 from astropy import units as u
 from astropy.coordinates import SkyCoord
-from astropy.io import fits
 from astropy.wcs import WCS
 from astropy.wcs.utils import (
     proj_plane_pixel_scales
@@ -14,6 +13,7 @@
 from vast_pipeline.models import Image, Run
 from vast_pipeline.utils.utils import StopWatch
+from vast_pipeline.image.utils import open_fits
 logger = logging.getLogger(__name__)
@@ -84,7 +84,7 @@ def get_image_rms_measurements(
         return group
     image = group.iloc[0]['img_diff_rms_path']
-    with fits.open(image) as hdul:
+    with open_fits(image) as hdul:
         header = hdul[0].header
         wcs = WCS(header, naxis=2)
         data = hdul[0].data.squeeze()
@@ -101,7 +101,7 @@ def get_image_rms_measurements(
     npix = round(
         (nbeam / 2. * bmaj.to('arcsec') /
-        pixelscale).value
+         pixelscale).value
     )
     npix = int(round(npix * edge_buffer))
@@ -155,7 +155,7 @@ def get_image_rms_measurements(
     nan_valid = []
     # Get slices of each source and check NaN is not included.
-    for i,j in zip(array_coords[0], array_coords[1]):
+    for i, j in zip(array_coords[0], array_coords[1]):
         sl = tuple((
             slice(i - acceptable_no_nan_dist, i + acceptable_no_nan_dist),
             slice(j - acceptable_no_nan_dist, j + acceptable_no_nan_dist)
         ))
@@ -244,10 +244,10 @@ def new_sources(
     min_sigma: float, edge_buffer: float, p_run: Run
 ) -> pd.DataFrame:
     """
-    Processes the new sources detected to check that they are valid new sources.
-    This involves checking to see that the source *should* be seen at all in
-    the images where it is not detected. For valid new sources the snr
-    value the source would have in non-detected images is also calculated.
+    Processes the new sources detected to check that they are valid new
+    sources. This involves checking to see that the source *should* be seen at
+    all in the images where it is not detected. For valid new sources the
+    snr value the source would have in non-detected images is also calculated.
     Args:
         sources_df:
@@ -352,7 +352,7 @@ def new_sources(
         left_on='detection',
         right_on='name',
         how='left'
-    ).rename(columns={'datetime':'detection_time'})
+    ).rename(columns={'datetime': 'detection_time'})
     new_sources_df = new_sources_df.merge(
         images_df[[
@@ -363,7 +363,7 @@ def new_sources(
         right_on='name',
         how='left'
     ).rename(columns={
-        'datetime':'img_diff_time',
+        'datetime': 'img_diff_time',
         'rms_min': 'img_diff_rms_min',
         'rms_median': 'img_diff_rms_median',
         'noise_path': 'img_diff_rms_path'
@@ -438,10 +438,11 @@ def new_sources(
         new_sources_df
         .drop_duplicates('source')
         .set_index('source')
-        .rename(columns={'true_sigma':'new_high_sigma'})
+        .rename(columns={'true_sigma': 'new_high_sigma'})
     )
-    # moving forward only the new_high_sigma columns is needed, drop all others.
+    # moving forward only the new_high_sigma column is needed, drop all
+    # others.
     new_sources_df = new_sources_df[['new_high_sigma']]
     logger.info(
diff --git a/vast_pipeline/pipeline/utils.py b/vast_pipeline/pipeline/utils.py
index 9169f25a1..82437141c 100644
--- a/vast_pipeline/pipeline/utils.py
+++ b/vast_pipeline/pipeline/utils.py
@@ -15,7 +15,6 @@
 import dask.dataframe as dd
 from typing import Any, List, Optional, Dict, Tuple, Union
-from astropy.io import fits
 from astropy.coordinates import SkyCoord, Angle
 from django.conf import settings
 from django.contrib.auth.models import User
@@ -23,6 +22,7 @@
 from itertools import chain
 from vast_pipeline.image.main import FitsImage, SelavyImage
+from vast_pipeline.image.utils import open_fits
 from vast_pipeline.utils.utils import (
     eq_to_cart, StopWatch, optimize_ints, optimize_floats
 )
@@ -793,7 +793,7 @@ def get_rms_noise_image_values(rms_path: str) -> Tuple[float, float, float]:
     logger.debug('Extracting Image RMS values from Noise file...')
     med_val = min_val = max_val = 0.
     try:
-        with fits.open(rms_path) as f:
+        with open_fits(rms_path) as f:
             data = f[0].data
             data = data[np.logical_not(np.isnan(data))]
             data = data[data != 0]
diff --git a/vast_pipeline/views.py b/vast_pipeline/views.py
index 3eec7edbf..6245ed7dd 100644
--- a/vast_pipeline/views.py
+++ b/vast_pipeline/views.py
@@ -66,6 +66,7 @@
 from vast_pipeline.management.commands.initpiperun import initialise_run
 from vast_pipeline.forms import PipelineRunForm, CommentForm, TagWithCommentsForm
 from vast_pipeline.pipeline.config import PipelineConfig
+from vast_pipeline.image.utils import open_fits
 logger = logging.getLogger(__name__)
@@ -1867,7 +1868,7 @@ def get(self, request, measurement_id: int, size: str = "normal"):
         measurement = Measurement.objects.get(id=measurement_id)
-        image_hdu: fits.PrimaryHDU = fits.open(measurement.image.path)[0]
+        image_hdu: fits.PrimaryHDU = open_fits(measurement.image.path)[0]
         coord = SkyCoord(ra=measurement.ra, dec=measurement.dec, unit="deg")
         sizes = {
             "xlarge": "40arcmin",
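
Usage sketch (not part of the patch): the snippet below illustrates how the `open_fits` wrapper added in `vast_pipeline/image/utils.py` is expected to behave, assuming astropy, numpy and an importable `vast_pipeline` package. The file name and toy data are invented for illustration only; the wrapper call mirrors how the patch itself uses it (e.g. `with open_fits(image) as hdul:` in `new_sources.py`).

import numpy as np
from astropy.io import fits

from vast_pipeline.image.utils import open_fits

# Write a toy compressed FITS image: the data lives in a CompImageHDU behind
# an empty primary HDU, which is the layout produced by fpack-style tools.
data = np.random.default_rng(42).normal(size=(64, 64)).astype(np.float32)
fits.HDUList([fits.PrimaryHDU(), fits.CompImageHDU(data=data)]).writeto(
    "toy_compressed.fits", overwrite=True
)

# For a compressed file the wrapper drops the empty primary HDU, so index 0
# carries the image, matching how uncompressed images are accessed elsewhere
# in this patch (hdul[0].header, hdul[0].data).
with open_fits("toy_compressed.fits") as hdul:
    print(hdul[0].data.shape)         # (64, 64)
    print(hdul[0].header["NAXIS"])    # 2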