Skip to content

Commit

Permalink
Removed Windows specific code. Added tests. Updated docs
Browse files Browse the repository at this point in the history
  • Loading branch information
sybrenjansen committed Jan 2, 2024
1 parent 51d76da commit 8f1d5f0
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 78 deletions.
3 changes: 3 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ Unreleased

* Add the option to only show progress on the dashboard. (`#107`_)
* Import escape directly from markupsafe, instead of from flask. (`#106`_)
* Insights now also work when using the ``forkserver`` and ``spawn`` start methods. (`#104`_)
* When using insights on Windows the arguments of the top 5 longest tasks are now available as well.

.. _#108: https://github.com/sybrenjansen/mpire/pull/107
.. _#107: https://github.com/sybrenjansen/mpire/issues/106
.. _#104: https://github.com/sybrenjansen/mpire/issues/104


2.8.1
Expand Down
1 change: 0 additions & 1 deletion docs/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ Windows

Windows support has some caveats:

* When using worker insights the arguments of the top 5 longest tasks are not available;
* Progress bar is not supported when using threading as start method;
* When using ``dill`` and an exception occurs, or when the exception occurs in an exit function, it can print additional
``OSError`` messages in the terminal, but they can be safely ignored.
Expand Down
4 changes: 0 additions & 4 deletions docs/usage/workerpool/worker_insights.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,3 @@ information.

When using `imap` or `imap_unordered` you can view the insights during execution. Simply call ``get_insights()``
or ``print_insights()`` inside your loop where you process the results.

.. note::

When using Windows the arguments of the top 5 longest tasks are not available.
21 changes: 6 additions & 15 deletions mpire/insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from functools import partial
from typing import Dict, Optional, List, Tuple

from mpire.context import RUNNING_WINDOWS
from mpire.utils import PicklableSyncManager, format_seconds


