Skip to content

Commit

Permalink
Merge pull request #239 from cta-observatory/calib_prod_id
Browse files Browse the repository at this point in the history
Use CALIB_PROD_ID instead of pro to search for calibration files
  • Loading branch information
morcuended authored Oct 10, 2023
2 parents b8b0c67 + 04e9548 commit fea0067
Show file tree
Hide file tree
Showing 12 changed files with 278 additions and 218 deletions.
1 change: 0 additions & 1 deletion src/osa/configs/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
nocheck = None
no_dl2 = None
prod_id = None
calib_prod_id = None
dl1_prod_id = None
dl2_prod_id = None
append = None
Expand Down
1 change: 0 additions & 1 deletion src/osa/configs/sequencer.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ SEQUENCER_WEB_DIR: %(OSA_DIR)s/SequencerWeb
# To be set by the user. Using PROD-ID will overcome the automatic
# fetching of lstchain version. Otherwise leave it empty (and without the colon symbol).
PROD_ID: v0.1.0
CALIB_PROD_ID: v01
# Change this to produce a different DL1b or DL2 sub-productions.
# Otherwise, keep it empty to use the common PROD-ID
DL1_PROD_ID: tailcut84
Expand Down
14 changes: 12 additions & 2 deletions src/osa/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from osa.scripts.tests.test_osa_scripts import run_program
from osa.utils.utils import date_to_dir
from datetime import datetime
import lstchain

date = datetime.fromisoformat("2020-01-17")
nightdir = date_to_dir(date)
Expand Down Expand Up @@ -68,16 +69,25 @@ def calibration_base_dir(monitoring_dir):
return base_dir


@pytest.fixture(scope="session")
def drive_log(monitoring_dir):
drive_dir = monitoring_dir / "DrivePositioning"
drive_file = drive_dir / "DrivePosition_log_20200117.txt"
drive_dir.mkdir(parents=True, exist_ok=True)
drive_file.touch()
return drive_file


@pytest.fixture(scope="session")
def calibration_dir(calibration_base_dir):
directory = calibration_base_dir / "calibration" / nightdir / "pro"
directory = calibration_base_dir / "calibration" / nightdir / f"v{lstchain.__version__}"
directory.mkdir(parents=True, exist_ok=True)
return directory


@pytest.fixture(scope="session")
def drs4_baseline_dir(calibration_base_dir):
directory = calibration_base_dir / "drs4_baseline" / nightdir / "pro"
directory = calibration_base_dir / "drs4_baseline" / nightdir / f"v{lstchain.__version__}"
directory.mkdir(parents=True, exist_ok=True)
return directory

Expand Down
134 changes: 97 additions & 37 deletions src/osa/paths.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
"""Handle the paths of the analysis products."""

import logging
import re
from datetime import datetime
from pathlib import Path
from typing import List
from datetime import datetime

import lstchain
from astropy.table import Table
from lstchain.onsite import find_systematics_correction_file, find_time_calibration_file
from lstchain.scripts.onsite.onsite_create_calibration_file import search_filter
from lstchain.onsite import (find_systematics_correction_file,
find_time_calibration_file)
from lstchain.scripts.onsite.onsite_create_calibration_file import \
search_filter

from osa.configs import options
from osa.configs.config import DEFAULT_CFG
from osa.configs.config import cfg
from osa.configs.config import DEFAULT_CFG, cfg
from osa.configs.datamodel import Sequence
from osa.utils import utils
from osa.utils.logging import myLogger

log = myLogger(logging.getLogger(__name__))

