From 8cf81e04c2a84c03b3c8188538b91dfe71baa561 Mon Sep 17 00:00:00 2001 From: Ken MacDonald <68608562+kmacdonald-stsci@users.noreply.github.com> Date: Wed, 28 Jul 2021 10:06:51 -0400 Subject: [PATCH] JP-2037 and JP-1926: Fixing ramp fitting multiprocessing to work with new STCAL interface. (#30) * Fixing ramp fitting multiprocessing to work with new STCAL interface. * Making changes based on code review. * Final DQ array for multiprocessing was incorrectly allocated as dtype=nd.float32, but should be dtype=np.uint32. Changed to allocate an array with the correct dtype. * Changing DQ to be uint32, not float32. * Adding addition of multiprocessing to change log. --- CHANGES.rst | 1 + src/stcal/ramp_fitting/constants.py | 8 + src/stcal/ramp_fitting/ols_fit.py | 759 +++++++++++++++------------- 3 files changed, 408 insertions(+), 360 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index b198cc8d..dd6156ad 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -11,6 +11,7 @@ ramp_fitting ------------ +- Implemented multiprocessing for OLS. [#30] - Added DQ flag parameter to `ramp_fit` [#25] - Move common ``jump`` code to stcal [#27] diff --git a/src/stcal/ramp_fitting/constants.py b/src/stcal/ramp_fitting/constants.py index f45b4d06..4401a376 100644 --- a/src/stcal/ramp_fitting/constants.py +++ b/src/stcal/ramp_fitting/constants.py @@ -13,3 +13,11 @@ def update_dqflags(input_flags): dqflags["JUMP_DET"] = input_flags["JUMP_DET"] dqflags["NO_GAIN_VALUE"] = input_flags["NO_GAIN_VALUE"] dqflags["UNRELIABLE_SLOPE"] = input_flags["UNRELIABLE_SLOPE"] + + +def update_dqflags_from_ramp_data(ramp_data): + dqflags["DO_NOT_USE"] = ramp_data.flags_do_not_use + dqflags["SATURATED"] = ramp_data.flags_saturated + dqflags["JUMP_DET"] = ramp_data.flags_jump_det + dqflags["NO_GAIN_VALUE"] = ramp_data.flags_no_gain_val + dqflags["UNRELIABLE_SLOPE"] = ramp_data.flags_unreliable_slope diff --git a/src/stcal/ramp_fitting/ols_fit.py b/src/stcal/ramp_fitting/ols_fit.py index 467b8806..7ea24f57 100644 --- a/src/stcal/ramp_fitting/ols_fit.py +++ b/src/stcal/ramp_fitting/ols_fit.py @@ -2,13 +2,14 @@ import logging -# from multiprocessing.pool import Pool as Pool +from multiprocessing.pool import Pool as Pool import numpy as np import time import warnings from . import constants +from . import ramp_fit_class from . import utils log = logging.getLogger(__name__) @@ -82,8 +83,6 @@ def ols_ramp_fit_multi( # Copy the int_times table for TSO data int_times = ramp_data.int_times - number_of_integrations = ramp_data.data.shape[0] - # For MIRI datasets having >1 group, if all pixels in the final group are # flagged as DO_NOT_USE, resize the input model arrays to exclude the # final group. Similarly, if leading groups 1 though N have all pixels @@ -100,9 +99,6 @@ def ols_ramp_fit_multi( # Call ramp fitting for the single processor (1 data slice) case if number_slices == 1: - max_segments, max_CRs = calc_num_seg(ramp_data.groupdq, number_of_integrations) - log.debug(f"Max segments={max_segments}") - # Single threaded computation image_info, integ_info, opt_info = ols_ramp_fit_single( ramp_data, int_times, buffsize, save_opt, readnoise_2d, gain_2d, weighting) @@ -113,246 +109,36 @@ def ols_ramp_fit_multi( # Call ramp fitting for multi-processor (multiple data slices) case else: - return None, None, None - - -''' -# Remove BEGIN multiprocessing - log.debug(f'number of processes being used is {number_slices}') - rows_per_slice = round(total_rows / number_slices) - pool = Pool(processes=number_slices) - slices = [] - - # Populate the first n-1 slices - for i in range(number_slices - 1): - start_row = i * rows_per_slice - stop_row = (i + 1) * rows_per_slice - readnoise_slice = readnoise_2d[start_row: stop_row, :] - gain_slice = gain_2d[start_row: stop_row, :] - data_slice = input_model.data[:, :, start_row: stop_row, :].copy() - err_slice = input_model.err[:, :, start_row: stop_row, :].copy() - groupdq_slice = input_model.groupdq[:, :, start_row:stop_row, :].copy() - pixeldq_slice = input_model.pixeldq[start_row:stop_row, :].copy() - - slices.insert( - i, - (data_slice, err_slice, groupdq_slice, pixeldq_slice, buffsize, save_opt, - readnoise_slice, gain_slice, weighting, - input_model.meta.instrument.name, input_model.meta.exposure.frame_time, - input_model.meta.exposure.ngroups, input_model.meta.exposure.group_time, - input_model.meta.exposure.groupgap, input_model.meta.exposure.nframes, - input_model.meta.exposure.drop_frames1, int_times)) - - # last slice gets the rest - start_row = (number_slices - 1) * rows_per_slice - readnoise_slice = readnoise_2d[start_row: total_rows, :] - gain_slice = gain_2d[start_row: total_rows, :] - data_slice = input_model.data[:, :, start_row: total_rows, :].copy() - err_slice = input_model.err[:, :, start_row: total_rows, :].copy() - groupdq_slice = input_model.groupdq[:, :, start_row: total_rows, :].copy() - pixeldq_slice = input_model.pixeldq[start_row: total_rows, :].copy() - slices.insert(number_slices - 1, - (data_slice, err_slice, groupdq_slice, pixeldq_slice, buffsize, save_opt, - readnoise_slice, gain_slice, weighting, - input_model.meta.instrument.name, input_model.meta.exposure.frame_time, - input_model.meta.exposure.ngroups, input_model.meta.exposure.group_time, - input_model.meta.exposure.groupgap, input_model.meta.exposure.nframes, - input_model.meta.exposure.drop_frames1, int_times)) - - # Start up the processes for each slice - log.debug("Creating %d processes for ramp fitting " % number_slices) - - # Use starmap because input is iterable as well. - real_results = pool.starmap(ols_ramp_fit_sliced, slices) - pool.close() - pool.join() - k = 0 - log.debug("All processes complete") - - # Check that all slices got processed properly - for resultslice in real_results: - if resultslice[0] is None: - return None, None, None - - # Create new model for the primary output. - actual_segments = real_results[0][20] - actual_CRs = real_results[0][21] - int_model, opt_model, out_model = \ - create_output_models(input_model, number_of_integrations, save_opt, - total_cols, total_rows, actual_segments, actual_CRs) - int_model.int_times = int_times - - # iterate over the number of slices and place the results into the output models - for resultslice in real_results: - start_row = k * rows_per_slice - if len(real_results) == k + 1: # last result - out_model.data[start_row: total_rows, :] = resultslice[0] - out_model.dq[start_row:total_rows, :] = resultslice[1] - out_model.var_poisson[start_row:total_rows, :] = resultslice[2] - out_model.var_rnoise[start_row:total_rows, :] = resultslice[3] - out_model.err[start_row:total_rows, :] = resultslice[4] - if resultslice[5] is not None: # Integration results exist - int_model.data[:, start_row:total_rows, :] = resultslice[5] - int_model.dq[:, start_row:total_rows, :] = resultslice[6] - int_model.var_poisson[:, start_row:total_rows, :] = resultslice[7] - int_model.var_rnoise[:, start_row:total_rows, :] = resultslice[8] - int_model.err[:, start_row:total_rows, :] = resultslice[9] - if resultslice[11] is not None: # Optional results exist - opt_model.slope[:, :, start_row:total_rows, :] = resultslice[11] - opt_model.sigslope[:, :, start_row:total_rows, :] = resultslice[12] - opt_model.var_poisson[:, :, start_row:total_rows, :] = resultslice[13] - opt_model.var_rnoise[:, :, start_row:total_rows, :] = resultslice[14] - opt_model.yint[:, :, start_row:total_rows, :] = resultslice[15] - opt_model.sigyint[:, :, start_row:total_rows, :] = resultslice[16] - opt_model.pedestal[:, start_row:total_rows, :] = resultslice[17] - opt_model.weights[:, :, start_row:total_rows, :] = resultslice[18] - opt_model.crmag[:, :, start_row:total_rows, :] = resultslice[19] - else: # all but last slice - stop_row = (k + 1) * rows_per_slice - out_model.data[start_row: stop_row, :] = resultslice[0] - out_model.dq[start_row: stop_row, :] = resultslice[1] - out_model.var_poisson[start_row: stop_row, :] = resultslice[2] - out_model.var_rnoise[start_row: stop_row, :] = resultslice[3] - out_model.err[start_row: stop_row, :] = resultslice[4] - if resultslice[5] is not None: # Multiple integration results exist - int_model.data[:, start_row: stop_row, :] = resultslice[5] - int_model.dq[:, start_row: stop_row, :] = resultslice[6] - int_model.var_poisson[:, start_row: stop_row, :] = resultslice[7] - int_model.var_rnoise[:, start_row: stop_row, :] = resultslice[8] - int_model.err[:, start_row: stop_row, :] = resultslice[9] - if resultslice[11] is not None: # Optional Results exist - opt_model.slope[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[11] - opt_model.sigslope[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[12] - opt_model.var_poisson[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[13] - opt_model.var_rnoise[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[14] - opt_model.yint[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[15] - opt_model.sigyint[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[16] - opt_model.pedestal[:, start_row: (k + 1) * rows_per_slice, :] = resultslice[17] - opt_model.weights[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[18] - opt_model.crmag[:, :, start_row: (k + 1) * rows_per_slice, :] = resultslice[19] - k = k + 1 - - return out_model, int_model, opt_model - - -def create_output_models(input_model, number_of_integrations, save_opt, - total_cols, total_rows, actual_segments, actual_CRs): - """ - Create_output_models is used to make blank output models to hold the results from the OLS - ramp fitting. - - Parameters - ---------- - input_model : DataModel - The input ramp model - - number_of_integrations : int - The number of integration in the input model - - save_opt : bool - Whether to save the optional outputs - - total_cols : int - The number of columns in the input image - - total_rows : int - The number of rows in the input image - - actual_segments : int - The largest number of segments in the integration resulting from cosmic rays - - actual_CRs : int - The largest number of cosmic rays jumps found in any integration - - Returns - ------------ - int_model : DataModel - The per integration output model - - opt_model : DataModel - The optional output model - - out_model : RampFitOutputModel - The standard rate output model - - """ - # TODO Remove function - imshape = (total_rows, total_cols) - out_model = datamodels.ImageModel(data=np.zeros(imshape, dtype=np.float32), - dq=np.zeros(imshape, dtype=np.uint32), - var_poisson=np.zeros(imshape, dtype=np.float32), - var_rnoise=np.zeros(imshape, dtype=np.float32), - err=np.zeros(imshape, dtype=np.float32)) - # ... and add all keys from input - out_model.update(input_model) - - # create per integrations model - # TODO Remove function - int_model = datamodels.CubeModel( - data=np.zeros((number_of_integrations,) + imshape, dtype=np.float32), - dq=np.zeros((number_of_integrations,) + imshape, dtype=np.uint32), - var_poisson=np.zeros((number_of_integrations,) + imshape, dtype=np.float32), - var_rnoise=np.zeros((number_of_integrations,) + imshape, dtype=np.float32), - err=np.zeros((number_of_integrations,) + imshape, dtype=np.float32)) - int_model.int_times = None - int_model.update(input_model) # ... and add all keys from input - - # Create model for the optional output - # TODO Remove function - if save_opt: - opt_model = datamodels.RampFitOutputModel( - slope=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - yint=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - sigyint=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - sigslope=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - weights=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - firstf_int=np.zeros((number_of_integrations,) + imshape, dtype=np.float32), - pedestal=np.zeros((number_of_integrations,) + imshape, dtype=np.float32), - crmag=np.zeros((number_of_integrations,) + (actual_CRs,) + imshape, dtype=np.float32), - var_poisson=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - var_rnoise=np.zeros((number_of_integrations,) + (actual_segments,) + imshape, dtype=np.float32), - ) - - opt_model.meta.filename = input_model.meta.filename - opt_model.update(input_model) # ... and add all keys from input - else: - opt_model = None - - return int_model, opt_model, out_model + image_info, integ_info, opt_info = ols_ramp_fit_multiprocessing( + ramp_data, int_times, buffsize, save_opt, + readnoise_2d, gain_2d, weighting, number_slices) + return image_info, integ_info, opt_info -def ols_ramp_fit_sliced( - data, err, groupdq, inpixeldq, buffsize, save_opt, readnoise_2d, gain_2d, - weighting, instrume, frame_time, ngroups, group_time, groupgap, nframes, - dropframes1, int_times): +def ols_ramp_fit_multiprocessing( + ramp_data, int_times, buffsize, save_opt, + readnoise_2d, gain_2d, weighting, number_slices): """ Fit a ramp using ordinary least squares. Calculate the count rate for each pixel in all data cube sections and all integrations, equal to the weighted slope for all sections (intervals between cosmic rays) of the pixel's ramp - divided by the effective integration time. This function wraps the single - threaded ols_ramp_fit_single function and is used in order to properly handle - the slicing of the data for multiprocessing. + divided by the effective integration time. The data is spread across the + desired number of processors (>1). Parameters ---------- - data : The input 4-D array with ramp data (num_integrations, num_groups, num_rows, num_cols) - The input ramp data - - err : ndarray - The input 4-D error that matches the ramp data - - groupdq : ndarray - The input 4-D group DQ flags + ramp_data: RampData + Input data necessary for computing ramp fitting. - inpixeldq : ndarray - The input 2-D pixel DQ flags + int_times : None + Not used buffsize : int The working buffer size save_opt : bool - Whether to return the optional output model + Whether to return the optional output model readnoise_2d : ndarray The read noise of each pixel @@ -363,174 +149,422 @@ def ols_ramp_fit_sliced( weighting : str 'optimal' is the only valid value - instrume : str - Instrument name + number_slices: int + The number of slices to partition the data into for multiprocessing. - frame_time : float32 - The time to read one frame. + Return + ------ + image_info: tuple + The tuple of computed ramp fitting arrays. - ngroups : int - The number of groups in each integration + integ_info: tuple + The tuple of computed integration fitting arrays. - group_time : float32 - The time to read one group. + opt_info: tuple + The tuple of computed optional results arrays for fitting. + """ + log.info(f"Number of processors used for multiprocessing: {number_slices}") + slices, rows_per_slice = compute_slices_for_starmap( + ramp_data, int_times, buffsize, save_opt, + readnoise_2d, gain_2d, weighting, number_slices) - groupgap : int - The number of frames that are not included in the group average + pool = Pool(processes=number_slices) + pool_results = pool.starmap(ols_ramp_fit_single, slices) + pool.close() + pool.join() - nframes : int - The number of frames that are included in the group average + # Reassemble results + image_info, integ_info, opt_info = assemble_pool_results( + ramp_data, save_opt, pool_results, rows_per_slice) - dropframes1 : - The number of frames dropped at the beginning of every integration + return image_info, integ_info, opt_info - int_times : None - Not used - Returns - ------- - new_model.data : ndarray - The output final rate of each pixel, 2-D float +def assemble_pool_results(ramp_data, save_opt, pool_results, rows_per_slice): + """ + Takes the list of results from the starmap pool method and assembles the + slices into primary tuples to be returned by `ramp_fit`. - new_model.dq : ndarray - The output pixel dq for each pixel, 2-D flag + Parameters + ---------- + ramp_data: RampData + The data needed for ramp fitting. - new_model.var_poisson : ndarray - The variance in each pixel due to Poisson noise, 2-D float32 + save_opt: bool + The option to save the optional results. - new_model.var_rnoise : ndarray - The variance in each pixel due to read noise, 2-D float32 + pool_results: list + The list of return values from ols_ramp_fit_single for each slice. + Each slice is run through ols_ramp_fit_single, which returns three + tuples of ndarrays, so pool_results is a list of tuples. Each tuple + contains: + image_info, integ_info, opt_info - new_model.err : ndarray - The output total variance for each pixel, 2-D float32 + rows_per_slice: list + The number of rows in each slice. - int_data : ndarray - The rate for each pixel in each integration, 3-D float32 + Return + ------ + image_info: tuple + The tuple of computed ramp fitting arrays. - int_dq : ndarray - The pixel dq flag for each integration, 3-D float32 + integ_info: tuple + The tuple of computed integration fitting arrays. + + opt_info: tuple + The tuple of computed optional results arrays for fitting. + """ + # Create output arrays for each output tuple. The input ramp data and + # slices are needed for this. + image_info, integ_info, opt_info = create_output_info( + ramp_data, pool_results, save_opt) + + # Loop over the slices and assemble each slice into the main return arrays. + current_row_start = 0 + for k, result in enumerate(pool_results): + image_slice, integ_slice, opt_slice = result + nrows = rows_per_slice[k] + + get_image_slice(image_info, image_slice, current_row_start, nrows) + get_integ_slice(integ_info, integ_slice, current_row_start, nrows) + if save_opt: + get_opt_slice(opt_info, opt_slice, current_row_start, nrows) + current_row_start = current_row_start + nrows + + # Handle integration times + return image_info, integ_info, opt_info - int_var_poisson : ndarray - The variance of the rate for each integration due to Poisson noise, 3-D float32 - int_var_rnoise : ndarray - The variance of the rate for each integration due to read noise, 3-D float32 +def get_image_slice(image_info, image_slice, row_start, nrows): + """ + Populates the image output information from each slice. - int_err : ndarray - The total variance of the rate for each integration, 3-D float32 + image_info: tuple + The output image information to populate from the slice. - int_int_times : 3-D - The total time for each integration + image_slice: tuple + The output slice used to populate the output arrays. - opt_slope : ndarray - The rate of each segment in each integration, 4-D float32 + row_start: int + The start row the current slice at which starts. - opt_sigslope : ndarray - The total variance of the rate for each pixel in each segment of each - integration, 4-D float32 + nrows: int + The number of rows int the current slice. + """ + data, dq, var_poisson, var_rnoise, err = image_info + sdata, sdq, svar_poisson, svar_rnoise, serr = image_slice - opt_var_poisson : ndarray - The Poisson variance of the rate for each pixel in each segment of each - integration, 4-D float32 + srow, erow = row_start, row_start + nrows - opt_var_rnoise : ndarray - The read noise variance of the rate for each pixel in each segment of - each integration, 4-D float32 + data[srow:erow, :] = sdata + dq[srow:erow, :] = sdq + var_poisson[srow:erow, :] = svar_poisson + var_rnoise[srow:erow, :] = svar_rnoise + err[srow:erow, :] = serr - opt_yint : ndarray - The y-intercept for each pixel in each segment of each integration, 4-D float32 - opt_sigyint : ndarray - The variance for each pixel in each segment of each integration, 4-D float32 +def get_integ_slice(integ_info, integ_slice, row_start, nrows): + """ + Populates the integration output information from each slice. - opt_pedestal : ndarray - The zero point for each pixel in each segment of each integration, 4-D float32 + integ_info: tuple + The output integration information to populate from the slice. - opt_weights : ndarray - The weight of each pixel to use in combining the segments, 4-D float32 + integ_slice: tuple + The output slice used to populate the output arrays. - opt_crmag : ndarray - The magnitude of each CR in each integration, 4-D float32 + row_start: int + The start row the current slice at which starts. + + nrows: int + The number of rows int the current slice. + """ + data, dq, var_poisson, var_rnoise, int_times, err = integ_info + idata, idq, ivar_poisson, ivar_rnoise, iint_times, ierr = integ_slice + + srow, erow = row_start, row_start + nrows + + data[:, srow:erow, :] = idata + dq[:, srow:erow, :] = idq + var_poisson[:, srow:erow, :] = ivar_poisson + var_rnoise[:, srow:erow, :] = ivar_rnoise + err[:, srow:erow, :] = ierr + + +def get_opt_slice(opt_info, opt_slice, row_start, nrows): + """ + Populates the optional output information from each slice. + + opt_info: tuple + The output optional information to populate from the slice. + + opt_slice: tuple + The output slice used to populate the output arrays. + + row_start: int + The start row the current slice at which starts. + + nrows: int + The number of rows int the current slice. + """ + (slope, sigslope, var_poisson, var_rnoise, + yint, sigyint, pedestal, weights, crmag) = opt_info + (oslope, osigslope, ovar_poisson, ovar_rnoise, + oyint, osigyint, opedestal, oweights, ocrmag) = opt_slice + + srow, erow = row_start, row_start + nrows + + # The optional results product is of variable size in its second dimension. + # The number of segments/cosmic rays determine the final products size. + # Because each slice is computed indpendently, the number of segments may + # differ from segment to segment. The final output product is created + # using the max size for this dimension. To ensure correct assignment is + # done during this step, the second dimension, as well as the row + # dimension, must be specified. + slope[:, :oslope.shape[1], srow:erow, :] = oslope + sigslope[:, :osigslope.shape[1], srow:erow, :] = osigslope + var_poisson[:, :ovar_poisson.shape[1], srow:erow, :] = ovar_poisson + var_rnoise[:, :ovar_rnoise.shape[1], srow:erow, :] = ovar_rnoise + yint[:, :oyint.shape[1], srow:erow, :] = oyint + sigyint[:, :osigyint.shape[1], srow:erow, :] = osigyint + weights[:, :oweights.shape[1], srow:erow, :] = oweights + crmag[:, :ocrmag.shape[1], srow:erow, :] = ocrmag + + pedestal[:, srow:erow, :] = opedestal # Different shape (3-D, not 4-D) + + +def create_output_info(ramp_data, pool_results, save_opt): + """ + Creates the output arrays and tuples for ramp fitting reassembly for + mulitprocessing. + + Parameters + ---------- + ramp_data: RampData + The original ramp fitting data. - actual_segments : int - The actual maximum number of segments in any integration + pool_results: list + The list of results for each slice from multiprocessing. - actual_CRs : int - The actual maximum number of CRs in any integration + save_opt: bool + The option to save optional results. """ - # Package image data into a RampModel - input_model = RampModel() + tot_ints, tot_ngroups, tot_rows, tot_cols = ramp_data.data.shape - input_model.data = data - input_model.err = err - input_model.groupdq = groupdq - input_model.pixeldq = inpixeldq + imshape = (tot_rows, tot_cols) + integ_shape = (tot_ints, tot_rows, tot_cols) - # Capture exposure and instrument data into the RampModel - input_model.meta.instrument.name = instrume + # Create the primary product + data = np.zeros(imshape, dtype=np.float32) + dq = np.zeros(imshape, dtype=np.uint32) + var_poisson = np.zeros(imshape, dtype=np.float32) + var_rnoise = np.zeros(imshape, dtype=np.float32) + err = np.zeros(imshape, dtype=np.float32) - input_model.meta.exposure.frame_time = frame_time - input_model.meta.exposure.ngroups = ngroups - input_model.meta.exposure.group_time = group_time - input_model.meta.exposure.groupgap = groupgap - input_model.meta.exposure.nframes = nframes - input_model.meta.exposure.drop_frames1 = dropframes1 + image_info = (data, dq, var_poisson, var_rnoise, err) - # Compute ramp fitting - new_model, int_model, opt_res = ols_ramp_fit_single( - input_model, int_times, buffsize, save_opt, readnoise_2d, gain_2d, weighting) + # Create the integration products + idata = np.zeros(integ_shape, dtype=np.float32) + idq = np.zeros(integ_shape, dtype=np.uint32) + ivar_poisson = np.zeros(integ_shape, dtype=np.float32) + ivar_rnoise = np.zeros(integ_shape, dtype=np.float32) + ierr = np.zeros(integ_shape, dtype=np.float32) + int_times = ramp_data.int_times - if new_model is None: - return [None] * 22 + integ_info = (idata, idq, ivar_poisson, ivar_rnoise, int_times, ierr) - # Package computed data for return - if int_model is not None: - int_data = int_model.data.copy() - int_dq = int_model.dq.copy() - int_var_poisson = int_model.var_poisson.copy() - int_var_rnoise = int_model.var_rnoise.copy() - int_err = int_model.err.copy() - int_int_times = int_model.int_times.copy() + # Create the optional results product + if save_opt: + max_segs, max_crs = get_max_segs_crs(pool_results) + opt_shape = (tot_ints, max_segs, tot_rows, tot_cols) + crmag_shape = (tot_ints, max_crs, tot_rows, tot_cols) + + oslope = np.zeros(opt_shape, dtype=np.float32) + osigslope = np.zeros(opt_shape, dtype=np.float32) + ovar_poisson = np.zeros(opt_shape, dtype=np.float32) + ovar_rnoise = np.zeros(opt_shape, dtype=np.float32) + oyint = np.zeros(opt_shape, dtype=np.float32) + osigyint = np.zeros(opt_shape, dtype=np.float32) + oweights = np.zeros(opt_shape, dtype=np.float32) + + # Different shape + opedestal = np.zeros(integ_shape, dtype=np.float32) + ocrmag = np.zeros(crmag_shape, dtype=np.float32) + + opt_info = (oslope, osigslope, ovar_poisson, ovar_rnoise, + oyint, osigyint, opedestal, oweights, ocrmag) else: - int_data = None - int_dq = None - int_var_poisson = None - int_var_rnoise = None - int_err = None - int_int_times = None + opt_info = None - if opt_res is not None: - opt_slope = opt_res.slope.copy() - opt_sigslope = opt_res.sigslope.copy() - opt_var_poisson = opt_res.var_poisson.copy() - opt_var_rnoise = opt_res.var_rnoise.copy() - opt_yint = opt_res.yint.copy() - opt_sigyint = opt_res.sigyint.copy() - opt_pedestal = opt_res.pedestal.copy() - opt_weights = opt_res.weights.copy() - opt_crmag = opt_res.crmag.copy() - actual_segments = opt_slope.shape[1] - actual_CRs = opt_crmag.shape[1] + return image_info, integ_info, opt_info + + +def get_max_segs_crs(pool_results): + """ + Computes the max number of segments computed needed for the second + dimension of the optional results output. + + Parameter + --------- + pool_results: list + The list of results for each slice from multiprocessing. + + Return + ------ + seg_max: int + The maximum segment computed over all slices. + """ + seg_max = 1 + crs_max = 0 + for result in pool_results: + image_slice, integ_slice, opt_slice = result + oslice_slope = opt_slice[0] + nsegs = oslice_slope.shape[1] + if nsegs > seg_max: + seg_max = nsegs + + olice_crmag = opt_slice[-1] + ncrs = olice_crmag.shape[1] + if ncrs > crs_max: + crs_max = ncrs + return seg_max, crs_max + + +def compute_slices_for_starmap( + ramp_data, int_times, buffsize, save_opt, + readnoise_2d, gain_2d, weighting, number_slices): + """ + Creates the slices needed for each process for multiprocessing. The slices + for the arguments needed for ols_ramp_fit_single. + + ramp_data: RampData + The ramp data to be sliced. + + int_times : None + Not used + + buffsize : int + The working buffer size + + save_opt : bool + Whether to return the optional output model + + readnoise_2d : ndarray + The read noise of each pixel + + gain_2d : ndarray + The gain of each pixel + + weighting : str + 'optimal' is the only valid value + + number_slices: int + The number of slices to partition the data into for multiprocessing. + + Return + ------ + slices: list + The list of arguments for each processor for multiprocessing. + """ + nrows = ramp_data.data.shape[2] + rslices = rows_per_slice(number_slices, nrows) + slices = [] + start_row = 0 + for k in range(len(rslices)): + ramp_slice = slice_ramp_data(ramp_data, start_row, rslices[k]) + rnoise_slice = readnoise_2d[start_row:start_row + rslices[k], :].copy() + gain_slice = gain_2d[start_row:start_row + rslices[k], :].copy() + slices.insert( + k, + (ramp_slice, int_times, buffsize, save_opt, + rnoise_slice, gain_slice, weighting)) + start_row = start_row + rslices[k] + + return slices, rslices + + +def rows_per_slice(nslices, nrows): + """ + Compute the number of rows per slice. + + Parameters + ---------- + nslices: int + The number of slices to partition the rows. + + nrows: int + The number of rows to partition. + + Return + ______ + rslices: list + The number of rows for each slice. + """ + quotient = nrows // nslices + remainder = nrows % nslices + + no_inc = nslices - remainder + if remainder > 0: + # Ensure the number of rows per slice is no more than a + # difference of one. + first = [quotient + 1] * remainder + second = [quotient] * no_inc + rslices = first + second else: - opt_slope = None - opt_sigslope = None - opt_var_poisson = None - opt_var_rnoise = None - opt_yint = None - opt_sigyint = None - opt_pedestal = None - opt_weights = None - opt_crmag = None - actual_segments = 0 - actual_CRs = 0 - - return new_model.data, new_model.dq, new_model.var_poisson, \ - new_model.var_rnoise, new_model.err, \ - int_data, int_dq, int_var_poisson, int_var_rnoise, int_err, int_int_times, \ - opt_slope, opt_sigslope, opt_var_poisson, opt_var_rnoise, opt_yint, opt_sigyint, \ - opt_pedestal, opt_weights, opt_crmag, actual_segments, actual_CRs -# END Multiprocessing -''' + rslices = [quotient] * nslices + + return rslices + + +def slice_ramp_data(ramp_data, start_row, nrows): + """ + Slices the ramp data by rows, where the arrays contain all rows in + [start_row, start_row+nrows). + + Parameters + ---------- + ramp_data: RampData + The ramp data to slice. + + start_rows: int + The start row of the slice. + + nrows: int + The number of rows in the slice. + + Return + ------ + ramp_data_slice: RampData + The slice of the ramp_data. + """ + ramp_data_slice = ramp_fit_class.RampData() + + # Slice data by row + data = ramp_data.data[:, :, start_row:start_row + nrows, :].copy() + err = ramp_data.err[:, :, start_row:start_row + nrows, :].copy() + groupdq = ramp_data.groupdq[:, :, start_row:start_row + nrows, :].copy() + pixeldq = ramp_data.pixeldq[start_row:start_row + nrows, :].copy() + + ramp_data_slice.set_arrays( + data, err, groupdq, pixeldq, ramp_data.int_times) + + # Carry over meta data. + ramp_data_slice.set_meta( + name=ramp_data.instrument_name, + frame_time=ramp_data.frame_time, + group_time=ramp_data.group_time, + groupgap=ramp_data.groupgap, + nframes=ramp_data.nframes, + drop_frames1=ramp_data.drop_frames1) + + # Carry over DQ flags. + ramp_data_slice.flags_do_not_use = ramp_data.flags_do_not_use + ramp_data_slice.flags_jump_det = ramp_data.flags_jump_det + ramp_data_slice.flags_saturated = ramp_data.flags_saturated + ramp_data_slice.flags_no_gain_val = ramp_data.flags_no_gain_val + ramp_data_slice.flags_unreliable_slope = ramp_data.flags_unreliable_slope + + return ramp_data_slice def ols_ramp_fit_single( @@ -575,6 +609,12 @@ def ols_ramp_fit_single( opt_info : tuple The tuple of computed optional results arrays for fitting. """ + # For multiprocessing, a new process requires the DQ flags to be updated, + # since they are global variables. + constants.update_dqflags_from_ramp_data(ramp_data) + if None in constants.dqflags.values(): + raise ValueError("Some of the DQ flags required for ramp_fitting are None.") + tstart = time.time() # Save original shapes for writing to log file, as these may change for MIRI @@ -822,7 +862,6 @@ def ramp_fit_slopes(ramp_data, gain_2d, readnoise_2d, save_opt, weighting): gdq_cube_shape = gdq_cube.shape # Get max number of segments fit in all integrations - # TODO: this computation is already done in ramp_fit_multi max_seg, num_CRs = calc_num_seg(gdq_cube, n_int) del gdq_cube