Expand Down Expand Up @@ -65,11 +64,8 @@ def reset_insights(self, enable_insights: bool) -> None:
:param enable_insights: Whether to enable worker insights
"""
if enable_insights:
# When we're on Windows, we don't use a Manager as it's giving authentication errors. The max_task_args
# information is therefore not available on Windows systems. This needs to be fixed at some point in time
if not RUNNING_WINDOWS:
self.insights_manager = PicklableSyncManager(authkey=os.urandom(24))
self.insights_manager.start()
self.insights_manager = PicklableSyncManager(authkey=os.urandom(24))
self.insights_manager.start()
self.insights_manager_lock = self.ctx.Lock()
self.worker_start_up_time = self.ctx.Array(ctypes.c_double, self.n_jobs, lock=False)
self.worker_init_time = self.ctx.Array(ctypes.c_double, self.n_jobs, lock=False)
Expand All @@ -78,11 +74,7 @@ def reset_insights(self, enable_insights: bool) -> None:
self.worker_working_time = self.ctx.Array(ctypes.c_double, self.n_jobs, lock=False)
self.worker_exit_time = self.ctx.Array(ctypes.c_double, self.n_jobs, lock=False)
self.max_task_duration = self.ctx.Array(ctypes.c_double, self.n_jobs * 5, lock=False)
if RUNNING_WINDOWS:
# Doesn't actually do anything, but reduces the amount of if/else statements in other code parts
self.max_task_args = [""] * self.n_jobs * 5
else:
self.max_task_args = self.insights_manager.list([""] * self.n_jobs * 5)
self.max_task_args = self.insights_manager.list([""] * self.n_jobs * 5)
else:
self.insights_manager = None
self.insights_manager_lock = None
Expand Down Expand Up @@ -187,10 +179,10 @@ def mean_std(seq):
for idx in sorted_idx:
if self.max_task_duration[idx] == 0:
break
if self.max_task_args[idx] == "" and not RUNNING_WINDOWS:
if self.max_task_args[idx] == "":
continue
top_5_max_task_durations.append(format_seconds_func(self.max_task_duration[idx]))
top_5_max_task_args.append("" if RUNNING_WINDOWS else self.max_task_args[idx])
top_5_max_task_args.append(self.max_task_args[idx])

# Populate
total_start_up_time = sum(self.worker_start_up_time)
Expand Down Expand Up @@ -268,9 +260,8 @@ def get_insights_string(self) -> str:
insights_str.extend(["",
"Top 5 longest tasks",
"-------------------"])
max_task_duration_args_separator = "" if RUNNING_WINDOWS else " - "
for task_idx, (duration, args) in enumerate(zip(insights['top_5_max_task_durations'],
insights['top_5_max_task_args']), start=1):
insights_str.append(f"{task_idx}. Time: {duration}{max_task_duration_args_separator}{args}")
insights_str.append(f"{task_idx}. Time: {duration} - {args}")

return "\n".join(insights_str)
124 changes: 66 additions & 58 deletions tests/test_insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@
from time import sleep
from unittest.mock import patch

from tqdm import tqdm

from mpire import WorkerPool
from mpire.context import DEFAULT_START_METHOD
from mpire.insights import RUNNING_WINDOWS, WorkerInsights
from mpire.context import DEFAULT_START_METHOD, FORK_AVAILABLE
from mpire.insights import WorkerInsights
from mpire.utils import PicklableSyncManager
from tests.utils import MockDatetimeNow


# Skip start methods that use fork if it's not available
if not FORK_AVAILABLE:
TEST_START_METHODS = ['spawn', 'threading']
else:
TEST_START_METHODS = ['fork', 'forkserver', 'spawn', 'threading']


def square(barrier, x):
# Wait until all workers are ready
barrier.wait()
Expand Down Expand Up @@ -55,18 +64,15 @@ def test_reset_insights(self):
insights.reset_insights(enable_insights=True)
self.assertTrue(insights.insights_enabled)
self.assertIsInstance(insights.insights_manager_lock, mp.synchronize.Lock)
if RUNNING_WINDOWS:
self.assertIsNone(insights.insights_manager)
else:
self.assertIsInstance(insights.insights_manager, PicklableSyncManager)
self.assertIsInstance(insights.insights_manager, PicklableSyncManager)
self.assertIsInstance(insights.worker_start_up_time, ctypes.Array)
self.assertIsInstance(insights.worker_init_time, ctypes.Array)
self.assertIsInstance(insights.worker_n_completed_tasks, ctypes.Array)
self.assertIsInstance(insights.worker_waiting_time, ctypes.Array)
self.assertIsInstance(insights.worker_working_time, ctypes.Array)
self.assertIsInstance(insights.worker_exit_time, ctypes.Array)
self.assertIsInstance(insights.max_task_duration, ctypes.Array)
self.assertIsInstance(insights.max_task_args, list if RUNNING_WINDOWS else managers.ListProxy)
self.assertIsInstance(insights.max_task_args, managers.ListProxy)

# Basic sanity checks for the values
self.assertEqual(sum(insights.worker_start_up_time), 0)
Expand All @@ -76,8 +82,7 @@ def test_reset_insights(self):
self.assertEqual(sum(insights.worker_working_time), 0)
self.assertEqual(sum(insights.worker_exit_time), 0)
self.assertEqual(sum(insights.max_task_duration), 0)
if not RUNNING_WINDOWS:
self.assertListEqual(list(insights.max_task_args), [''] * n_jobs * 5)
self.assertListEqual(list(insights.max_task_args), [''] * n_jobs * 5)

# Set some values so we can test if the containers will be properly resetted
insights.worker_start_up_time[0] = 1
Expand Down Expand Up @@ -126,56 +131,59 @@ def test_enable_insights(self):
"""
with warnings.catch_warnings():
warnings.simplefilter("ignore")

with WorkerPool(n_jobs=2, enable_insights=True) as pool:

# We run this a few times to see if it resets properly. We only verify this by checking the
# n_completed_tasks
for idx in range(3):
with self.subTest('enabled', idx=idx):

# We add a barrier so we know that all workers are ready. After that, the workers start working.
# Additionally, we set chunk size to 1, max tasks active to 2, and have a time.sleep in the
# get_tasks function, so we know for sure that there will be waiting time

print()
for start_method in tqdm(TEST_START_METHODS):

with WorkerPool(n_jobs=2, start_method=start_method, enable_insights=True) as pool:

# We run this a few times to see if it resets properly. We only verify this by checking the
# n_completed_tasks
for idx in range(3):
with self.subTest('enabled', idx=idx, start_method=start_method):

# We add a barrier so we know that all workers are ready. After that, the workers start
# working. Additionally, we set chunk size to 1, max tasks active to 2, and have a
# time.sleep in the get_tasks function, so we know for sure that there will be waiting time
barrier = pool.ctx.Barrier(2)
pool.set_shared_objects(barrier)
pool.map(square, self._get_tasks(10), worker_init=self._init, worker_exit=self._exit,
max_tasks_active=2, chunk_size=1)

