diff --git a/src/uproot/source/chunk.py b/src/uproot/source/chunk.py index 2d9b0d08c..398ccec22 100644 --- a/src/uproot/source/chunk.py +++ b/src/uproot/source/chunk.py @@ -10,8 +10,8 @@ :doc:`uproot.source.chunk.Source`, the primary types of the "physical layer." """ - import numbers +import queue import numpy @@ -58,7 +58,7 @@ def chunk(self, start, stop): :doc:`uproot.source.chunk.Chunk`. """ - def chunks(self, ranges, notifications): + def chunks(self, ranges, notifications: queue.Queue): """ Args: ranges (list of (int, int) 2-tuples): Intervals to fetch @@ -162,7 +162,7 @@ def chunk(self, start, stop): self._executor.submit(future) return chunk - def chunks(self, ranges, notifications): + def chunks(self, ranges, notifications: queue.Queue): self._num_requests += 1 self._num_requested_chunks += len(ranges) self._num_requested_bytes += sum(stop - start for start, stop in ranges) @@ -207,13 +207,6 @@ def __exit__(self, exception_type, exception_value, traceback): self._executor.__exit__(exception_type, exception_value, traceback) -def notifier(chunk, notifications): - def notify(): - notifications.put(chunk) - - return notify - - class Chunk: """ Args: @@ -457,3 +450,19 @@ def remainder(self, start, cursor, context): context, self._source.file_path, ) + + +def notifier(chunk: Chunk, notifications: queue.Queue): + """ + Returns a function that puts the chunk on the notifications queue when called. + The function has a 'future' argument to be compatible with the `concurrent.futures.Future.add_done_callback` method. + + Args: + chunk (:doc:`uproot.source.chunk.Chunk`): The chunk to put on the queue. + notifications (``queue.Queue``): The notifications queue. + """ + + def notify(future=None): + notifications.put(chunk) + + return notify diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index d74ad2f62..584be65b2 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -21,7 +21,6 @@ These classes implement a *subset* of Python's Future and Executor interfaces. """ - import os import queue import sys @@ -172,8 +171,9 @@ def run(self): class ThreadPoolExecutor: """ Args: - num_workers (None or int): The number of workers to start. If None, - use ``os.cpu_count()``. + max_workers (None or int): The maximum number of workers to start. + In the current implementation this is exactly the number of workers. + If None, use ``os.cpu_count()``. Like Python 3 ``concurrent.futures.ThreadPoolExecutor`` except that it has only the subset of the interface Uproot needs and is available in Python 2. @@ -182,18 +182,18 @@ class ThreadPoolExecutor: class. """ - def __init__(self, num_workers=None): - if num_workers is None: + def __init__(self, max_workers=None): + if max_workers is None: if hasattr(os, "cpu_count"): - num_workers = os.cpu_count() + self._max_workers = os.cpu_count() else: import multiprocessing - num_workers = multiprocessing.cpu_count() + self._max_workers = multiprocessing.cpu_count() self._work_queue = queue.Queue() self._workers = [] - for _ in range(num_workers): + for _ in range(self._max_workers): self._workers.append(Worker(self._work_queue)) for worker in self._workers: worker.start() @@ -204,7 +204,14 @@ def __repr__(self): ) @property - def num_workers(self): + def max_workers(self) -> int: + """ + The maximum number of workers. + """ + return self._max_workers + + @property + def num_workers(self) -> int: """ The number of workers. """ @@ -263,6 +270,9 @@ def __init__(self, task): self._notify = None def _set_notify(self, notify): + """ + Set the ``notify`` function that is called when this task is complete. + """ self._notify = notify def _set_excinfo(self, excinfo):