From 3722d2a7f235d99ff2f9837c8f4f42af86fe38a2 Mon Sep 17 00:00:00 2001 From: Clare Shanahan Date: Mon, 16 May 2022 10:32:53 -0400 Subject: [PATCH 1/4] fix to allow multiprocessing for jump --- src/stcal/jump/jump.py | 86 +++++++++++++++++---------- src/stcal/jump/twopoint_difference.py | 11 +++- 2 files changed, 63 insertions(+), 34 deletions(-) diff --git a/src/stcal/jump/jump.py b/src/stcal/jump/jump.py index e771e4a8..049eb663 100644 --- a/src/stcal/jump/jump.py +++ b/src/stcal/jump/jump.py @@ -13,7 +13,7 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, gain_2d, readnoise_2d, rejection_thresh, - three_grp_thresh, four_grp_thresh, max_jump_to_flag_neighbors, + three_grp_thresh, four_grp_thresh, max_cores, max_jump_to_flag_neighbors, min_jump_to_flag_neighbors, flag_4_neighbors, dqflags): """ This is the high-level controlling routine for the jump detection process. @@ -64,6 +64,12 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, four_grp_thresh : float cosmic ray sigma rejection threshold for ramps having 4 groups + max_cores: int or str + Maximum number of cores to use for multiprocessing. Available choices + are 1, 'quarter', 'half', 'all', or an integer. If the integer exceeds + the number of available cores, then it will be capped at the max number + available. + max_jump_to_flag_neighbors : float value in units of sigma that sets the upper limit for flagging of neighbors. Any jump above this cutoff will not have its neighbors @@ -82,6 +88,8 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, A dictionary with at least the following keywords: DO_NOT_USE, SATURATED, JUMP_DET, NO_GAIN_VALUE, GOOD + + Returns ------- gdq : int, 4D array @@ -124,36 +132,20 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, dtype=np.uint8) row_below_gdq = np.zeros((n_ints, n_groups, n_cols), dtype=np.uint8) - # 05/18/21 - When multiprocessing is enabled, the input data cube is split - # into a number of row slices, based on the number or avalable cores. - # Multiprocessing has been disabled for now, so the nunber of slices - # is here set to 1. I'm leaving the related code in to ease the eventual - # re-enablement of this code. - n_slices = 1 - - yinc = int(n_rows / n_slices) - slices = [] - # Slice up data, gdq, readnoise_2d into slices - # Each element of slices is a tuple of - # (data, gdq, readnoise_2d, rejection_thresh, three_grp_thresh, - # four_grp_thresh, nframes) - for i in range(n_slices - 1): - slices.insert(i, (data[:, :, i * yinc:(i + 1) * yinc, :], - gdq[:, :, i * yinc:(i + 1) * yinc, :], - readnoise_2d[i * yinc:(i + 1) * yinc, :], - rejection_thresh, three_grp_thresh, four_grp_thresh, - frames_per_group, flag_4_neighbors, - max_jump_to_flag_neighbors, - min_jump_to_flag_neighbors)) - - # last slice get the rest - slices.insert(n_slices - 1, (data[:, :, (n_slices - 1) * yinc:n_rows, :], - gdq[:, :, (n_slices - 1) * yinc:n_rows, :], - readnoise_2d[(n_slices - 1) * yinc:n_rows, :], - rejection_thresh, three_grp_thresh, - four_grp_thresh, frames_per_group, - flag_4_neighbors, max_jump_to_flag_neighbors, - min_jump_to_flag_neighbors)) + # figure out how many slices to make based on 'max_cores' + + max_available = multiprocessing.cpu_count() + if type(max_cores) == int: + if max_cores > max_available: + n_slices = max_available + else: + n_slices = max_cores + elif max_cores == 'quarter': + n_slices = max_available // 4 or 1 + elif max_cores == 'half': + n_slices = max_available // 2 or 1 + elif max_cores == 'all': + n_slices = max_available if n_slices == 1: gdq, row_below_dq, row_above_dq = \ @@ -164,6 +156,38 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, elapsed = time.time() - start else: + yinc = int(n_rows / n_slices) + slices = [] + # Slice up data, gdq, readnoise_2d into slices + # Each element of slices is a tuple of + # (data, gdq, readnoise_2d, rejection_thresh, three_grp_thresh, + # four_grp_thresh, nframes) + + # must copy arrays here, find_crs will make copies but if slices + # are being passed in for multiprocessing then the original gdq will be + # modified unless copied beforehand + gdq = gdq.copy() + data = data.copy() + copy_arrs = False # we dont need to copy arrays again in find_crs + + for i in range(n_slices - 1): + print('setting up slices') + slices.insert(i, (data[:, :, i * yinc:(i + 1) * yinc, :], + gdq[:, :, i * yinc:(i + 1) * yinc, :], + readnoise_2d[i * yinc:(i + 1) * yinc, :], + rejection_thresh, three_grp_thresh, four_grp_thresh, + frames_per_group, flag_4_neighbors, + max_jump_to_flag_neighbors, + min_jump_to_flag_neighbors, dqflags, copy_arrs)) + + # last slice get the rest + slices.insert(n_slices - 1, (data[:, :, (n_slices - 1) * yinc:n_rows, :], + gdq[:, :, (n_slices - 1) * yinc:n_rows, :], + readnoise_2d[(n_slices - 1) * yinc:n_rows, :], + rejection_thresh, three_grp_thresh, + four_grp_thresh, frames_per_group, + flag_4_neighbors, max_jump_to_flag_neighbors, + min_jump_to_flag_neighbors, dqflags, copy_arrs)) log.info("Creating %d processes for jump detection " % n_slices) pool = multiprocessing.Pool(processes=n_slices) # Starts each slice in its own process. Starmap allows more than one diff --git a/src/stcal/jump/twopoint_difference.py b/src/stcal/jump/twopoint_difference.py index 93bfa501..b569a487 100644 --- a/src/stcal/jump/twopoint_difference.py +++ b/src/stcal/jump/twopoint_difference.py @@ -8,7 +8,7 @@ def find_crs(dataa, group_dq, read_noise, normal_rej_thresh, two_diff_rej_thresh, three_diff_rej_thresh, nframes, flag_4_neighbors, max_jump_to_flag_neighbors, - min_jump_to_flag_neighbors, dqflags): + min_jump_to_flag_neighbors, dqflags, copy_arrs=True): """ Find CRs/Jumps in each integration within the input data array. The input @@ -54,6 +54,10 @@ def find_crs(dataa, group_dq, read_noise, normal_rej_thresh, neighbors (marginal detections). Any primary jump below this value will not have its neighbors flagged. + copy_arrs : bool + Flag for making internal copies of the arrays so the input isn't modified, + defaults to True. + Returns ------- gdq : int, 4D array @@ -68,8 +72,9 @@ def find_crs(dataa, group_dq, read_noise, normal_rej_thresh, """ # copy data and group DQ array - dataa = dataa.copy() - gdq = group_dq.copy() + if copy_arrs: + dataa = dataa.copy() + gdq = group_dq.copy() # Get data characteristics nints, ngroups, nrows, ncols = dataa.shape From 17ead8dacd6eb8d807afaa4c4e4989b0832fd43c Mon Sep 17 00:00:00 2001 From: Clare Shanahan Date: Mon, 16 May 2022 10:54:31 -0400 Subject: [PATCH 2/4] rename var --- src/stcal/jump/twopoint_difference.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stcal/jump/twopoint_difference.py b/src/stcal/jump/twopoint_difference.py index b569a487..5e901673 100644 --- a/src/stcal/jump/twopoint_difference.py +++ b/src/stcal/jump/twopoint_difference.py @@ -75,6 +75,8 @@ def find_crs(dataa, group_dq, read_noise, normal_rej_thresh, if copy_arrs: dataa = dataa.copy() gdq = group_dq.copy() + else: + group_dq = gdq # Get data characteristics nints, ngroups, nrows, ncols = dataa.shape From 9c218cc87583f68deaa22fcb484efe448314f725 Mon Sep 17 00:00:00 2001 From: Clare Shanahan Date: Mon, 16 May 2022 11:40:21 -0400 Subject: [PATCH 3/4] small fixes --- src/stcal/jump/jump.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/stcal/jump/jump.py b/src/stcal/jump/jump.py index 049eb663..ce1b603b 100644 --- a/src/stcal/jump/jump.py +++ b/src/stcal/jump/jump.py @@ -64,11 +64,10 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, four_grp_thresh : float cosmic ray sigma rejection threshold for ramps having 4 groups - max_cores: int or str + max_cores: str Maximum number of cores to use for multiprocessing. Available choices - are 1, 'quarter', 'half', 'all', or an integer. If the integer exceeds - the number of available cores, then it will be capped at the max number - available. + are 'none' (which will create one process), 'quarter', 'half', 'all' + (of availble cpu cores). max_jump_to_flag_neighbors : float value in units of sigma that sets the upper limit for flagging of @@ -135,11 +134,8 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, # figure out how many slices to make based on 'max_cores' max_available = multiprocessing.cpu_count() - if type(max_cores) == int: - if max_cores > max_available: - n_slices = max_available - else: - n_slices = max_cores + if max_cores.lower() == 'none': + n_slices = 1 elif max_cores == 'quarter': n_slices = max_available // 4 or 1 elif max_cores == 'half': @@ -171,7 +167,6 @@ def detect_jumps(frames_per_group, data, gdq, pdq, err, copy_arrs = False # we dont need to copy arrays again in find_crs for i in range(n_slices - 1): - print('setting up slices') slices.insert(i, (data[:, :, i * yinc:(i + 1) * yinc, :], gdq[:, :, i * yinc:(i + 1) * yinc, :], readnoise_2d[i * yinc:(i + 1) * yinc, :], From 68a26d53e9096df8f36decc524cf73d7371aed2d Mon Sep 17 00:00:00 2001 From: Clare Shanahan Date: Mon, 16 May 2022 11:44:09 -0400 Subject: [PATCH 4/4] whoops --- src/stcal/jump/twopoint_difference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stcal/jump/twopoint_difference.py b/src/stcal/jump/twopoint_difference.py index 5e901673..ba62e2e4 100644 --- a/src/stcal/jump/twopoint_difference.py +++ b/src/stcal/jump/twopoint_difference.py @@ -76,7 +76,7 @@ def find_crs(dataa, group_dq, read_noise, normal_rej_thresh, dataa = dataa.copy() gdq = group_dq.copy() else: - group_dq = gdq + gdq = group_dq # Get data characteristics nints, ngroups, nrows, ncols = dataa.shape