Skip to content

Commit

Permalink
Merge pull request #5 from MoseleyBioinformaticsLab/tests
Browse files Browse the repository at this point in the history
Introduces tests and the corresponding GitHub action
  • Loading branch information
erikhuck authored Mar 20, 2024
2 parents 9a8319f + da13976 commit bc7254d
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[run]
omit = src/gpu_tracker/__init__.py,src/gpu_tracker/_version.py

29 changes: 29 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: build

on:
push:
branches:
- main
pull_request:
branches:
- main
workflow_dispatch:

jobs:
build:
strategy:
matrix:
python-version: ["3.10", "3.11"]
os: [ ubuntu-latest, windows-latest, macOS-latest ]
runs-on: ${{matrix.os}}

steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: Install testing environment and kegg_pull package
run: bash tests/install.sh
- name: Test with pytest
run: bash tests/run.sh
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.coverage
.env/
.idea/
**/__pycache__/**
Expand Down
89 changes: 49 additions & 40 deletions src/gpu_tracker/tracker.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,74 @@
from __future__ import annotations
import time
import multiprocessing as mproc
import threading as thrd
import os
import psutil
import subprocess as subp

import logging as log
import sys

class Tracker:
def __init__(
self, sleep_time: float = 1.0, include_children: bool = True, ram_unit: str = 'gigabyte', gpu_unit: str = 'gigabyte',
time_unit: str = 'hour', n_join_attempts: int = 5, join_timeout: float = 10.0, kill_if_join_fails: bool = False):
self, sleep_time: float = 1.0, include_children: bool = True, ram_unit: str = 'gigabytes', gpu_unit: str = 'gigabytes',
time_unit: str = 'hours', n_join_attempts: int = 5, join_timeout: float = 10.0, kill_if_join_fails: bool = False):
Tracker._validate_mem_unit(ram_unit)
Tracker._validate_mem_unit(gpu_unit)
valid_time_units = {'second', 'minute', 'hour', 'day'}
Tracker._validate_unit(time_unit, valid_time_units, unit_type='time')
Tracker._validate_unit(time_unit, valid_units={'seconds', 'minutes', 'hours', 'days'}, unit_type='time')
self.sleep_time = sleep_time
self.include_children = include_children
self.ram_unit = ram_unit
self.gpu_unit = gpu_unit
self.time_unit = time_unit
self._ram_coefficient = {
'byte': 1.0,
'kilobyte': 1 / 1e3,
'megabyte': 1 / 1e6,
'gigabyte': 1 / 1e9,
'terabyte': 1 / 1e12
self._ram_coefficient: float = {
'bytes': 1.0,
'kilobytes': 1 / 1e3,
'megabytes': 1 / 1e6,
'gigabytes': 1 / 1e9,
'terabytes': 1 / 1e12
}[ram_unit]
self._gpu_coefficient = {
'byte': 1e6,
'kilobyte': 1e3,
'megabyte': 1.0,
'gigabyte': 1 / 1e3,
'terabyte': 1 / 1e6
self._gpu_coefficient: float = {
'bytes': 1e6,
'kilobytes': 1e3,
'megabytes': 1.0,
'gigabytes': 1 / 1e3,
'terabytes': 1 / 1e6
}[gpu_unit]
self._time_coefficient = {
'second': 1.0,
'minute': 1 / 60,
'hour': 1 / (60 * 60),
'day': 1 / (60 * 60 * 24)
self._time_coefficient: float = {
'seconds': 1.0,
'minutes': 1 / 60,
'hours': 1 / (60 * 60),
'days': 1 / (60 * 60 * 24)
}[time_unit]
self.stop_event = thrd.Event()
self.thread = thrd.Thread(target=self._profile)
self.max_ram = None
self.max_gpu = None
self.compute_time = None
self._time1 = None
self.n_join_attempts = n_join_attempts
self.join_timeout = join_timeout
self.kill_if_join_fails = kill_if_join_fails

@staticmethod
def _validate_mem_unit(unit: str):
valid_units = {'byte', 'kilobyte', 'megabyte', 'gigabyte', 'terabyte'}
Tracker._validate_unit(unit, valid_units, unit_type='memory')
Tracker._validate_unit(unit, valid_units={'bytes', 'kilobytes', 'megabytes', 'gigabytes', 'terabytes'}, unit_type='memory')

@staticmethod
def _validate_unit(unit: str, valid_units: set[str], unit_type: str):
if unit not in valid_units:
raise ValueError(f'"{unit}" is not a valid {unit_type} unit. Valid values are {", ".join(valid_units)}')
raise ValueError(f'"{unit}" is not a valid {unit_type} unit. Valid values are {", ".join(sorted(valid_units))}')

def _profile(self):
max_ram = 0
max_gpu = 0
start_time = time.time()
while not self.stop_event.is_set():
parent_process_id = os.getppid()
parent_process = psutil.Process(os.getppid())
process_id = os.getpid()
process = psutil.Process(process_id)
# Get the current RAM usage.
curr_mem_usage = parent_process.memory_info().rss
process_ids = {parent_process_id}
curr_mem_usage = process.memory_info().rss
process_ids = {process_id}
if self.include_children:
child_processes = parent_process.children()
child_processes = process.children()
process_ids.update(process.pid for process in child_processes)
for child_process in child_processes:
child_proc_usage = child_process.memory_info().rss
Expand All @@ -91,12 +90,11 @@ def _profile(self):
max_ram = curr_mem_usage
if curr_gpu_usage > max_gpu:
max_gpu = curr_gpu_usage
time.sleep(self.sleep_time)
_testable_sleep(self.sleep_time)
self.max_ram, self.max_gpu, self.compute_time = (
max_ram * self._ram_coefficient, max_gpu * self._gpu_coefficient, (time.time() - self._time1) * self._time_coefficient)
max_ram * self._ram_coefficient, max_gpu * self._gpu_coefficient, (time.time() - start_time) * self._time_coefficient)

def __enter__(self) -> Tracker:
self._time1 = time.time()
self.thread.start()
return self

Expand All @@ -116,15 +114,26 @@ def __exit__(self, *_):
f'The thread will likely not end until the parent process ends.')
if self.kill_if_join_fails:
log.warning('The thread failed to join and kill_if_join_fails is set. Exiting ...')
import sys
sys.exit(1)
self.max_ram = None
self.max_gpu = None
self.compute_time = None
self._time1 = None

def start(self):
self.__enter__()

def stop(self):
self.__exit__()

def __str__(self):
return f'Max RAM: {self.max_ram:.3f} {self.ram_unit}\n' \
f'Max GPU: {self.max_gpu:.3f} {self.gpu_unit}\n' \
f'Compute time: {self.compute_time:.3f} {self.time_unit}'

def __repr__(self):
return str(self) # pragma: no cover


def _testable_sleep(sleep_time: float) -> float:
""" The time.sleep() function causes issues when mocked in tests, so we create this wrapper that can be safely mocked.
:return: The result of time.sleep()
"""
return time.sleep(sleep_time) # pragma: no cover
8 changes: 8 additions & 0 deletions tests/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
To install the package in the local dev environment
.. parsed-literal::
python3 -m venv .env
source .env/bin/activate
python3 -m pip install -e .
To install the PyTest dependencies:
.. parsed-literal::
python3 -m pip install pytest pytest-mock pytest-cov
8 changes: 8 additions & 0 deletions tests/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo "Removing previous .env/ directory if it exists..."
rm -rf .env/
echo "Creating new .env/ directory..."
python3 -m venv .env/
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pip install --upgrade pip
python3 -m pip install pytest pytest-mock pytest-cov sphinx sphinx-rtd-theme notebook
python3 -m pip install -e .
2 changes: 2 additions & 0 deletions tests/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
source .env/bin/activate || source .env/Scripts/activate # Windows has Scripts instead of bin
python3 -m pytest tests --cov --cov-branch --cov-report=term-missing
153 changes: 153 additions & 0 deletions tests/test_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import typing as typ
import gpu_tracker.tracker as track
import pytest as pt


@pt.fixture(name='use_context_manager', params=[True, False])
def get_use_context_manager(request) -> bool:
yield request.param


test_tracker_data = [
(
True, 1697450.0, 5800.0, 300.0, 'bytes', 'megabytes', 'seconds',
'Max RAM: 1697450.000 bytes\nMax GPU: 5800.000 megabytes\nCompute time: 300.000 seconds'
),
(
True, 1697.450, 5.8, 5.0, 'kilobytes', 'gigabytes', 'minutes',
'Max RAM: 1697.450 kilobytes\nMax GPU: 5.800 gigabytes\nCompute time: 5.000 minutes'
),
(
False, 0.5505, 1600000.0, 300.0 / 3600, 'megabytes', 'kilobytes', 'hours',
'Max RAM: 0.550 megabytes\nMax GPU: 1600000.000 kilobytes\nCompute time: 0.083 hours'
),
(
False, 550.5, 1600000000.0, 300.0 / (3600 * 24), 'kilobytes', 'bytes', 'days',
'Max RAM: 550.500 kilobytes\nMax GPU: 1600000000.000 bytes\nCompute time: 0.003 days'
)
]


@pt.mark.parametrize('include_children,expected_ram,expected_gpu,expected_time,ram_unit,gpu_unit,time_unit,tracker_str', test_tracker_data)
def test_tracker(
mocker, use_context_manager: bool, include_children: bool, expected_ram: float, expected_gpu: float, expected_time: float,
ram_unit: str, gpu_unit: str, time_unit: str, tracker_str):
class EventMock:
def __init__(self):
self.count = 0
self.is_set = mocker.MagicMock(wraps=self.is_set)
self.set = mocker.MagicMock()

def is_set(self) -> bool:
self.count += 1
return self.count > 3

class ThreadMock:
def __init__(self, target: typ.Callable):
self.target = target
self.start = mocker.MagicMock(wraps=self.start)
self.join = mocker.MagicMock()
self.is_alive = mocker.MagicMock(return_value=False)

def start(self):
self.target()

EventMock = mocker.patch('gpu_tracker.tracker.thrd.Event', wraps=EventMock)
ThreadMock = mocker.patch('gpu_tracker.tracker.thrd.Thread', wraps=ThreadMock)
process_id = 12
child1_id = 21
child2_id = 22
process_rams = [440400, 440400, 550500]
child1_rams = [590900, 590990, 490900]
child2_rams = [666000, 666060, 333000]
getpid_mock = mocker.patch('gpu_tracker.tracker.os.getpid', return_value=process_id)

def get_process_mock(pid: int, rams: list[int], children: list[mocker.MagicMock] | None = None) -> mocker.MagicMock:
return mocker.MagicMock(
pid=pid,
memory_info=mocker.MagicMock(side_effect=[mocker.MagicMock(rss=ram) for ram in rams]),
children=mocker.MagicMock(return_value=children) if children is not None else None)

child1_mock = get_process_mock(pid=child1_id, rams=child1_rams)
child2_mock = get_process_mock(pid=child2_id, rams=child2_rams)
process_mock = get_process_mock(pid=process_id, rams=process_rams, children=[child1_mock, child2_mock])
ProcessMock = mocker.patch('gpu_tracker.tracker.psutil.Process', return_value=process_mock)
nvidia_smi_outputs = [
b'',
b'12,1600 MiB\n21,700 MiB\n22,200 MiB',
b'12,1500 MiB\n21,2100 MiB\n22,2200 MiB']
check_output_mock = mocker.patch('gpu_tracker.tracker.subp.check_output', side_effect=nvidia_smi_outputs)
time_mock = mocker.patch('gpu_tracker.tracker.time.time', side_effect=[800, 900, 1000, 1100])
sleep_mock = mocker.patch('gpu_tracker.tracker._testable_sleep')
sleep_time = 1.5
join_timeout = 5.5
if use_context_manager:
with track.Tracker(
include_children=include_children, sleep_time=sleep_time, join_timeout=join_timeout, ram_unit=ram_unit, gpu_unit=gpu_unit,
time_unit=time_unit) as tracker:
pass
else:
tracker = track.Tracker(
include_children=include_children, sleep_time=sleep_time, join_timeout=join_timeout, ram_unit=ram_unit, gpu_unit=gpu_unit,
time_unit=time_unit)
tracker.start()
tracker.stop()
EventMock.assert_called_once_with()
ThreadMock.assert_called_once_with(target=tracker._profile)
tracker.thread.start.assert_called_once_with()
_assert_args_list(mock=tracker.stop_event.is_set, expected_args_list=[()] * 4)
_assert_args_list(mock=getpid_mock, expected_args_list=[()] * 3)
_assert_args_list(mock=ProcessMock, expected_args_list=[(process_id,)] * 3)
_assert_args_list(mock=process_mock.memory_info, expected_args_list=[()] * 3)
_assert_args_list(mock=process_mock.children, expected_args_list=[()] * 3 if include_children else [])
_assert_args_list(mock=child1_mock.memory_info, expected_args_list=[()] * 3 if include_children else [])
_assert_args_list(mock=child2_mock.memory_info, expected_args_list=[()] * 3 if include_children else [])
assert len(check_output_mock.call_args_list) == 3
_assert_args_list(mock=time_mock, expected_args_list=[()] * 4)
_assert_args_list(mock=sleep_mock, expected_args_list=[(sleep_time,)] * 3)
assert tracker.max_ram == expected_ram
assert tracker.max_gpu == expected_gpu
assert tracker.compute_time == expected_time
assert str(tracker) == tracker_str
tracker.stop_event.set.assert_called_once_with()
tracker.thread.join.assert_called_once_with(timeout=join_timeout)
_assert_args_list(mock=tracker.thread.is_alive, expected_args_list=[()] * 2)


def _assert_args_list(mock, expected_args_list: list[tuple | dict], use_kwargs: bool = False):
actual_args_list = [call.kwargs if use_kwargs else call.args for call in mock.call_args_list]
assert actual_args_list == expected_args_list


@pt.mark.parametrize('kill_if_join_fails', [True, False])
def test_warnings(mocker, kill_if_join_fails: bool, caplog):
n_join_attempts = 3
join_timeout = 5.2
mocker.patch('gpu_tracker.tracker.thrd.Event', return_value=mocker.MagicMock(set=mocker.MagicMock()))
mocker.patch(
'gpu_tracker.tracker.thrd.Thread',
return_value=mocker.MagicMock(start=mocker.MagicMock(), is_alive=mocker.MagicMock(return_value=True), join=mocker.MagicMock())
)
exit_mock = mocker.patch('gpu_tracker.tracker.sys.exit')
with track.Tracker(kill_if_join_fails=kill_if_join_fails, n_join_attempts=n_join_attempts, join_timeout=join_timeout) as tracker:
pass
_assert_args_list(mock=tracker.stop_event.set, expected_args_list=[()] * n_join_attempts)
_assert_args_list(mock=tracker.thread.join, expected_args_list=[{'timeout': join_timeout}] * n_join_attempts, use_kwargs=True)
_assert_args_list(mock=tracker.thread.is_alive, expected_args_list=[()] * (n_join_attempts + 1))
expected_warnings = ['Thread is still alive after join timout. Attempting to join again...'] * n_join_attempts
expected_warnings.append(
'Thread is still alive after 3 attempts to join. The thread will likely not end until the parent process ends.')
if kill_if_join_fails:
expected_warnings.append('The thread failed to join and kill_if_join_fails is set. Exiting ...')
exit_mock.assert_called_once_with(1)
else:
assert not exit_mock.called
for expected_warning, record in zip(expected_warnings, caplog.records):
assert record.levelname == 'WARNING'
assert record.message == expected_warning


def test_validate_unit():
with pt.raises(ValueError) as error:
track.Tracker(ram_unit='milibytes')
assert str(error.value) == '"milibytes" is not a valid memory unit. Valid values are bytes, gigabytes, kilobytes, megabytes, terabytes'

0 comments on commit bc7254d

Please sign in to comment.