diff --git a/acis_thermal_check/__init__.py b/acis_thermal_check/__init__.py index aa0aa1d..98d4c0a 100644 --- a/acis_thermal_check/__init__.py +++ b/acis_thermal_check/__init__.py @@ -2,9 +2,8 @@ __version__ = ska_helpers.get_version(__package__) -from acis_thermal_check.acis_obs import acis_filter, fetch_ocat_data from acis_thermal_check.main import ACISThermalCheck, DPABoardTempCheck -from acis_thermal_check.utils import get_acis_limits, get_options, mylog +from acis_thermal_check.utils import get_options, mylog def test(*args, **kwargs): diff --git a/acis_thermal_check/acis_obs.py b/acis_thermal_check/acis_obs.py deleted file mode 100644 index 0cc4feb..0000000 --- a/acis_thermal_check/acis_obs.py +++ /dev/null @@ -1,371 +0,0 @@ -import numpy as np -from cxotime import CxoTime -from kadi.commands.states import decode_power - -from acis_thermal_check.utils import mylog - - -def who_in_fp(simpos=80655): - """ - Returns a string telling you which instrument is in - the Focal Plane. "launchlock" is returned because that's a - position we never expect to see the sim in - it's an indicator - to the user that there's a problem. - - Also, The ranges for detector sections use the max and min hard - stop locations, and they also split the difference between "I" - and "S" for each instrument. - - input: - TSC position (simpos) - INTEGER - - output - String indicating what is in the focal plane - "launchlock" - default - "ACIS-I" - "ACIS-S" - "HRC-I" - "HRC-S" - """ - is_in_the_fp = "launchlock" - - # Set the value of is_in_the_fp to the appropriate value. It will default - # to "launchlock" if no value matches - if 104839 >= simpos >= 82109: - is_in_the_fp = "ACIS-I" - elif 82108 >= simpos >= 70736: - is_in_the_fp = "ACIS-S" - elif -20000 >= simpos >= -86147: - is_in_the_fp = "HRC-I" - elif -86148 >= simpos >= -104362: - is_in_the_fp = "HRC-S" - - # return the string indicating which instrument is in the Focal Plane - return is_in_the_fp - - -def fetch_ocat_data(obsid_list): - """ - Take a list of obsids and return the following data from - the obscat for each: grating status, CCD count, if S3 is on, - and the number of expected counts - - Parameters - ---------- - obsid_list : list of ints - The obsids to get the obscat data from. - - Returns - ------- - A dict of NumPy arrays of the above properties. - """ - import requests - from astropy.io import ascii - from ska_helpers.retry import RetryError, retry_call - - warn = ( - "Could not get the table from the Obscat to " - "determine which observations can go to -109 C. " - "Any violations of eligible observations should " - "be hand-checked." - ) - # Only bother with this check if obsids are found - if len(obsid_list) > 0: - # The following uses a request call to the obscat which explicitly - # asks for text formatting so that the output can be ingested into - # an AstroPy table. - urlbase = "https://cda.harvard.edu/srservices/ocatDetails.do?format=text" - obsid_list = ",".join([str(obsid) for obsid in obsid_list]) - params = {"obsid": obsid_list} - # First fetch the information from the obsid itself - got_table = True - try: - resp = retry_call( - requests.get, - [urlbase], - {"params": params}, - tries=4, - delay=1, - ) - except (requests.ConnectionError, RetryError): - got_table = False - else: - if not resp.ok: - got_table = False - else: - warn = "No obsids to check, may be a vehicle load--please check if not." - got_table = False - if got_table: - tab = ascii.read(resp.text, header_start=0, data_start=2) - tab.sort("OBSID") - # Now we have to find all of the obsids in each sequence and then - # compute the complete exposure for each sequence - seq_nums = np.unique( - [str(sn) for sn in tab["SEQ_NUM"].data.astype("str") if sn != " "], - ) - seq_num_list = ",".join(seq_nums) - obsids = tab["OBSID"].data.astype("int") - cnt_rate = tab["EST_CNT_RATE"].data.astype("float64") - params = {"seqNum": seq_num_list} - got_seq_table = True - try: - resp = retry_call( - requests.get, - [urlbase], - {"params": params}, - tries=4, - delay=1, - ) - except (requests.ConnectionError, RetryError): - got_seq_table = False - else: - if not resp.ok: - got_seq_table = False - if not got_seq_table: - # We weren't able to get a valid sequence table for some - # reason, so we cannot check for -109 data, but we proceed - # with the rest of the review regardless - mylog.warning(warn) - return None - tab_seq = ascii.read(resp.text, header_start=0, data_start=2) - app_exp = np.zeros_like(cnt_rate) - seq_nums = tab_seq["SEQ_NUM"].data.astype("str") - for i, row in enumerate(tab): - sn = str(row["SEQ_NUM"]) - if sn == " ": - continue - j = np.where(sn == seq_nums)[0] - app_exp[i] += np.float64(tab_seq["APP_EXP"][j]).sum() - app_exp *= 1000.0 - table_dict = { - "obsid": np.array(obsids), - "grating": tab["GRAT"].data, - "cnt_rate": cnt_rate, - "app_exp": app_exp, - "spectra_max_count": tab["SPECTRA_MAX_COUNT"].data.astype("int"), - "obs_cycle": tab["OBS_CYCLE"].data, - } - else: - # We weren't able to get a valid table for some reason, so - # we cannot check for hot observations, but we proceed with the - # rest of the review regardless - mylog.warning(warn) - table_dict = None - return table_dict - - -def find_obsid_intervals(cmd_states, load_start): - """ - User reads the SKA commanded states archive, via - a call to the SKA kadi.commands.states.get_states, - between the user specified START and STOP times. - - Problem is, ALL commanded states that were stored - in the archive will be returned. So then you call: - - find_obsid_intervals(cmd_states) - - And this will find the obsid intervals. - What this program does is to extract the time interval for - each OBSID. Said interval start is defined by a - startScience comment, and the interval end is - defined by the first stopScience command that follows. - - When the interval has been found, - a dict element is created from the value of - states data at the time point of the first NPNT - line seen - *minus* the trans_keys, tstart and tstop - times. The values of datestart and datestop are - the XTZ00000 and AA000000 times. This dict - is appended to a Master list of all obsid intervals - and this list is returned. - - Notes: The obsid filtering method includes the - configuration from the last OBSID, through - a setup for the present OBSID, through the - XTZ - AA000, down to the power down. - - - This might show a cooling from the - last config, temp changes due to some - possible maneuvering, past shutdown - """ - # - # Some inits - # - - # a little initialization - firstpow = False - xtztime = None - - # EXTRACTING THE OBSERVATIONS - # - # Find the first line with a WSPOW00000 in it. This is the start of - # the interval. Then get the first XTZ line, the NPNT line, the - # AA000000 line, and lastly the next WSPOW00000 line. - # This constitutes one observation. - - obsid_interval_list = [] - - for eachstate in cmd_states: - # Make sure we skip maneuver obsids explicitly - if 60000 > eachstate["obsid"] >= 38001: - continue - - # Only check states which are at least partially - # within the load being reviewed - if eachstate["tstop"] < load_start: - continue - - pow_cmd = eachstate["power_cmd"] - - # is this the first WSPOW of the interval? - if pow_cmd in ["WSPOW00000", "WSVIDALLDN"] and not firstpow: - firstpow = True - datestart = eachstate["datestart"] - tstart = eachstate["tstart"] - - # Process the power command which turns things on - if pow_cmd.startswith("WSPOW") and pow_cmd != "WSPOW00000" and firstpow: - ccds = decode_power(pow_cmd)["ccds"].replace(" ", ",")[:-1] - - # Process the first XTZ0000005 line you see - if pow_cmd in ["XTZ0000005", "XCZ0000005"] and (xtztime is None and firstpow): - xtztime = eachstate["tstart"] - # MUST fix the instrument now - instrument = who_in_fp(eachstate["simpos"]) - ccd_count = eachstate["ccd_count"] - - # Process the first AA00000000 line you see - if pow_cmd == "AA00000000" and firstpow: - datestop = eachstate["datestop"] - tstop = eachstate["tstop"] - - # now calculate the exposure time - if xtztime is not None: - # Having found the startScience and stopScience, you have an - # OBSID interval. Now form the element and append it to - # the Master List. We add the text version of who is in - # the focal plane - - obsid_dict = { - "datestart": datestart, - "datestop": datestop, - "tstart": tstart, - "tstop": tstop, - "ccds": ccds, - "start_science": xtztime, - "obsid": eachstate["obsid"], - "instrument": instrument, - "ccd_count": ccd_count, - } - obsid_interval_list.append(obsid_dict) - - # now clear out the data values - firstpow = False - xtztime = None - - # End of LOOP for eachstate in cmd_states: - - # sort based on obsid - obsid_interval_list.sort(key=lambda x: x["obsid"]) - # Now we add the stuff we get from ocat_data - obsids = [e["obsid"] for e in obsid_interval_list] - ocat_data = fetch_ocat_data(obsids) - # For a vehicle load, or if the connection to the ocat server - # fails (rare), we will get no data from the ocat so we only - # add this info if we need to. In the case of a failed connection - # to the OCAT server, -109 C checks should be done by hand - if ocat_data is not None: - ocat_keys = list(ocat_data.keys()) - ocat_keys.remove("obsid") - for i, obsid in enumerate(obsids): - # The obscat doesn't have info for cold ECS observations - if obsid > 60000: - continue - for key in ocat_keys: - obsid_interval_list[i][key] = ocat_data[key][i] - - # re-sort based on tstart - obsid_interval_list.sort(key=lambda x: x["tstart"]) - return obsid_interval_list - - -def hrc_science_obs_filter(obsid_interval_list): - """ - This method will filter *OUT* any HRC science observations from the - input obsid interval list. Filtered are obs that have either - HRC-I" or HRC-S" as the science instrument, AND an obsid LESS THAN - 50,000 - """ - acis_and_ecs_only = [] - for eachobservation in obsid_interval_list: - if ( - eachobservation["instrument"].startswith("ACIS-") - or eachobservation["obsid"] >= 60000 - ): - acis_and_ecs_only.append(eachobservation) - return acis_and_ecs_only - - -def acis_filter(obsid_interval_list): - """ - This method will filter between the different types of - ACIS observations: ACIS-I, ACIS-S, "hot" ACIS, and - cold science-orbit ECS. - """ - acis_hot = [] - acis_s = [] - acis_i = [] - cold_ecs = [] - - # This is the approximate date after which we start applying new - # rules for going to hotter temperatures - new_hot_start = CxoTime("2022:318:00:00:00").secs - - mylog.debug("OBSID\tCNT_RATE\tAPP_EXP\tNUM_CTS\tGRATING\tCCDS\tSPEC_MAX_CNT\tCYCLE") - for eachobs in obsid_interval_list: - # First we check that we got ocat data using "grating" - hot_acis = False - if "grating" in eachobs: - eachobs["num_counts"] = int(eachobs["cnt_rate"] * eachobs["app_exp"]) - # First check to see if this is an S3 observation - mylog.debug( - f"{eachobs['obsid']}\t{eachobs['cnt_rate']}\t" - f"{eachobs['app_exp']*1.0e-3}\t{eachobs['num_counts']}\t" - f"{eachobs['grating']}\t{eachobs['ccds']}\t" - f"{eachobs['spectra_max_count']}\t{eachobs['obs_cycle']}", - ) - low_ct = False - # "New" hot ACIS category: - # 1. Cycle 23 and later - # 2. spectra_max_count must be greater than 0 - # 3. Start time must be after ~NOV1422 load - if ( - eachobs["obs_cycle"] >= 23 - and eachobs["spectra_max_count"] > 0 - and eachobs["tstart"] > new_hot_start - ): - if eachobs["instrument"] == "ACIS-I": - low_ct = eachobs["spectra_max_count"] <= 1000 - elif eachobs["instrument"] == "ACIS-S": - low_ct = eachobs["spectra_max_count"] <= 2000 - else: - # otherwise, fall back to modified "old" criterion - # of less than 300 total expected counts - low_ct = eachobs["num_counts"] < 300 - # Also check grating status - hot_acis = (eachobs["grating"] == "HETG") or low_ct - eachobs["hot_acis"] = hot_acis - if hot_acis: - acis_hot.append(eachobs) - else: - if eachobs["instrument"] == "ACIS-S": - acis_s.append(eachobs) - elif eachobs["instrument"] == "ACIS-I": - acis_i.append(eachobs) - elif eachobs["instrument"] == "HRC-S" and eachobs["obsid"] >= 60000: - cold_ecs.append(eachobs) - else: - raise RuntimeError( - f"Cannot determine what kind of thermal " - f"limit {eachobs['obsid']} should have!", - ) - return acis_i, acis_s, acis_hot, cold_ecs diff --git a/acis_thermal_check/apps/acisfp_check.py b/acis_thermal_check/apps/acisfp_check.py index 36da62c..b4fcb30 100755 --- a/acis_thermal_check/apps/acisfp_check.py +++ b/acis_thermal_check/apps/acisfp_check.py @@ -14,21 +14,13 @@ import sys import matplotlib +import numpy as np from astropy.table import Table +from chandra_limits import ACISFPLimit from cxotime import CxoTime from ska_matplotlib import cxctime2plotdate from acis_thermal_check import ACISThermalCheck, get_options, mylog - -# -# Import ACIS-specific observation extraction, filtering -# and attribute support routines. -# -from acis_thermal_check.acis_obs import ( - acis_filter, - find_obsid_intervals, - hrc_science_obs_filter, -) from acis_thermal_check.utils import PredictPlot, paint_perigee # Matplotlib setup @@ -37,17 +29,11 @@ class ACISFPCheck(ACISThermalCheck): + _limit_class = ACISFPLimit + def __init__(self): - valid_limits = {"PITCH": [(1, 3.0), (99, 3.0)], "TSCPOS": [(1, 2.5), (99, 2.5)]} + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [(-120.0, -100.0)] - limits_map = { - "planning.data_quality.high.acisi": "acis_i", - "planning.data_quality.high.aciss": "acis_s", - "planning.data_quality.high.aciss_hot": "acis_hot", - "planning.data_quality.high.cold_ecs": "cold_ecs", - "planning.warning.high": "planning_hi", - "safety.caution.high": "yellow_hi", - } super().__init__( "fptemp", "acisfp", @@ -55,12 +41,10 @@ def __init__(self): hist_limit, other_telem=["1dahtbon"], other_map={"1dahtbon": "dh_heater", "fptemp_11": "fptemp"}, - limits_map=limits_map, ) # Create an empty observation list which will hold the results. This # list contains all ACIS and all ECS observations. self.acis_and_ecs_obs = [] - self.acis_hot_obs = [] def _calc_model_supp(self, model, state_times, states, ephem, state0): """ @@ -114,7 +98,9 @@ def _calc_model_supp(self, model, state_times, states, ephem, state0): model.comp["1cbat"].set_data(-53.0) model.comp["sim_px"].set_data(-120.0) - def make_prediction_plots(self, outdir, states, temps, load_start): + def make_prediction_plots( + self, outdir, states, temps, load_start, upper_limit, lower_limit + ): """ Make plots of the thermal prediction as well as associated commanded states. @@ -154,7 +140,7 @@ def make_prediction_plots(self, outdir, states, temps, load_start): w1 = None # Make plots of FPTEMP and pitch vs time, looping over # three different temperature ranges - ylim = [(-120, -79), (-120, -119), (-120.0, -107.5)] + ylim = [(-120, -79), (-120, -119), (-120.0, -103.5)] ypos = [-110.0, -119.35, -116] capwidth = [2.0, 0.1, 0.4] textypos = [-108.0, -119.3, -115.7] @@ -178,18 +164,17 @@ def make_prediction_plots(self, outdir, states, temps, load_start): load_start=load_start, ) plots[name].ax.set_title(self.msid.upper(), loc="left", pad=10) - # Draw a horizontal line indicating cold ECS cutoff - plots[name].add_limit_line(self.limits["cold_ecs"], "Cold ECS", ls="--") - # Draw a horizontal line showing the ACIS-I cutoff - plots[name].add_limit_line(self.limits["acis_i"], "ACIS-I", ls="--") - # Draw a horizontal line showing the ACIS-S cutoff - plots[name].add_limit_line(self.limits["acis_s"], "ACIS-S", ls="--") - # Draw a horizontal line showing the hot ACIS-S cutoff - plots[name].add_limit_line(self.limits["acis_hot"], "Hot ACIS-S", ls="--") - # Draw a horizontal line showing the planning warning limit - plots[name].add_limit_line(self.limits["planning_hi"], "Planning", ls="-") - # Draw a horizontal line showing the safety caution limit - plots[name].add_limit_line(self.limits["yellow_hi"], "Yellow", ls="-") + # Draw the planning limit line on the plot (broken up + # according to condition) + upper_limit.plot( + fig_ax=(plots[name].fig, plots[name].ax), + lw=3, + zorder=2, + use_colors=True, + show_changes=False, + ) + # Draw the yellow limit line on the plot + plots[name].add_limit_line(self.limits["yellow_hi"], lw=3) # Get the width of this plot to make the widths of all the # prediction plots the same if i == 0: @@ -243,17 +228,6 @@ def make_prediction_plots(self, outdir, states, temps, load_start): capthick=2, label="ACIS-S", ) - plots[name].ax.errorbar( - [0.0, 0.0], - [1.0, 1.0], - xerr=1.0, - lw=2, - xlolims=True, - color="saddlebrown", - capsize=4, - capthick=2, - label="Hot ACIS-S", - ) # Make the legend on the temperature plot plots[name].ax.legend( @@ -309,189 +283,27 @@ def make_prediction_viols(self, temps, states, load_start): - science_viols """ - # extract the OBSID's from the commanded states. NOTE: this contains all - # observations including ECS runs and HRC observations - observation_intervals = find_obsid_intervals(states, load_start) - - # Filter out any HRC science observations BUT keep ACIS ECS observations - self.acis_and_ecs_obs = hrc_science_obs_filter(observation_intervals) - - times = self.predict_model.times - - mylog.info( - f"MAKE VIOLS Checking for limit violations in " - f"{len(self.acis_and_ecs_obs)} total science observations", + # Extract the prediction violations and the limit objects + viols, upper_limit, lower_limit = super().make_prediction_viols( + temps, states, load_start ) - viols = {} - - # ------------------------------------------------------ - # Create subsets of all the observations - # ------------------------------------------------------ - # Now divide out observations by ACIS-S and ACIS-I - ACIS_I_obs, ACIS_S_obs, ACIS_hot_obs, sci_ecs_obs = acis_filter( - self.acis_and_ecs_obs, - ) - - temp = temps[self.name] - - # --------------------------------------------------------------- - # Planning - Collect any -84 C violations. These are load killers - # --------------------------------------------------------------- - - hi_viols = self._make_prediction_viols( - times, - temp, - load_start, - self.limits["planning_hi"].value, - "planning", - "max", - ) - viols = { - "hi": { - "name": f"Planning High ({self.limits['planning_hi'].value} C)", - "type": "Max", - "values": hi_viols, - }, - } - - acis_i_limit = self.limits["acis_i"].value - acis_s_limit = self.limits["acis_s"].value - acis_hot_limit = self.limits["acis_hot"].value - cold_ecs_limit = self.limits["cold_ecs"].value - - # ------------------------------------------------------------ - # ACIS-I - Collect any -112 C violations of any non-ECS ACIS-I - # science run. These are load killers - # ------------------------------------------------------------ - - mylog.info(f"ACIS-I Science ({acis_i_limit} C) violations") - - # Create the violation data structure. - acis_i_viols = self.search_obsids_for_viols( - "ACIS-I", - acis_i_limit, - ACIS_I_obs, - temp, - times, - load_start, - ) - - viols["ACIS_I"] = { - "name": f"ACIS-I ({acis_i_limit} C)", - "type": "Max", - "values": acis_i_viols, - } - - # ------------------------------------------------------------ - # ACIS-S - Collect any -111 C violations of any non-ECS ACIS-S - # science run. These are load killers - # ------------------------------------------------------------ - - mylog.info(f"ACIS-S Science ({acis_s_limit} C) violations") - - acis_s_viols = self.search_obsids_for_viols( - "ACIS-S", - acis_s_limit, - ACIS_S_obs, - temp, - times, - load_start, - ) - viols["ACIS_S"] = { - "name": f"ACIS-S ({acis_s_limit} C)", - "type": "Max", - "values": acis_s_viols, - } - - # ------------------------------------------------------------ - # ACIS-S Hot - Collect any -109 C violations of any non-ECS ACIS-S - # science run which can run hot. These are load killers - # ------------------------------------------------------------ - # - mylog.info(f"ACIS-S Science ({acis_hot_limit} C) violations") - - acis_hot_viols = self.search_obsids_for_viols( - "Hot ACIS-S", - acis_hot_limit, - ACIS_hot_obs, - temp, - times, - load_start, - ) - viols["ACIS_S_hot"] = { - "name": f"ACIS-S Hot ({acis_hot_limit} C)", - "type": "Max", - "values": acis_hot_viols, - } - - # ------------------------------------------------------------ - # Science Orbit ECS -119.5 violations; -119.5 violation check - # ------------------------------------------------------------ - mylog.info(f"Science Orbit ECS ({cold_ecs_limit} C) violations") - - ecs_viols = self.search_obsids_for_viols( - "Science Orbit ECS", - cold_ecs_limit, - sci_ecs_obs, - temp, - times, - load_start, - ) - - viols["ecs"] = { - "name": f"Science Orbit ECS ({cold_ecs_limit} C)", - "type": "Min", - "values": ecs_viols, - } - - # Store all obsids which can go to -109 C - for obs in ACIS_hot_obs: - self.acis_hot_obs.append(obs) - - return viols - - def search_obsids_for_viols( - self, - limit_name, - limit, - observations, - temp, - times, - load_start, - ): - """ - Given a planning limit and a list of observations, find those time intervals - where the temp gets warmer than the planning limit and identify which - observations (if any) include part or all of those intervals. - """ - viols_list = [] - - # Run through all observations - for eachobs in observations: - # Get the observation start science and stop science times, and obsid - obs_tstart = eachobs["start_science"] - obs_tstop = eachobs["tstop"] - # If the observation is in this load, let's look at it - if obs_tstart > load_start: - idxs = (times >= obs_tstart) & (times <= obs_tstop) - viols = self._make_prediction_viols( - times[idxs], - temp[idxs], - load_start, - limit, - limit_name, - "max", - ) - # If we have flagged any violations, record the obsid for each - # and add them to the list - if len(viols) > 0: - for viol in viols: - viol["obsid"] = str(eachobs["obsid"]) - viols_list += viols - - # Finished - return the violations list - return viols_list + # Store the obsid table + obs_table = self.limit_object.acis_obs_info.as_table() + # use only the obsids after the load start + idxs = np.where(CxoTime(obs_table["start_science"]).secs > load_start)[0] + self.acis_and_ecs_obs = obs_table[idxs] + # for each violation, add the exposure time to the violation + # so we can record it on the page + for v in viols["hi"]: + idx = np.where(self.acis_and_ecs_obs["obsid"] == v["obsid"])[0] + if idx.size == 0: + continue + row = self.acis_and_ecs_obs[idx[0]] + start_science = CxoTime(row["start_science"]).secs + row["bias_time"] + stop_science = CxoTime(row["stop_science"]).secs + v["exp_time"] = (stop_science - start_science) * 1.0e-3 + return viols, upper_limit, lower_limit def write_temps(self, outdir, times, temps): """ @@ -557,7 +369,6 @@ def draw_obsids( obsid = eachobservation["obsid"] in_fp = eachobservation["instrument"] - hot_acis = eachobservation["hot_acis"] if obsid > 60000: # ECS observations during the science orbit are colored blue @@ -568,7 +379,7 @@ def draw_obsids( if in_fp == "ACIS-I": color = "red" else: - color = "saddlebrown" if hot_acis else "green" + color = "green" obsid_txt = str(obsid) # If this is an ECS measurement in the science orbit mark @@ -577,8 +388,10 @@ def draw_obsids( obsid_txt += " (ECS)" # Convert the start and stop times into the Ska-required format - obs_start = cxctime2plotdate([eachobservation["tstart"]]) - obs_stop = cxctime2plotdate([eachobservation["tstop"]]) + tstart, tstop = CxoTime( + [eachobservation["start_science"], eachobservation["stop_science"]], + ) + obs_start, obs_stop = cxctime2plotdate([tstart, tstop]) if in_fp.startswith("ACIS-") or obsid > 60000: # For each ACIS Obsid, draw a horizontal line to show diff --git a/acis_thermal_check/apps/bep_pcb_check.py b/acis_thermal_check/apps/bep_pcb_check.py index d161aca..fae6f68 100755 --- a/acis_thermal_check/apps/bep_pcb_check.py +++ b/acis_thermal_check/apps/bep_pcb_check.py @@ -13,6 +13,7 @@ import sys import matplotlib +from chandra_limits import BEPPCBLimit from acis_thermal_check import DPABoardTempCheck, get_options @@ -23,12 +24,10 @@ class BEPPCBCheck(DPABoardTempCheck): + _limit_class = BEPPCBLimit + def __init__(self): - valid_limits = { - "TMP_BEP_PCB": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [20.0, 20.0] # First limit is >=, second limit is <= super().__init__("tmp_bep_pcb", "bep_pcb", valid_limits, hist_limit) diff --git a/acis_thermal_check/apps/cea_check.py b/acis_thermal_check/apps/cea_check.py index 864fbfb..bb21d83 100755 --- a/acis_thermal_check/apps/cea_check.py +++ b/acis_thermal_check/apps/cea_check.py @@ -14,9 +14,10 @@ import sys import matplotlib +from chandra_limits import CEALimit from ska_matplotlib import pointpair -from acis_thermal_check import ACISThermalCheck, get_options, mylog +from acis_thermal_check import ACISThermalCheck, get_options from acis_thermal_check.utils import PredictPlot # Matplotlib setup @@ -25,67 +26,21 @@ class CEACheck(ACISThermalCheck): + _limit_class = CEALimit + def __init__(self): - valid_limits = { - "2CEAHVPT": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [5.0] - limits_map = {} other_telem = ["2imonst", "2sponst", "2s2onst", "1dahtbon"] super().__init__( "2ceahvpt", "cea", valid_limits, hist_limit, - limits_map=limits_map, other_telem=other_telem, other_map={"1dahtbon": "dh_heater"}, ) - def make_prediction_viols(self, temps, states, load_start): - """ - Find limit violations where predicted temperature is above the - specified limits. - - Parameters - ---------- - temps : dict of NumPy arrays - NumPy arrays corresponding to the modeled temperatures - states : NumPy record array - Commanded states - load_start : float - The start time of the load, used so that we only report - violations for times later than this time for the model - run. - """ - mylog.info("Checking for limit violations") - - temp = temps[self.name] - times = self.predict_model.times - - # Only check this violation when HRC is on - mask = self.predict_model.comp["2imonst_on"].dvals - mask |= self.predict_model.comp["2sponst_on"].dvals - hi_viols = self._make_prediction_viols( - times, - temp, - load_start, - self.limits["planning_hi"].value, - "planning", - "max", - mask=mask, - ) - viols = { - "hi": { - "name": f"Hot ({self.limits['planning_hi'].value} C)", - "type": "Max", - "values": hi_viols, - }, - } - return viols - def _calc_model_supp(self, model, state_times, states, ephem, state0): """ Update to initialize the cea0 pseudo-node. If 2ceahvpt diff --git a/acis_thermal_check/apps/dea_check.py b/acis_thermal_check/apps/dea_check.py index 69c61b1..d8df9cb 100755 --- a/acis_thermal_check/apps/dea_check.py +++ b/acis_thermal_check/apps/dea_check.py @@ -15,6 +15,7 @@ import sys import matplotlib +from chandra_limits import DEALimit from acis_thermal_check import ACISThermalCheck, get_options @@ -25,12 +26,10 @@ class DEACheck(ACISThermalCheck): + _limit_class = DEALimit + def __init__(self): - valid_limits = { - "1DEAMZT": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [20.0] super().__init__("1deamzt", "dea", valid_limits, hist_limit) diff --git a/acis_thermal_check/apps/dpa_check.py b/acis_thermal_check/apps/dpa_check.py index 8a5ce0b..4da3e1d 100755 --- a/acis_thermal_check/apps/dpa_check.py +++ b/acis_thermal_check/apps/dpa_check.py @@ -13,6 +13,7 @@ import sys import matplotlib +from chandra_limits import DPALimit from acis_thermal_check import ACISThermalCheck, get_options @@ -22,88 +23,17 @@ class DPACheck(ACISThermalCheck): + _limit_class = DPALimit + _flag_cold_viols = True + def __init__(self): - valid_limits = { - "1DPAMZT": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [20.0] - limits_map = {"planning.caution.low": "zero_feps"} super().__init__( "1dpamzt", "dpa", valid_limits, hist_limit, - limits_map=limits_map, - ) - - def custom_prediction_viols(self, times, temp, viols, load_start): - """ - Custom handling of limit violations. This is for checking the - +12 degC violation if all FEPs are off. - - Parameters - ---------- - times : NumPy array - The times for the predicted temperatures - temp : NumPy array - The predicted temperatures - viols : dict - Dictionary of violations information to add to - load_start : float - The start time of the load, used so that we only report - violations for times later than this time for the model - run. - """ - # Only check this violation when all FEPs are off - mask = self.predict_model.comp["fep_count"].dvals == 0 - zf_viols = self._make_prediction_viols( - times, - temp, - load_start, - self.limits["zero_feps"].value, - "zero-feps", - "min", - mask=mask, - ) - viols["zero_feps"] = { - "name": f"Zero FEPs ({self.limits['zero_feps'].value} C)", - "type": "Min", - "values": zf_viols, - } - - def custom_prediction_plots(self, plots): - """ - Customization of prediction plots. - - Parameters - ---------- - plots : dict of dicts - Contains the hooks to the plot figures, axes, and filenames - and can be used to customize plots before they are written, - e.g. add limit lines, etc. - """ - plots[self.name].add_limit_line(self.limits["zero_feps"], "Zero FEPs", ls="--") - - def custom_validation_plots(self, plots): - """ - Customization of validation plots. - - Parameters - ---------- - plots : dict of dicts - Contains the hooks to the plot figures, axes, and filenames - and can be used to customize plots before they are written, - e.g. add limit lines, etc. - """ - plots["1dpamzt"]["lines"]["ax"].axhline( - self.limits["zero_feps"].value, - linestyle="--", - zorder=-8, - color=self.limits["zero_feps"].color, - linewidth=2, - label="Zero FEPs", ) def _calc_model_supp(self, model, state_times, states, ephem, state0): @@ -112,7 +42,7 @@ def _calc_model_supp(self, model, state_times, states, ephem, state0): has an initial value (T_dpa) - which it does at prediction time (gets it from state0), then T_dpa0 is set to that. If we are running the validation, - T_dpa is set to None so we use the dvals in model.comp + T_dpa is set to None, so we use the dvals in model.comp NOTE: If you change the name of the dpa0 pseudo node you have to edit the new name into the if statement diff --git a/acis_thermal_check/apps/fep1_actel_check.py b/acis_thermal_check/apps/fep1_actel_check.py index c302e6f..04fc4fe 100755 --- a/acis_thermal_check/apps/fep1_actel_check.py +++ b/acis_thermal_check/apps/fep1_actel_check.py @@ -14,6 +14,7 @@ import sys import matplotlib +from chandra_limits import FEP1ActelLimit from acis_thermal_check import DPABoardTempCheck, get_options @@ -23,12 +24,10 @@ class FEP1ActelCheck(DPABoardTempCheck): + _limit_class = FEP1ActelLimit + def __init__(self): - valid_limits = { - "TMP_FEP1_ACTEL": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [25.0, 20.0] # First limit is >=, second limit is <= super().__init__("tmp_fep1_actel", "fep1_actel", valid_limits, hist_limit) diff --git a/acis_thermal_check/apps/fep1_mong_check.py b/acis_thermal_check/apps/fep1_mong_check.py index 495a5ed..91de513 100755 --- a/acis_thermal_check/apps/fep1_mong_check.py +++ b/acis_thermal_check/apps/fep1_mong_check.py @@ -14,6 +14,7 @@ import sys import matplotlib +from chandra_limits import FEP1MongLimit from acis_thermal_check import DPABoardTempCheck, get_options @@ -24,12 +25,10 @@ class FEP1MongCheck(DPABoardTempCheck): + _limit_class = FEP1MongLimit + def __init__(self): - valid_limits = { - "TMP_FEP1_MONG": [(1, 2.0), (50, 1.0), (99, 2.0)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.0), (50, 1.0), (99, 2.0)] hist_limit = [25.0, 20.0] # First limit is >=, second limit is <= super().__init__("tmp_fep1_mong", "fep1_mong", valid_limits, hist_limit) diff --git a/acis_thermal_check/apps/psmc_check.py b/acis_thermal_check/apps/psmc_check.py index 32b0b86..2e042af 100755 --- a/acis_thermal_check/apps/psmc_check.py +++ b/acis_thermal_check/apps/psmc_check.py @@ -13,6 +13,7 @@ import sys import matplotlib +from chandra_limits import PSMCLimit from acis_thermal_check import ACISThermalCheck, get_options @@ -23,12 +24,10 @@ class PSMCCheck(ACISThermalCheck): + _limit_class = PSMCLimit + def __init__(self): - valid_limits = { - "1PDEAAT": [(1, 2.5), (50, 1.0), (99, 5.5)], - "PITCH": [(1, 3.0), (99, 3.0)], - "TSCPOS": [(1, 2.5), (99, 2.5)], - } + valid_limits = [(1, 2.5), (50, 1.0), (99, 5.5)] hist_limit = [30.0, 40.0] super().__init__( "1pdeaat", diff --git a/acis_thermal_check/main.py b/acis_thermal_check/main.py index 3381533..4ad9375 100644 --- a/acis_thermal_check/main.py +++ b/acis_thermal_check/main.py @@ -14,6 +14,7 @@ import ska_numpy from astropy.io import ascii from astropy.table import Table +from chandra_limits import determine_obsid_info from cxotime import CxoTime from kadi import events from ska_matplotlib import cxctime2plotdate, plot_cxctime, pointpair @@ -25,7 +26,6 @@ PredictPlot, calc_pitch_roll, config_logging, - get_acis_limits, make_state_builder, mylog, paint_perigee, @@ -43,6 +43,8 @@ class ACISThermalCheck: + _limit_class = None + _flag_cold_viols = False r""" ACISThermalCheck class for making thermal model predictions and validating past model data against telemetry @@ -90,9 +92,6 @@ class ACISThermalCheck: internally, so this dict is for "extra" ones such as the 0-FEPs +12 C limit for 1DPAMZT. Default: None, meaning no extra limits are specified. - flag_cold_viols : boolean, optional - If set, violations for the lower planning limit will be - checked for and flagged, and hist_ops : list of strings, optional This sets the operations which will be used to create the error histograms, e.g., including only temperatures above @@ -112,30 +111,21 @@ def __init__( hist_limit, other_telem=None, other_map=None, - limits_map=None, - flag_cold_viols=False, hist_ops=None, ): self.msid = msid self.name = name - self.validation_limits = validation_limits + self.validation_limits = { + msid.upper(): validation_limits, + "PITCH": [(1, 3.0), (99, 3.0)], + "TSCPOS": [(1, 2.5), (99, 2.5)], + } self.hist_limit = hist_limit self.other_telem = other_telem self.other_map = other_map - self.limits_map = { - "odb.caution.high": "yellow_hi", - "odb.caution.low": "yellow_lo", - "safety.caution.high": "yellow_hi", - "safety.caution.low": "yellow_lo", - "planning.warning.high": "planning_hi", - "planning.warning.low": "planning_lo", - } - if limits_map is not None: - self.limits_map.update(limits_map) # Initially, the state_builder is set to None, as it will get # set up later self.state_builder = None - self.flag_cold_viols = flag_cold_viols if hist_ops is None: hist_ops = ["greater_equal"] * len(hist_limit) self.hist_ops = hist_ops @@ -168,6 +158,19 @@ def run(self, args, override_limits=None): proc, model_spec = self._setup_proc_and_logger(args) + # This allows one to override the limits for a particular model + # run. THIS SHOULD ONLY BE USED FOR TESTING PURPOSES. + if override_limits is not None: + for k, v in override_limits.items(): + if k in model_spec["limits"][self.msid]: + limit = model_spec["limits"][self.msid][k] + mylog.warning("Replacing %s %.2f with %.2f", k, limit, v) + model_spec["limits"][self.msid][k] = v + + # Set up the limit object and limits + self.limit_object = self._limit_class(model_spec=model_spec, margin=0.0) + self.limits = self.limit_object.limits + # Record the selected state builder in the class attributes # If there is no "state_builder" command line argument assume # kadi @@ -184,17 +187,6 @@ def run(self, args, override_limits=None): # data to a pickle later self.write_pickle = args.run_start is not None - self.limits = get_acis_limits(self.msid, model_spec, limits_map=self.limits_map) - - # This allows one to override the limits for a particular model - # run. THIS SHOULD ONLY BE USED FOR TESTING PURPOSES. - if override_limits is not None: - for k, v in override_limits.items(): - if k in self.limits: - limit = self.limits[k].value - mylog.warning(f"Replacing {k} {limit:.2f} with {v:.2f}") - self.limits[k].value = v - # Determine the start and stop times either from whatever was # stored in state_builder or punt by using NOW and None for # tstart and tstop. @@ -245,7 +237,7 @@ def run(self, args, override_limits=None): plots_validation = defaultdict(lambda: None) if pred["viols"] is not None: - any_viols = sum(len(viol["values"]) for viol in pred["viols"].values()) + any_viols = sum(len(viol) for viol in pred["viols"].values()) else: any_viols = 0 @@ -263,8 +255,6 @@ def run(self, args, override_limits=None): "pred_only": args.pred_only, "plots_validation": plots_validation, } - if self.msid == "fptemp": - context["acis_hot_obs"] = self.acis_hot_obs self.write_index_rst(args.outdir, context) @@ -345,6 +335,13 @@ def make_week_predict(self, tstart, tstop, tlm, T_init, model_spec, outdir): # Get commanded states and set initial temperature states, state0 = self.get_states(tlm, T_init) + # We have the states, and at this point the ACIS FP model + # needs to know the times of the observations to construct + # the limit later. + if self.msid == "fptemp": + obs_list = determine_obsid_info(states) + self.limit_object.set_obs_info(obs_list) + # calc_model actually does the model calculation by running # model-specific code. model = self.calc_model( @@ -369,11 +366,15 @@ def make_week_predict(self, tstart, tstop, tlm, T_init, model_spec, outdir): temps = {self.name: model.comp[self.msid].mvals} # make_prediction_viols determines the violations and prints them out - viols = self.make_prediction_viols(temps, states, tstart) + viols, upper_limit, lower_limit = self.make_prediction_viols( + temps, states, tstart + ) # make_prediction_plots runs the validation of the model # against previous telemetry - plots = self.make_prediction_plots(outdir, states, temps, tstart) + plots = self.make_prediction_plots( + outdir, states, temps, tstart, upper_limit, lower_limit + ) # write_states writes the commanded states to states.dat self.write_states(outdir, states) @@ -496,65 +497,6 @@ def make_validation_viols(self, plots_validation): return viols - def _make_prediction_viols( - self, - times, - temp, - load_start, - limit, - lim_name, - lim_type, - mask=None, - ): - if mask is None: - mask = np.ones_like(temp, dtype="bool") - viols = [] - if lim_type == "min": - bad = (temp <= limit) & mask - elif lim_type == "max": - bad = (temp >= limit) & mask - op = getattr(np, lim_type) - # The NumPy black magic of the next two lines is to figure - # out which time periods have planning limit violations and - # to find the bounding indexes of these times. This will also - # find violations which happen for one discrete time value also. - bad = np.concatenate(([False], bad, [False])) - changes = np.flatnonzero(bad[1:] != bad[:-1]).reshape(-1, 2) - # Now go through the periods where the temperature violates - # the planning limit and flag the duration and maximum of - # the violation - for change in changes: - # Only report violations which occur after the load being - # reviewed starts. - in_load = times[change[0]] > load_start or ( - times[change[0]] < load_start < times[change[1]] - ) - if times[change[0]] > load_start: - tstart = times[change[0]] - else: - tstart = load_start - tstop = times[change[1] - 1] - datestart = CxoTime(tstart).date - datestop = CxoTime(tstop).date - duration = tstop - tstart - # Only count the violation if it's in the load - # and if the duration is more than 10s - if in_load and duration >= 10.0: - viol = { - "datestart": datestart, - "datestop": datestop, - "duration": duration * 1.0e-3, - "extemp": op(temp[change[0] : change[1]]), - } - mylog.info( - f"WARNING: {self.msid} violates {lim_name} limit " - + "of %.2f degC from %s to %s" - % (limit, viol["datestart"], viol["datestop"]), - ) - viols.append(viol) - - return viols - def make_prediction_viols(self, temps, states, load_start): """ Find limit violations where predicted temperature is above the @@ -573,64 +515,33 @@ def make_prediction_viols(self, temps, states, load_start): """ mylog.info("Checking for limit violations") - temp = temps[self.name] - times = self.predict_model.times - - hi_viols = self._make_prediction_viols( - times, - temp, - load_start, - self.limits["planning_hi"].value, - "planning", - "max", + # Get the upper planning limit line + upper_limit = self.limit_object.get_limit_line( + states, + which="high", ) + # Check the violations for the upper planning limit viols = { - "hi": { - "name": f"Hot ({self.limits['planning_hi'].value} C)", - "type": "Max", - "values": hi_viols, - }, + "hi": upper_limit.check_violations( + self.predict_model, + start_time=load_start, + ), } - - if self.flag_cold_viols: - lo_viols = self._make_prediction_viols( - times, - temp, - load_start, - self.limits["planning_lo"].value, - "planning", - "min", + if self._flag_cold_viols: + # Get the lower planning limit line + lower_limit = self.limit_object.get_limit_line( + states, + which="low", ) - viols["lo"] = { - "name": f"Cold ({self.limits['planning_lo'].value} C)", - "type": "Min", - "values": lo_viols, - } - - # Handle any additional violations one wants to check, - # can be overridden by a subclass - self.custom_prediction_viols(times, temp, viols, load_start) - - return viols - - def custom_prediction_viols(self, times, temp, viols, load_start): - """ - This method is here to allow a subclass - to handle its own violations. + # Check the violations for the lower planning limit + viols["lo"] = lower_limit.check_violations( + self.predict_model, + start_time=load_start, + ) + else: + lower_limit = None - Parameters - ---------- - times : NumPy array - The times for the predicted temperatures - temp : NumPy array - The predicted temperatures - viols : dict - Dictionary of violations information to add to - load_start : float - The start time of the load, used so that we only report - violations for times later than this time for the model - run. - """ + return viols, upper_limit, lower_limit def write_states(self, outdir, states): """ @@ -765,7 +676,9 @@ def _make_state_plots(self, plots, num_figs, w1, plot_start, states, load_start) ) plots[plt_name].filename = f"{plt_name}.png" - def make_prediction_plots(self, outdir, states, temps, load_start): + def make_prediction_plots( + self, outdir, states, temps, load_start, upper_limit, lower_limit + ): """ Make plots of the thermal prediction as well as associated commanded states. @@ -809,16 +722,31 @@ def make_prediction_plots(self, outdir, states, temps, load_start): width=w1, load_start=load_start, ) - # Add horizontal lines for the planning and caution limits ymin, ymax = plots[self.name].ax.get_ylim() - ymax = max(self.limits["yellow_hi"].value + 1, ymax) + # Add horizontal lines for yellow limits, if necessary + for key in self.limit_object.alt_names.values(): + if key.startswith("yellow"): + plots[self.name].add_limit_line(self.limits[key]) + ymax = max(self.limits[key]["value"] + 1, ymax) + ymin = min(self.limits[key]["value"] - 1, ymin) + # Plot lines for upper and lower planning limits + upper_limit.plot( + fig_ax=(plots[self.name].fig, plots[self.name].ax), + lw=3, + zorder=2, + use_colors=True, + show_changes=False, + ) + if lower_limit is not None: + lower_limit.plot( + fig_ax=(plots[self.name].fig, plots[self.name].ax), + lw=3, + zorder=2, + use_colors=True, + show_changes=False, + no_label=self.msid != "1dpamzt", + ) plots[self.name].ax.set_title(self.msid.upper(), loc="left", pad=10) - plots[self.name].add_limit_line(self.limits["yellow_hi"], "Yellow") - plots[self.name].add_limit_line(self.limits["planning_hi"], "Planning") - if self.flag_cold_viols: - ymin = min(self.limits["yellow_lo"].value - 1, ymin) - plots[self.name].add_limit_line(self.limits["yellow_lo"], None) - plots[self.name].add_limit_line(self.limits["planning_lo"], None) plots[self.name].ax.set_ylim(ymin, ymax) plots[self.name].filename = self.msid.lower() + ".png" @@ -830,10 +758,6 @@ def make_prediction_plots(self, outdir, states, temps, load_start): plots["default"] = plots[self.name] - # This call allows the specific check tool - # to customize plots after the fact - self.custom_prediction_plots(plots) - # Make the legend on the temperature plot # only now after we've allowed for # customizations @@ -858,18 +782,6 @@ def make_prediction_plots(self, outdir, states, temps, load_start): return plots - def custom_prediction_plots(self, plots): - """ - Customization of prediction plots. - - Parameters - ---------- - plots : dict of dicts - Contains the hooks to the plot figures, axes, and filenames - and can be used to customize plots before they are written, - e.g. add limit lines, etc. - """ - def get_histogram_mask(self, tlm, limits): """ This method determines which values of telemetry @@ -918,6 +830,13 @@ def make_validation_plots(self, tlm, model_spec, outdir): stop = tlm["date"][-1] states = self.state_builder.get_validation_states(start, stop) + # We have the states, and at this point the ACIS FP model + # needs to know the times of the observations to construct + # the limit later. + if self.msid == "fptemp": + obs_list = determine_obsid_info(states) + self.limit_object.set_obs_info(obs_list) + mylog.info("Calculating %s thermal model for validation", self.name.upper()) # Run the thermal model from the beginning of obtained telemetry @@ -926,6 +845,20 @@ def make_validation_plots(self, tlm, model_spec, outdir): self.validate_model = model + # get the upper planning limit + upper_limit = self.limit_object.get_limit_line( + states, + which="high", + ) + if self._flag_cold_viols: + # get the lower planning limit + lower_limit = self.limit_object.get_limit_line( + states, + which="low", + ) + else: + lower_limit = None + # Use an OrderedDict here because we want the plots on the validation # page to appear in this order pred = OrderedDict( @@ -1019,7 +952,7 @@ def make_validation_plots(self, tlm, model_spec, outdir): fmt=".c", zorder=10, ) - ax.set_title(msid.upper() + " validation", loc="left", pad=10) + ax.set_title(msid.upper() + " validation", loc="left", pad=10, fontsize=15) ax.set_xlabel("Date") ax.set_ylabel(labels[msid]) ax.grid() @@ -1029,77 +962,47 @@ def make_validation_plots(self, tlm, model_spec, outdir): ptimes = cxctime2plotdate([rz.tstart, rz.tstop]) for ptime in ptimes: ax.axvline(ptime, ls="--", color="C2", linewidth=2, zorder=2) - # Add horizontal lines for the planning and caution limits - # or the limits for the focal plane model. Make sure we can - # see all of the limits. + # Add lines for all the limits and make sure we can see the + # lines by adjusting ymin/ymax accordingly. if self.msid == msid: ymin, ymax = ax.get_ylim() - if msid == "fptemp": - ax.axhline( - self.limits["cold_ecs"].value, - linestyle="--", - label="Cold ECS", - color=self.limits["cold_ecs"].color, - zorder=2, - linewidth=2, - ) - ax.axhline( - self.limits["acis_i"].value, - linestyle="--", - label="ACIS-I", - color=self.limits["acis_i"].color, - zorder=2, - linewidth=2, - ) - ax.axhline( - self.limits["acis_s"].value, - linestyle="--", - label="ACIS-S", - color=self.limits["acis_s"].color, - zorder=2, - linewidth=2, - ) - ax.axhline( - self.limits["acis_hot"].value, - linestyle="--", - label="Hot ACIS", - color=self.limits["acis_hot"].color, - zorder=2, - linewidth=2, - ) - ymax = max(self.limits["acis_hot"].value + 1, ymax) - else: - ax.axhline( - self.limits["yellow_hi"].value, - linestyle="-", - linewidth=2, - zorder=2, - color=self.limits["yellow_hi"].color, - ) - ax.axhline( - self.limits["planning_hi"].value, - linestyle="-", - linewidth=2, + # Plot the upper planning limit + upper_limit.plot( + fig_ax=(fig, ax), + lw=3, + zorder=2, + use_colors=True, + show_changes=False, + ) + if lower_limit is not None: + # Plot the lower planning limit + lower_limit.plot( + fig_ax=(fig, ax), + lw=3, zorder=2, - color=self.limits["planning_hi"].color, + use_colors=True, + show_changes=False, + no_label=self.msid != "1dpamzt", ) - ymax = max(self.limits["yellow_hi"].value + 1, ymax) - if self.flag_cold_viols: + # Add horizontal lines for yellow limits, if necessary + label = "Yellow" + for key in self.limit_object.alt_names.values(): + if key.startswith("yellow"): ax.axhline( - self.limits["yellow_lo"].value, + self.limits[key]["value"], linestyle="-", linewidth=2, zorder=2, - color=self.limits["yellow_lo"].color, + color=self.limits[key]["color"], + label=label, ) - ax.axhline( - self.limits["planning_lo"].value, - linestyle="-", - linewidth=2, - zorder=2, - color=self.limits["planning_lo"].color, - ) - ymin = min(self.limits["yellow_lo"].value - 1, ymin) + # Set label to None so we don't repeat it + label = None + # ymin and ymax for plots have to be at least + # above/below their yellow limits or the data + # extremes + ymax = max(self.limits[key]["value"] + 1, ymax) + ymin = min(self.limits[key]["value"] - 1, ymin) ax.set_ylim(ymin, ymax) ax.set_xlim(xmin, xmax) @@ -1272,19 +1175,16 @@ def make_validation_plots(self, tlm, model_spec, outdir): fig_id += 1 - # This call allows the specific check tool - # to customize plots after the fact - self.custom_validation_plots(plots) - if self.msid == "fptemp": - anchor = (0.295, 0.99) + anchor = (0.21, 0.99) else: anchor = (0.4, 0.99) plots[self.msid]["lines"]["ax"].legend( bbox_to_anchor=anchor, loc="lower left", - ncol=3, - fontsize=14, + ncol=5 if self.msid == "fptemp" else 3, + fontsize=12 if self.msid == "fptemp" else 14, + columnspacing=1.5, ) # Now write all of the plots after possible @@ -1316,18 +1216,6 @@ def make_validation_plots(self, tlm, model_spec, outdir): return plots - def custom_validation_plots(self, plots): - """ - Customization of prediction plots. - - Parameters - ---------- - plots : dict of dicts - Contains the hooks to the plot figures, axes, and filenames - and can be used to customize plots before they are written, - e.g. add limit lines, etc. - """ - def rst_to_html(self, outdir): """ Render index.rst as HTML @@ -1351,22 +1239,27 @@ def rst_to_html(self, outdir): ) stylesheet_path = str(outdir / "acis_thermal_check.css") - infile = str(outdir / "index.rst") - outfile = str(outdir / "index.html") - publish_file( - source_path=infile, - destination_path=outfile, - writer_name="html", - settings_overrides={"stylesheet_path": stylesheet_path}, - ) + prefixes = ["index"] + if self.msid == "fptemp": + # For the ACIS FP limit we write out an obsid table + prefixes.append("obsid_table") + for prefix in prefixes: + infile = str(outdir / f"{prefix}.rst") + outfile = str(outdir / f"{prefix}.html") + publish_file( + source_path=infile, + destination_path=outfile, + writer_name="html", + settings_overrides={"stylesheet_path": stylesheet_path}, + ) - # Remove the stupid