__all__ = [
"get_calibration_file",
"get_drs4_pedestal_file",
"get_calibration_filename",
"get_drs4_pedestal_filename",
"pedestal_ids_file_exists",
"get_run_date",
"drs4_pedestal_exists",
Expand All @@ -40,6 +43,8 @@


DATACHECK_WEB_BASEDIR = Path(cfg.get("WEBSERVER", "DATACHECK"))
CALIB_BASEDIR = Path(cfg.get("LST1", "CALIB_DIR"))
DRS4_PEDESTAL_BASEDIR = Path(cfg.get("LST1", "PEDESTAL_DIR"))


def analysis_path(tel) -> Path:
Expand Down Expand Up @@ -86,18 +91,25 @@ def get_run_date(run_id: int) -> datetime:
return datetime.strptime(date_string, "%Y-%m-%d")


def get_drs4_pedestal_file(run_id: int) -> Path:
def get_drs4_pedestal_filename(run_id: int, prod_id: str) -> Path:
"""
Return the drs4 pedestal file corresponding to a given run id
regardless of the date when the run was taken.
"""
drs4_pedestal_dir = Path(cfg.get("LST1", "PEDESTAL_DIR"))
if drs4_pedestal_exists(run_id, prod_id):
files = search_drs4_files(run_id, prod_id)
return files[-1] # Get the latest production among the major lstchain version

date = utils.date_to_dir(get_run_date(run_id))
file = drs4_pedestal_dir / date / f"pro/drs4_pedestal.Run{run_id:05d}.0000.h5"
return file.resolve()
return (
DRS4_PEDESTAL_BASEDIR
/ date
/ f"v{lstchain.__version__}/drs4_pedestal.Run{run_id:05d}.0000.h5"
).resolve()


def get_calibration_file(run_id: int) -> Path:
def get_calibration_filename(run_id: int, prod_id: str) -> Path:
# sourcery skip: remove-unnecessary-cast
"""
Return the calibration file corresponding to a given run_id.
Expand All @@ -109,14 +121,18 @@ def get_calibration_file(run_id: int) -> Path:
Notes
-----
The file path will be built regardless of the date when the run was taken.
We follow the naming convention of the calibration files produced by the lstchain script
which depends on the filter wheels position. Therefore, we need to try to fetch the filter
position from the CaCo database. If the filter position is not found, we assume the default
filter position 5-2. Filter information is not available in the database for runs taken before
We follow the naming convention of the calibration files produced by the
lstchain script which depends on the filter wheels position. Therefore, we
need to try to fetch the filter position from the CaCo database. If the
filter position is not found, we assume the default filter position 5-2.
Filter information is not available in the database for runs taken before
mid 2021 approx.
"""

calib_dir = Path(cfg.get("LST1", "CALIB_DIR"))
if calibration_file_exists(run_id, prod_id):
files = search_calibration_files(run_id, prod_id)
return files[-1] # Get the latest production among the major lstchain version

date = utils.date_to_dir(get_run_date(run_id))

if options.test: # Run tests avoiding the access to the database
Expand All @@ -131,8 +147,11 @@ def get_calibration_file(run_id: int) -> Path:
log.warning("No filter information found in database. Assuming positions 52.")
options.filters = 52

file = calib_dir / date / f"pro/calibration_filters_{options.filters}.Run{run_id:05d}.0000.h5"
return file.resolve()
return (
CALIB_BASEDIR
/ date
/ f"v{lstchain.__version__}/calibration_filters_{options.filters}.Run{run_id:05d}.0000.h5"
).resolve()


def pedestal_ids_file_exists(run_id: int) -> bool:
Expand All @@ -142,16 +161,51 @@ def pedestal_ids_file_exists(run_id: int) -> bool:
return bool(file_list)


def drs4_pedestal_exists(run_id: int) -> bool:
def drs4_pedestal_exists(run_id: int, prod_id: str) -> bool:
"""Return true if drs4 pedestal file was already produced."""
file = get_drs4_pedestal_file(run_id)
return file.exists()
files = search_drs4_files(run_id, prod_id)

return len(files) != 0


def calibration_file_exists(run_id: int) -> bool:
def calibration_file_exists(run_id: int, prod_id: str) -> bool:
"""Return true if calibration file was already produced."""
file = get_calibration_file(run_id)
return file.exists()
files = search_calibration_files(run_id, prod_id)

return len(files) != 0


def search_drs4_files(run_id: int, prod_id: str) -> list:
"""
Find DRS4 baseline correction files corresponding to a run ID
and major lstchain production version
"""
date = utils.date_to_dir(get_run_date(run_id))
version = get_major_version(prod_id)
drs4_dir = DRS4_PEDESTAL_BASEDIR / date
return sorted(
drs4_dir.glob(f"{version}*/drs4_pedestal.Run{run_id:05d}.0000.h5")
)


def get_major_version(prod_id):
"""Given a version as vX.Y.Z return vX.Y"""
# First check that the given version is in the correct format
if prod_id.startswith("v") and len(prod_id.split(".")) >= 2:
return re.search(r"\D\d+\.\d+", prod_id)[0]

raise ValueError("Format of the version is not in the form vW.X.Y.Z")


def search_calibration_files(run_id: int, prod_id: str) -> list:
"""
Search charge calibration files corresponding to a run ID and major lstchain production version
"""
date = utils.date_to_dir(get_run_date(run_id))
version = get_major_version(prod_id)
return sorted(
(CALIB_BASEDIR / date).glob(f"{version}*/calibration_filters_*.Run{run_id:05d}.0000.h5")
)


def get_drive_file(date: str) -> Path:
Expand Down Expand Up @@ -180,11 +234,16 @@ def sequence_calibration_files(sequence_list: List[Sequence]) -> None:
"""Build names of the calibration files for each sequence in the list."""
flat_date = utils.date_to_dir(options.date)
base_dir = Path(cfg.get("LST1", "BASE"))
prod_id = options.prod_id

for sequence in sequence_list:
# Assign the calibration files to the sequence object
sequence.drs4_file = get_drs4_pedestal_file(sequence.drs4_run)
sequence.calibration_file = get_calibration_file(sequence.pedcal_run)
sequence.drs4_file = get_drs4_pedestal_filename(
sequence.drs4_run, prod_id
)
sequence.calibration_file = get_calibration_filename(
sequence.pedcal_run, prod_id
)
sequence.time_calibration_file = find_time_calibration_file(
"pro", sequence.pedcal_run, base_dir=base_dir
)
Expand Down Expand Up @@ -235,10 +294,7 @@ def destination_dir(concept: str, create_dir: bool = True) -> Path:
directory = Path(cfg.get(options.tel_id, "DL1_DIR")) / nightdir / options.prod_id / "muons"
elif concept == "INTERLEAVED":
directory = (
Path(cfg.get(options.tel_id, "DL1_DIR"))
/ nightdir
/ options.prod_id
/ "interleaved"
Path(cfg.get(options.tel_id, "DL1_DIR")) / nightdir / options.prod_id / "interleaved"
)
elif concept == "DATACHECK":
directory = (
Expand All @@ -257,17 +313,21 @@ def destination_dir(concept: str, create_dir: bool = True) -> Path:
)
elif concept in {"DL2", "DL3"}:
directory = (
Path(cfg.get(options.tel_id, concept + "_DIR"))
/ nightdir
(Path(cfg.get(options.tel_id, f"{concept}_DIR")) / nightdir)
/ options.prod_id
/ options.dl2_prod_id
)
) / options.dl2_prod_id
elif concept in {"PEDESTAL", "CALIB", "TIMECALIB"}:
directory = (
Path(cfg.get(options.tel_id, concept + "_DIR")) / nightdir / options.calib_prod_id
Path(cfg.get(options.tel_id, f"{concept}_DIR"))
/ nightdir
/ options.prod_id
)
elif concept == "HIGH_LEVEL":
directory = Path(cfg.get(options.tel_id, concept + "_DIR")) / nightdir / options.prod_id
directory = (
Path(cfg.get(options.tel_id, f"{concept}_DIR"))
/ nightdir
/ options.prod_id
)
else:
log.warning(f"Concept {concept} not known")
directory = None
Expand Down
Loading

0 comments on commit fea0067

Please sign in to comment.