# Basic sanity checks for the values. Some max task args can be empty, in that case the
# duration should be 0 (= no data)
self.assertGreater(sum(pool._worker_insights.worker_start_up_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_init_time), 0)
self.assertEqual(sum(pool._worker_insights.worker_n_completed_tasks), 10)
self.assertGreater(sum(pool._worker_insights.worker_waiting_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_working_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_exit_time), 0)
self.assertGreater(max(pool._worker_insights.max_task_duration), 0)
for duration, args in zip(pool._worker_insights.max_task_duration,
pool._worker_insights.max_task_args):
if duration == 0:
self.assertEqual(args, '')
else:
self.assertIn(args, {'Arg 0: 0', 'Arg 0: 1', 'Arg 0: 2', 'Arg 0: 3', 'Arg 0: 4',
'Arg 0: 5', 'Arg 0: 6', 'Arg 0: 7', 'Arg 0: 8', 'Arg 0: 9'})

with WorkerPool(n_jobs=2, enable_insights=False) as pool:

# Disabling should set things to None again
with self.subTest('disable', start_method=start_method):
barrier = pool.ctx.Barrier(2)
pool.set_shared_objects(barrier)
pool.map(square, self._get_tasks(10), worker_init=self._init, worker_exit=self._exit,
max_tasks_active=2, chunk_size=1)

# Basic sanity checks for the values. Some max task args can be empty, in that case the duration
# should be 0 (= no data)
self.assertGreater(sum(pool._worker_insights.worker_start_up_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_init_time), 0)
self.assertEqual(sum(pool._worker_insights.worker_n_completed_tasks), 10)
self.assertGreater(sum(pool._worker_insights.worker_waiting_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_working_time), 0)
self.assertGreater(sum(pool._worker_insights.worker_exit_time), 0)
self.assertGreater(max(pool._worker_insights.max_task_duration), 0)
for duration, args in zip(pool._worker_insights.max_task_duration,
pool._worker_insights.max_task_args):
if duration == 0:
self.assertEqual(args, '')
elif not RUNNING_WINDOWS:
self.assertIn(args, {'Arg 0: 0', 'Arg 0: 1', 'Arg 0: 2', 'Arg 0: 3', 'Arg 0: 4',
'Arg 0: 5', 'Arg 0: 6', 'Arg 0: 7', 'Arg 0: 8', 'Arg 0: 9'})

with WorkerPool(n_jobs=2, enable_insights=False) as pool:

# Disabling should set things to None again
with self.subTest('disable'):
barrier = pool.ctx.Barrier(2)
pool.set_shared_objects(barrier)
pool.map(square, range(10))
self.assertIsNone(pool._worker_insights.insights_manager)
self.assertIsNone(pool._worker_insights.insights_manager_lock)
self.assertIsNone(pool._worker_insights.worker_start_up_time)
self.assertIsNone(pool._worker_insights.worker_init_time)
self.assertIsNone(pool._worker_insights.worker_n_completed_tasks)
self.assertIsNone(pool._worker_insights.worker_waiting_time)
self.assertIsNone(pool._worker_insights.worker_working_time)
self.assertIsNone(pool._worker_insights.worker_exit_time)
self.assertIsNone(pool._worker_insights.max_task_duration)
self.assertIsNone(pool._worker_insights.max_task_args)
pool.map(square, range(10))
self.assertIsNone(pool._worker_insights.insights_manager)
self.assertIsNone(pool._worker_insights.insights_manager_lock)
self.assertIsNone(pool._worker_insights.worker_start_up_time)
self.assertIsNone(pool._worker_insights.worker_init_time)
self.assertIsNone(pool._worker_insights.worker_n_completed_tasks)
self.assertIsNone(pool._worker_insights.worker_waiting_time)
self.assertIsNone(pool._worker_insights.worker_working_time)
self.assertIsNone(pool._worker_insights.worker_exit_time)
self.assertIsNone(pool._worker_insights.max_task_duration)
self.assertIsNone(pool._worker_insights.max_task_args)

def test_get_max_task_duration_list(self):
"""
Expand Down Expand Up @@ -382,7 +390,7 @@ def test_get_insights(self):
'total_working_time': '0:01:19',
'total_exit_time': '0:00:00.770',
'top_5_max_task_durations': ['0:00:06', '0:00:02', '0:00:01', '0:00:00.800', '0:00:00.100'],
'top_5_max_task_args': ['', '', '', '', ''] if RUNNING_WINDOWS else ['3', '2', '1', '4', '5'],
'top_5_max_task_args': ['3', '2', '1', '4', '5'],
'total_time': '0:01:21.100',
'start_up_time_mean': '0:00:00.150', 'start_up_time_std': '0:00:00.050',
'init_time_mean': '0:00:00.165', 'init_time_std': '0:00:00.055',
Expand Down

0 comments on commit 8f1d5f0

Please sign in to comment.