Merge pull request #11 from cms-DQM/dev
nothingface0 authored Mar 12, 2024
2 parents 45086aa + 1e4b9eb commit e08d37c
Showing 59 changed files with 8,169 additions and 2,444 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
env/
tmp/
log/
27 changes: 27 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,27 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "fff_dqmtools local development",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/fff_dqmtools.py",
"console": "integratedTerminal",
"justMyCode": true,
"args": [
"--foreground",
"--applets",
"fff_web,fff_filemonitor,fff_selftest",
"--web.port",
"9325",
"--path",
"/tmp/dqm_monitoring/test",
"--web.db",
":memory:"
]
}
]
}
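For local debugging without VS Code, the equivalent invocation from the repository root would presumably be `python3 fff_dqmtools.py --foreground --applets fff_web,fff_filemonitor,fff_selftest --web.port 9325 --path /tmp/dqm_monitoring/test --web.db :memory:` (interpreter name assumed; the flags are taken verbatim from the `args` list above).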
39 changes: 25 additions & 14 deletions README.md
@@ -1,22 +1,33 @@
fff_dqmtools
============
# `fff_dqmtools`

fff_dqmtools aka DQM^2
`fff_dqmtools` aka DQM^2.

#### Installation
Create a RPM locally executing `makerpm.sh`. Then:
More information can be found on the [TWiki page](https://twiki.cern.ch/twiki/bin/viewauth/CMS/DQMOnlineFFFTools).

* for the manual installation at P5 do something `./install.py --remote machine_name` with RPM stored in the same dir. This installation will be reverted by puppet months or days later.
* to update playback DQM machines the dropbox could be used https://twiki.cern.ch/twiki/bin/view/CMS/ClusterUsersGuide#How_to_use_the_dropbox_computer:
```
ssh cmsdropbox.cms
sudo dropbox2 -o cc7 -z cms -s dqm -u folder_with_fff_rpm/
```
## Building a package

* to update production machines please create a JIRA ticket.
> [!NOTE]
> The `makerpm.sh` script will only work on RHEL-like OSes (e.g. an RHEL8 OpenStack VM).
> To run this script you will need to install `rpmdevtools` and `rpmlint`.
* Create an RPM by executing `./utils/makerpm.sh` in the repository's root folder (a minimal session is sketched below).
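On a fresh RHEL8 VM this amounts to something like `sudo dnf install rpmdevtools rpmlint` followed by `./utils/makerpm.sh` from the repository's root folder (package names assumed from the standard RHEL/EPEL repositories; adjust as needed).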

## Installation

* For the manual installation at P5, copy the source code and the built package, then execute `./utils/install.py --remote <machine_name>` in the repository's root folder. Puppet will undo this installation days or months later.

* To update all of the DQM machines at once, the [dropbox](https://twiki.cern.ch/twiki/bin/view/CMS/ClusterUsersGuide#How_to_use_the_dropbox_computer) can be used (you will need to be in the appropriate `*_librarian` group):
```bash
ssh cmsdropboxcc8.cms
sudo dropbox2 -z cms -o cc8 -s dqm -u /folder/with/fff_dqmtools/
```
> [!NOTE]
> Once the operation completes, you will receive a report of which machines were updated.

## DQM^2 workflows

### To update DQM^2 DB with client info

#### DQM^2 worflows:
##### to update DQM^2 DB with client info:
Create JSON report files in:
https://github.com/cms-DQM/fff_dqmtools/blob/a62a1a317e1bc312c704065a43d3bd0aeb5ecc74/applets/analyze_files.py#L100
Then read the JSONs in the folder:
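For orientation, here is a hedged sketch (not part of this commit) of what one of these report documents roughly looks like, reconstructed from the visible parts of `make_report` in the diff below. Every value is invented, and the name of the dict wrapping the `global_start`/`lumi` fields is cut off in the hunk, so `"extra"` is an assumption:

```python
# Hedged sketch only: every value is invented, and the "extra" key is an
# assumption -- the enclosing dict's name is cut off in the diff below.
report = {
    "extra": {
        # per-stream lists (lumis, mtimes, ctimes, events, fsize) also land here
        "global_start": 1710231845.0,          # run start, epoch seconds
        "global_start_source": "global_file",  # or the "first_lumi" fallback
        "lumi": 23.310893056,                  # lumisection length in seconds
    },
    "pid": 4242,                               # PID of the reporting process
    "_id": "dqm-files-somehost-analyze_files-run123456",  # host/run invented
    "type": "dqm-files",
}
```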
100 changes: 62 additions & 38 deletions applets/analyze_files.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3

import sys
import os
@@ -13,26 +13,31 @@
from collections import OrderedDict, namedtuple

import fff_dqmtools
import fff_filemonitor
import applets.fff_filemonitor as fff_filemonitor
import fff_cluster

log = logging.getLogger(__name__)

RunEntry = namedtuple('RunEntry', ["run", "path", "start_time", "start_time_source"])
FileEntry = namedtuple('FileEntry', ["ls", "stream", "mtime", "ctime", "evt_processed", "evt_accepted", "fsize"])
RunEntry = namedtuple("RunEntry", ["run", "path", "start_time", "start_time_source"])
FileEntry = namedtuple(
"FileEntry",
["ls", "stream", "mtime", "ctime", "evt_processed", "evt_accepted", "fsize"],
)

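# approximate length of one lumisection in seconds (2**18 LHC orbits)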
LUMI = 23.310893056


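# note: map() and filter() are lazy in Python 3, so this returns a one-shot
# iterator; the parameter names shadow the re module and the builtin iter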
def find_match(re, iter):
xm = map(re.match, iter)
return filter(lambda x: x is not None, xm)


def collect_run_timestamps(path):
lst = os.listdir(path)

path_dct = {}
path_pattern_re = re.compile(r"^run(\d+)$")
for m in find_match(path_pattern_re, lst):
path_dct[int(m.group(1))] = os.path.join(path, m.group(0))

global_dct = {}
@@ -48,19 +53,23 @@ def collect_run_timestamps(path):
run_list = []
for run in path_dct.keys():
# runs without global will have 'None'
run_list.append(RunEntry(run, path_dct[run], global_dct.get(run, None), "global_file"))
run_list.append(
RunEntry(run, path_dct[run], global_dct.get(run, None), "global_file")
)

run_list.sort()
return run_list


def analyze_run_entry(e):
lst = os.listdir(e.path)

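# matches per-lumisection JSON names, e.g. "run123456_ls0031_streamDQM_x.jsn"
# (example filename invented for illustration)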
re_jsn = re.compile(r"^run(?P<run>\d+)_ls(?P<ls>\d+)(?P<leftover>_.+\.jsn)$")
files = []
for m in find_match(re_jsn, lst):
d = m.groupdict()
if int(d['run']) != e.run: continue
if int(d["run"]) != e.run:
continue

f = os.path.join(e.path, m.group(0))

@@ -74,18 +83,22 @@ def analyze_run_entry(e):
if "EoR" not in f:
try:
with open(f, "r") as fd:
jsn = json.load(fd).get("data", [-1]*5)
evt_processed = long(jsn[0])
evt_accepted = long(jsn[1])
fsize = long(jsn[4])
jsn = json.load(fd).get("data", [-1] * 5)
evt_processed = int(jsn[0])
evt_accepted = int(jsn[1])
fsize = int(jsn[4])
except:
log.warning("Crash while reading %s.", f, exc_info=True)

files.append(FileEntry(int(d['ls']), stream, mtime, ctime, evt_processed, evt_accepted, fsize))
files.append(
FileEntry(
int(d["ls"]), stream, mtime, ctime, evt_processed, evt_accepted, fsize
)
)

files.sort()
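# if no global timestamp was found, estimate the run start as the first
# file's mtime minus one lumisection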
if (e.start_time is None) and len(files):
e = e._replace(start_time = files[0].mtime - LUMI, start_time_source = "first_lumi")
e = e._replace(start_time=files[0].mtime - LUMI, start_time_source="first_lumi")

return e, files

@@ -112,21 +125,24 @@ def make_report(self, backlog=5):
# don't include EoR file
continue

lst = grouped.setdefault(f.stream, {
'lumis': [],
'mtimes': [],
'ctimes': [],
'evt_processed': [],
'evt_accepted': [],
'fsize': [],
})

lst['lumis'].append(f.ls)
lst['mtimes'].append(f.mtime)
lst['ctimes'].append(f.ctime)
lst['evt_processed'].append(f.evt_processed)
lst['evt_accepted'].append(f.evt_accepted)
lst['fsize'].append(f.fsize)
lst = grouped.setdefault(
f.stream,
{
"lumis": [],
"mtimes": [],
"ctimes": [],
"evt_processed": [],
"evt_accepted": [],
"fsize": [],
},
)

lst["lumis"].append(f.ls)
lst["mtimes"].append(f.mtime)
lst["ctimes"].append(f.ctime)
lst["evt_processed"].append(f.evt_processed)
lst["evt_accepted"].append(f.evt_accepted)
lst["fsize"].append(f.fsize)

id = "dqm-files-%s-%s-run%d" % (self.hostname, self.app_tag, entry.run)

@@ -140,11 +156,11 @@ def make_report(self, backlog=5):
"global_start": entry.start_time,
"global_start_source": entry.start_time_source,
"lumi": LUMI,
#"run_timestamps": run_dct,
# "run_timestamps": run_dct,
},
"pid": os.getpid(),
"_id": id,
"type": "dqm-files"
"type": "dqm-files",
}

final_fp = os.path.join(self.report_directory, doc["_id"] + ".jsn")
@@ -158,37 +174,45 @@ def run_greenlet(self):
if os.path.isdir(self.report_directory):
self.make_report()
else:
log.warning("Directory %s does not exists. Reports disabled.", self.report_directory)
log.warning(
"Directory %s does not exists. Reports disabled.",
self.report_directory,
)

time.sleep(105)

@fff_cluster.host_wrapper(allow = ["bu-c2f13-31-01", "bu-c2f11-09-01", "bu-c2f11-13-01"])

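# host_wrapper presumably restricts this applet to the machines listed in allow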
@fff_cluster.host_wrapper(
allow=["dqmrubu-c2a06-05-01", "dqmrubu-c2a06-01-01", "dqmrubu-c2a06-03-01"]
)
@fff_dqmtools.fork_wrapper(__name__, uid="dqmpro", gid="dqmpro")
@fff_dqmtools.lock_wrapper
def __run__(opts, logger, **kwargs):
global log
log = logger

s = Analyzer(
top = "/fff/ramdisk/",
app_tag = kwargs["name"],
report_directory = opts["path"],
top="/fff/ramdisk/",
app_tag=kwargs["name"],
report_directory=opts["path"],
)

s.run_greenlet()


if __name__ == "__main__":
log = fff_dqmtools.LogCaptureHandler.create_logger_subprocess("root")
path = "/tmp/dqm_monitoring/"

import sys

if len(sys.argv) == 2:
path = sys.argv[1]

s = Analyzer(
top = "/fff/ramdisk/",
app_tag = "analyze_files",
report_directory = path,
top="/fff/ramdisk/",
app_tag="analyze_files",
report_directory=path,
)

s.make_report(backlog=50)
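When invoked directly, the script writes a one-off report: e.g. `python3 applets/analyze_files.py /tmp/dqm_monitoring/test` overrides the default `/tmp/dqm_monitoring/` report path, and `backlog=50` presumably covers the 50 most recent runs.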
(Diff of an additional applet file follows; its filename is not captured in this view.)
@@ -2,17 +2,19 @@
import fff_cluster
import logging

@fff_cluster.host_wrapper(allow = ["bu-c2f11-19-01"])

@fff_cluster.host_wrapper(allow=["dqmrubu-c2a06-05-01"])
@fff_dqmtools.fork_wrapper(__name__, uid="dqmpro", gid="dqmpro")
@fff_dqmtools.lock_wrapper
def __run__(opts, **kwargs):
import analyze_files

analyze_files.log = kwargs["logger"]

s = analyze_files.Analyzer(
top = "/fff/output/lookarea/",
app_tag = kwargs["name"],
report_directory = opts["path"],
top="/fff/output/lookarea/",
app_tag=kwargs["name"],
report_directory=opts["path"],
)

s.run_greenlet()