diff --git a/docker/calc_mem.py b/docker/calc_mem.py index 173a7d6d..4fa72a09 100755 --- a/docker/calc_mem.py +++ b/docker/calc_mem.py @@ -4,14 +4,23 @@ Print result to stdout. """ +import math import argparse import sys import os +import re +import logging import os.path import multiprocessing +import psutil + +#from util.misc import available_cpu_count # use the version of available_cpu_count() from viral-core/util/misc.py + +log = logging.getLogger(__name__) + parser = argparse.ArgumentParser('Calculated memory allocated to the process') -parser.add_argument('mem_unit', choices=('mb', 'gb'), help='memory units') +parser.add_argument('mem_unit', choices=('b', 'kb', 'mb', 'gb'), help='memory units') parser.add_argument('mem_fraction', type=int, help='what fraction of total memory to report') parser.add_argument('--per-cpu', dest="per_cpu", action='store_true', help='Calculate memory per-CPU.') args = parser.parse_args() @@ -19,7 +28,8 @@ if not (1 <= args.mem_fraction <= 100): raise RuntimeError("mem_fraction should be in the range [1,100]") -unit2factor = {'k': 1024, 'm': 1024*1024, 'g': 1024*1024*1024} +unit2factor = {'b': 1, 'k': 1024, 'm': 1024*1024, 'g': 1024*1024*1024} +MAX_INT32 = (2 ** 31)-1 def available_cpu_count(): """ @@ -33,24 +43,42 @@ def available_cpu_count(): cgroup_cpus = MAX_INT32 try: - def slurp_file(fname): - with open(fname) as f: - return f.read() - def get_cpu_val(name): - return float(slurp_file('/sys/fs/cgroup/cpu/cpu.'+name).strip()) - cfs_quota = get_cpu_val('cfs_quota_us') - if cfs_quota > 0: - cfs_period = get_cpu_val('cfs_period_us') - log.debug('cfs_quota %s, cfs_period %s', cfs_quota, cfs_period) - cgroup_cpus = max(1, int(cfs_quota / cfs_period)) + def _load(path, encoding="utf-8"): + """ Loads a file content """ + with open(path, 'r', encoding=encoding, newline="") as handle: + tmp = handle.read() + return tmp + + # cgroup CPU count determination (w/ v2) adapted from: + # https://github.com/conan-io/conan/blob/2.9.2/conan/tools/build/cpu.py#L31-L54 + # + # see also: + # https://docs.kernel.org/scheduler/sched-bwc.html + + # This is necessary to determine docker cpu_count + cfs_quota_us = cfs_period_us = 0 + # cgroup v2 + if os.path.exists("/sys/fs/cgroup/cgroup.controllers"): + cpu_max = _load("/sys/fs/cgroup/cpu.max").split() + if cpu_max[0] != "max": + if len(cpu_max) == 1: + cfs_quota_us, cfs_period_us = int(cpu_max[0]), 100_000 + else: + cfs_quota_us, cfs_period_us = map(int, cpu_max) + # cgroup v1 + else: + cfs_quota_us = int(_load("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")) + cfs_period_us = int(_load("/sys/fs/cgroup/cpu/cpu.cfs_period_us")) + + log.debug('cfs_quota_us %s, cfs_period_us %s', cfs_quota_us, cfs_period_us) + if cfs_quota_us > 0 and cfs_period_us > 0: + cgroup_cpus = max(1, int(math.ceil(cfs_quota_us / cfs_period_us))) except Exception as e: pass proc_cpus = MAX_INT32 try: - with open('/proc/self/status') as f: - status = f.read() - m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status) + m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', _load('/proc/self/status')) if m: res = bin(int(m.group(1).replace(',', ''), 16)).count('1') if res > 0: @@ -75,15 +103,44 @@ def mem_from_proc_meminfo(): def mem_from_cgroups(): """Return the total memory, in bytes, as given by cgroups (or sys.maxsize if not given)""" - cgroups_memlimit_fname = '/sys/fs/cgroup/memory/memory.limit_in_bytes' - if os.path.isfile(cgroups_memlimit_fname): - with open(cgroups_memlimit_fname) as f: - val = f.read().strip() - return int(val) * unit2factor.get(val[-1], 1) + # list of potential cgroup paths to max mem info + # see: + # (cgroup v1) https://www.kernel.org/doc/Documentation/cgroup-v1/memory.txt + # (cgroup v2) https://www.kernel.org/doc/Documentation/cgroup-v2.txt + cgroups_memlimit_fnames = [ + '/sys/fs/cgroup/memory/memory.limit_in_bytes', # cgroup v1 + '/sys/fs/cgroup/memory.max' # cgroup v2 + ] + # try the various potential cgroup memory info paths + for cgroups_memlimit_fname in cgroups_memlimit_fnames: + if os.path.isfile(cgroups_memlimit_fname): + with open(cgroups_memlimit_fname) as f: + val = f.read().strip() + if val != "max": + return int(val) * unit2factor.get(val[-1], 1) return sys.maxsize -mem_in_bytes = min(mem_from_proc_meminfo(), mem_from_cgroups()) +def mem_from_psutil(metric_name="total"): + """ Use psutil to get a memory metric by name in a cross-platform way + Returning sys.maxsize (obviously wrong large value) + in the event the value cannot be obtained. + + For available metrics, see: + https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory + """ + mem_info = psutil.virtual_memory() + + return int(getattr(mem_info,metric_name,sys.maxsize)) + +# of the memory values obtained, use the smallest value +# this results in obviously-wrong values obtained from sys.maxsize +# in mem_from_cgroups() or mem_from_psutil() falling in precedence +mem_in_bytes = min( + mem_from_psutil(), + mem_from_proc_meminfo(), + mem_from_cgroups() + ) if args.per_cpu: mem_in_bytes = mem_in_bytes/available_cpu_count() diff --git a/requirements-conda.txt b/requirements-conda.txt index 9580e6d7..cda16e71 100644 --- a/requirements-conda.txt +++ b/requirements-conda.txt @@ -18,6 +18,7 @@ novoalign=3.09.04 parallel>=20190922 picard=2.25.6 pigz>=2.4 +psutil>=6.1.0 prinseq>=0.20.4 samtools>=1.16.1 trimmomatic>=0.38 diff --git a/test/unit/test_util_misc.py b/test/unit/test_util_misc.py index aa9a3d98..b8613afa 100644 --- a/test/unit/test_util_misc.py +++ b/test/unit/test_util_misc.py @@ -5,6 +5,7 @@ import os, random, collections import unittest import subprocess +import multiprocessing import util.misc import util.file import pytest @@ -284,15 +285,76 @@ def test_chk(): def test_available_cpu_count(monkeypatch_function_result): reported_cpu_count = util.misc.available_cpu_count() - assert reported_cpu_count >= int(os.environ.get('PYTEST_XDIST_WORKER_COUNT', '1')) + assert util.misc.available_cpu_count() == reported_cpu_count - with monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='1'), \ - monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'): + # cgroup v2 limited to 1 cpu + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=True, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu.max', patch_result="100000 100000"): assert util.misc.available_cpu_count() == 1 - assert util.misc.available_cpu_count() == reported_cpu_count + # cgroup v2 limited to 2 cpu + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=True, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu.max', patch_result="200000 100000"): + assert util.misc.available_cpu_count() == 2 + + # cgroup v2 with no CPU limit imposed on cgroup + # (fall back to /proc/self/status method, with limit imposed there): + # 'Cpus_allowed: d' = 0b1101 bitmask (meaning execution allowed on 3 CPUs) + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=True, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu.max', patch_result="max 100000"), \ + monkeypatch_function_result(util.file.slurp_file, '/proc/self/status', patch_result='Cpus_allowed: d'): + assert util.misc.available_cpu_count() == 3 + + # cgroup v1 limited to 2 CPUs + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='200000'), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='100000'): + + assert util.misc.available_cpu_count() == 2 + + # cgroup v1 limited to 1 CPU + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='1'), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'): + + assert util.misc.available_cpu_count() == 1 - with monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='-1'), \ + # cgroup v1 with no limit imposed on the cgroup + # (fall back to /proc/self/status method, with limit imposed there): + # 'Cpus_allowed: c' = 0b1100 bitmask (meaning execution allowed on 2 CPUs) + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='-1'), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'), \ + monkeypatch_function_result(util.file.slurp_file, '/proc/self/status', patch_result='Cpus_allowed: c'): + + assert util.misc.available_cpu_count() == 2 + + # cgroup v1 with no limit imposed on the cgoup or via /proc/self/status + # (fall back to /proc/self/status method, with no limit imposed there) + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='-1'), \ monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'): + + assert util.misc.available_cpu_count() == reported_cpu_count + + # cgroup v1 with no limit imposed on the cgoup + # with 'Cpus_allowed' not present in /proc/self/status + # (fall back to multiprocessing.cpu_count() method) + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='-1'), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'), \ + monkeypatch_function_result(util.file.slurp_file, '/proc/self/status', patch_result='unexpected_key: 1'): + assert util.misc.available_cpu_count() == reported_cpu_count + + # cgroup v1 with no limit imposed on the cgoup + # with 'Cpus_allowed' not present in /proc/self/status + # (fall back to multiprocessing.cpu_count() method with CPU count of 2 reported) + with monkeypatch_function_result(os.path.exists, "/sys/fs/cgroup/cgroup.controllers", patch_result=False, patch_module=os.path), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_quota_us', patch_result='-1'), \ + monkeypatch_function_result(util.file.slurp_file, '/sys/fs/cgroup/cpu/cpu.cfs_period_us', patch_result='1'), \ + monkeypatch_function_result(util.file.slurp_file, '/proc/self/status', patch_result='unexpected_key: 1'), \ + monkeypatch_function_result(multiprocessing.cpu_count, patch_result=2, patch_module=multiprocessing): + + assert util.misc.available_cpu_count() == 2 \ No newline at end of file diff --git a/util/misc.py b/util/misc.py index 15dc9fe8..452ed989 100644 --- a/util/misc.py +++ b/util/misc.py @@ -1,4 +1,5 @@ '''A few miscellaneous tools. ''' +import math import collections import contextlib import itertools, functools, operator @@ -334,21 +335,38 @@ def available_cpu_count(): cgroup_cpus = MAX_INT32 try: - def get_cpu_val(name): - return float(util.file.slurp_file('/sys/fs/cgroup/cpu/cpu.'+name).strip()) - cfs_quota = get_cpu_val('cfs_quota_us') - if cfs_quota > 0: - cfs_period = get_cpu_val('cfs_period_us') - log.debug('cfs_quota %s, cfs_period %s', cfs_quota, cfs_period) - cgroup_cpus = max(1, int(cfs_quota / cfs_period)) + # cgroup CPU count determination (w/ v2) adapted from: + # https://github.com/conan-io/conan/blob/2.9.2/conan/tools/build/cpu.py#L31-L54 + # + # see also: + # https://docs.kernel.org/scheduler/sched-bwc.html + + # This is necessary to determine docker cpu_count + cfs_quota_us = cfs_period_us = 0 + # cgroup v2 + if os.path.exists("/sys/fs/cgroup/cgroup.controllers"): + log.debug("cgroup v2 detected") + cpu_max = util.file.slurp_file("/sys/fs/cgroup/cpu.max").split() + if cpu_max[0] != "max": + if len(cpu_max) == 1: + cfs_quota_us, cfs_period_us = int(cpu_max[0]), 100_000 + else: + cfs_quota_us, cfs_period_us = map(int, cpu_max) + # cgroup v1 + else: + log.debug("cgroup v1 detected") + cfs_quota_us = int(util.file.slurp_file("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")) + cfs_period_us = int(util.file.slurp_file("/sys/fs/cgroup/cpu/cpu.cfs_period_us")) + + log.debug('cfs_quota_us %s, cfs_period_us %s', cfs_quota_us, cfs_period_us) + if cfs_quota_us > 0 and cfs_period_us > 0: + cgroup_cpus = max(1, int(math.ceil(cfs_quota_us / cfs_period_us))) except Exception as e: pass proc_cpus = MAX_INT32 try: - with open('/proc/self/status') as f: - status = f.read() - m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status) + m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', util.file.slurp_file('/proc/self/status')) if m: res = bin(int(m.group(1).replace(',', ''), 16)).count('1') if res > 0: