Merge pull request #80 from cta-observatory/tcu
Retrieve source information from TCU database
morcuended authored Jan 17, 2022 · 2 parents 364cfdc + 4c3340e · commit cdc88ca
Showing 12 changed files with 127 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cfg/sequencer.cfg
@@ -61,7 +61,7 @@ rf_models: /data/models/prod5/zenith_20deg/20201023_v0.6.3
[SLURM]
PARTITION_PEDCALIB: short
PARTITION_DATA: long
-MEMSIZE_PEDCALIB: 5GB
+MEMSIZE_PEDCALIB: 3GB
MEMSIZE_DATA: 16GB

[WEBSERVER]
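The [SLURM] values end up as #SBATCH directives in the generated job headers (compare the updated expectations in osa/tests/test_jobs.py further down, where --partition=short and --mem-per-cpu=3GB appear for the calibration sequence). A minimal sketch of reading such a value with the standard library, assuming a plain configparser-compatible file; the path is illustrative and OSA's real config layer is not shown in this diff:

from configparser import ConfigParser

# Illustrative path; adjust to wherever sequencer.cfg actually lives.
cfg = ConfigParser()
cfg.read("cfg/sequencer.cfg")

# Option names are case-insensitive for configparser lookups.
partition = cfg.get("SLURM", "PARTITION_PEDCALIB")
memsize = cfg.get("SLURM", "MEMSIZE_PEDCALIB")

sbatch_header = [
    f"#SBATCH --partition={partition}",
    f"#SBATCH --mem-per-cpu={memsize}",
]
print("\n".join(sbatch_header))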
1 change: 1 addition & 0 deletions environment.yml
@@ -21,6 +21,7 @@ dependencies:
- h5py
- joblib
- traitlets=5.0
- pymongo
# dev dependencies
- pre-commit
- pytest
2 changes: 2 additions & 0 deletions osa/conftest.py
@@ -209,11 +209,13 @@ def sequence_list(running_analysis_dir, run_summary, drs4_time_calibration_files
"""Creates a sequence list from a run summary file."""
options.directory = running_analysis_dir
options.simulate = True
options.test = True
for file in drs4_time_calibration_files:
assert file.exists()

subrun_list = extractsubruns(run_summary)
run_list = extractruns(subrun_list)
options.test = False
return extractsequences(run_list)


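The options.test flag toggled here keeps the unit tests from querying the CaCo/TCU databases while the sequence list is built. A variant sketch of the same fixture using try/finally, so the flag is reset even if extraction raises; the fixture name is hypothetical and the import paths are assumed to match what osa/conftest.py already uses:

import pytest

# Assumed import paths, mirroring what osa/conftest.py and the modules in
# this diff already use.
from osa.configs import options
from osa.nightsummary.extract import extractruns, extractsequences, extractsubruns


@pytest.fixture
def sequence_list_no_db(running_analysis_dir, run_summary, drs4_time_calibration_files):
    """Build a sequence list while keeping the TCU database out of the loop."""
    options.directory = running_analysis_dir
    options.simulate = True
    options.test = True
    try:
        for file in drs4_time_calibration_files:
            assert file.exists()
        subrun_list = extractsubruns(run_summary)
        run_list = extractruns(subrun_list)
        return extractsequences(run_list)
    finally:
        options.test = False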
2 changes: 1 addition & 1 deletion osa/job.py
@@ -617,7 +617,7 @@ def submit_jobs(sequence_list, batch_command="sbatch"):
def run_squeue() -> StringIO:
"""Run squeue command to get the status of the jobs."""
if shutil.which("squeue") is None:
log.warning("No job info available since sacct command is not available")
log.warning("No job info available since squeue command is not available")
return StringIO()

out_fmt = "%i,%j,%T,%M" # JOBID, NAME, STATE, TIME
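run_squeue() returns the scheduler output as a StringIO in the comma-separated "%i,%j,%T,%M" format. A short, self-contained illustration of consuming such a stream with the standard csv module; the sample text is invented, not real squeue output:

import csv
from io import StringIO

# Invented sample in the "%i,%j,%T,%M" format (JOBID, NAME, STATE, TIME).
sample = StringIO("JOBID,NAME,STATE,TIME\n123456,LST1_01807,RUNNING,12:34\n")

for job in csv.DictReader(sample):
    print(job["JOBID"], job["NAME"], job["STATE"], job["TIME"])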
85 changes: 85 additions & 0 deletions osa/nightsummary/database.py
@@ -0,0 +1,85 @@
"""Query the TCU database source name and astronomical coordinates."""

from datetime import datetime

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

__all__ = ['query']


def query(obs_id: int, property_name: str):
"""
Query the source name and coordinates from TCU database.
Parameters
----------
obs_id : int
Run number
property_name : str
Properties from drive information e.g. `DriveControl_SourceName`,
`DriveControl_RA_Target`, `DriveControl_Dec_Target`
Returns
-------
query_result : str or None
Query result from database. It can be either the source name or its coordinates.
Raises
------
ConnectionFailure
"""

caco_client = MongoClient("tcs01")
tcu_client = MongoClient("tcs05")

try:
caco_client.admin.command('ping')
tcu_client.admin.command('ping')
except ConnectionFailure:
raise ConnectionFailure("Databases not available")

with caco_client, tcu_client:
run_info = caco_client["CACO"]["RUN_INFORMATION"]
run = run_info.find_one({"run_number": obs_id})

try:
start = datetime.fromisoformat(run["start_time"].replace("Z", ""))
end = datetime.fromisoformat(run["stop_time"].replace("Z", ""))
except TypeError:
return None

bridges_monitoring = tcu_client["bridgesmonitoring"]
property_collection = bridges_monitoring["properties"]
chunk_collection = bridges_monitoring["chunks"]
descriptors = property_collection.find(
{'property_name': property_name},
)

entries = {
'name': property_name,
'time': [],
'value': []
}

for descriptor in descriptors:
query_property = {'pid': descriptor['_id']}

if start is not None:
query_property.update({"begin": {"$gte": start}})

if end is not None:
query_property.update({"end": {"$lte": end}})

chunks = chunk_collection.find(query_property)

for chunk in chunks:
for value in chunk['values']:
entries['time'].append(value['t'])
entries['value'].append(value['val'])

source_name = entries["value"][0]
if source_name != "":
return source_name
else:
return None
23 changes: 19 additions & 4 deletions osa/nightsummary/extract.py
@@ -15,7 +15,9 @@
    SubrunObj,
)
from osa.job import sequence_calibration_filenames, sequence_filenames
+from osa.nightsummary import database
from osa.utils.utils import lstdate_to_iso
+from pymongo.errors import ConnectionFailure

log = logging.getLogger(__name__)

@@ -35,10 +37,9 @@ def extractsubruns(summary_table):
    Parameters
    ----------
    summary_table: astropy.Table
-        Table containing run-wise information indicated in
-        nightsummary.run_summary.
+        Table containing run-wise information indicated in `nightsummary.run_summary`.

-    See Also: nightsummary.run_summary
+    See Also: `nightsummary.run_summary`

    Returns
    -------
@@ -66,16 +67,30 @@
            # Build run object
            sr.runobj = RunObj()
            sr.runobj.run_str = f"{run_info['run_id']:05d}"
            # FIXME: Leave only .run attribute
            sr.runobj.run = run_info["run_id"]
            sr.runobj.type = run_info["run_type"]
            sr.runobj.telescope = options.tel_id
            sr.runobj.night = lstdate_to_iso(options.date)
+            if not options.test:
+                sr.runobj.source = database.query(
+                    obs_id=sr.runobj.run,
+                    property_name="DriveControl_SourceName"
+                )
+                sr.runobj.source_ra = database.query(
+                    obs_id=sr.runobj.run,
+                    property_name="DriveControl_RA_Target"
+                )
+                sr.runobj.source_dec = database.query(
+                    obs_id=sr.runobj.run,
+                    property_name="DriveControl_Dec_Target"
+                )
            run_to_obj[sr.runobj.run] = sr.runobj
        except KeyError as err:
            log.warning(f"Key error, {err}")
        except IndexError as err:
            log.warning(f"Index error, {err}")
+        except ConnectionFailure:
+            log.warning("MongoDB server not available.")
        else:
            sr.runobj.subrun_list.append(sr)
            sr.runobj.subruns = len(sr.runobj.subrun_list)
8 changes: 8 additions & 0 deletions osa/nightsummary/tests/test_database.py
@@ -0,0 +1,8 @@
import pytest
from pymongo.errors import ConnectionFailure


def test_query():
    from osa.nightsummary import database
    with pytest.raises(ConnectionFailure):
        database.query(obs_id=1616, property_name="DriveControl_SourceName")
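The test above relies on tcs01/tcs05 being unreachable from the machine running the tests. A variant sketch (hypothetical, not part of this commit) that forces the failure by patching MongoClient inside the module, so the behaviour is the same wherever the tests run:

from unittest import mock

import pytest
from pymongo.errors import ConnectionFailure

from osa.nightsummary import database


def test_query_connection_failure_mocked():
    # Make the initial ping raise regardless of network reachability.
    with mock.patch.object(database, "MongoClient") as fake_client:
        fake_client.return_value.admin.command.side_effect = ConnectionFailure
        with pytest.raises(ConnectionFailure):
            database.query(obs_id=1616, property_name="DriveControl_SourceName")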
6 changes: 0 additions & 6 deletions osa/scripts/sequencer.py
@@ -286,14 +286,11 @@ def report_sequences(sequence_list):
"Run",
"Subruns",
"Source",
"Wobble",
"Action",
"Tries",
"JobID",
"State",
"Host",
"CPU_time",
"Walltime",
"Exit",
]
if options.tel_id in ["LST1", "LST2"]:
@@ -313,14 +310,11 @@
            sequence.run,
            sequence.subruns,
            sequence.source,
-            sequence.wobble,
            sequence.action,
            sequence.tries,
            sequence.jobid,
            sequence.state,
-            sequence.jobhost,
            sequence.cputime,
-            sequence.walltime,
            sequence.exit,
        ]
        if sequence.type in ["DRS4", "PEDCALIB"]:
1 change: 1 addition & 0 deletions osa/scripts/simulate_processing.py
@@ -149,6 +149,7 @@ def simulate_subrun_processing(args):
def simulate_processing():
"""Simulate daily processing and capture provenance."""
options.simulate = True
options.test = True
summary_table = run_summary_table(options.date)

sub_run_list = extractsubruns(summary_table)
10 changes: 6 additions & 4 deletions osa/scripts/tests/test_osa_scripts.py
@@ -111,10 +111,10 @@ def test_simulated_sequencer(drs4_time_calibration_files, run_summary_file):
assert rc.stdout == dedent(
f"""\
================================== Starting sequencer.py at {now} UTC for LST, Telescope: LST1, Night: 2020_01_17 ==================================
-Tel Seq Parent Type Run Subruns Source Wobble Action Tries JobID State Host CPU_time Walltime Exit DL1% MUONS% DL1AB% DATACHECK% DL2%
-LST1 0 None PEDCALIB 1805 5 None None None None None None None None None None None None None None None
-LST1 1 0 DATA 1807 11 None None None None None None None None None None 0 0 0 0 0
-LST1 2 0 DATA 1808 9 None None None None None None None None None None 0 0 0 0 0
+Tel Seq Parent Type Run Subruns Source Action Tries JobID State CPU_time Exit DL1% MUONS% DL1AB% DATACHECK% DL2%
+LST1 0 None PEDCALIB 1805 5 None None None None None None None None None None None None
+LST1 1 0 DATA 1807 11 None None None None None None None 0 0 0 0 0
+LST1 2 0 DATA 1808 9 None None None None None None None 0 0 0 0 0
""")


@@ -241,7 +241,9 @@ def test_calibration_pipeline(running_analysis_dir):

def test_is_sequencer_successful(run_summary, running_analysis_dir):
    options.directory = running_analysis_dir
+    options.test = True
    seq_tuple = is_finished_check(run_summary)
+    options.test = False
    assert is_sequencer_successful(seq_tuple) is True


4 changes: 2 additions & 2 deletions osa/tests/test_jobs.py
@@ -74,7 +74,7 @@ def test_scheduler_env_variables(sequence_list, running_analysis_dir):
        '#SBATCH --output=log/slurm_01805.%4a_%A.out',
        '#SBATCH --error=log/slurm_01805.%4a_%A.err',
        '#SBATCH --partition=short',
-        '#SBATCH --mem-per-cpu=5GB'
+        '#SBATCH --mem-per-cpu=3GB'
    ]
    # Extract the second sequence
    second_sequence = sequence_list[1]
@@ -106,7 +106,7 @@ def test_job_header_template(sequence_list, running_analysis_dir):
#SBATCH --output=log/slurm_01805.%4a_%A.out
#SBATCH --error=log/slurm_01805.%4a_%A.err
#SBATCH --partition=short
-#SBATCH --mem-per-cpu=5GB""")
+#SBATCH --mem-per-cpu=3GB""")
    assert header == output_string1

    # Extract the second sequence
1 change: 1 addition & 0 deletions setup.py
@@ -56,6 +56,7 @@
"pydot",
"pydotplus",
"psutil",
"pymongo"
],
package_data={
'osa': [
