Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve uproot.futures compatibility with concurrent.futures #983

Merged
merged 7 commits into from
Oct 12, 2023
28 changes: 17 additions & 11 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
These classes implement a *subset* of Python's Future and Executor interfaces.
"""


import os
import queue
import sys
Expand Down Expand Up @@ -172,8 +171,9 @@ def run(self):
class ThreadPoolExecutor:
"""
Args:
max_workers (None or int): The number of workers to start. If None,
use ``os.cpu_count()``.
max_workers (None or int): The maximum number of workers to start.
In the current implementation this is exactly the number of workers.
If None, use ``os.cpu_count()``.

Like Python 3 ``concurrent.futures.ThreadPoolExecutor`` except that it has
only the subset of the interface Uproot needs and is available in Python 2.
Expand All @@ -185,15 +185,15 @@ class ThreadPoolExecutor:
def __init__(self, max_workers=None):
if max_workers is None:
if hasattr(os, "cpu_count"):
max_workers = os.cpu_count()
self._max_workers = os.cpu_count()
else:
import multiprocessing

max_workers = multiprocessing.cpu_count()
self._max_workers = multiprocessing.cpu_count()

self._work_queue = queue.Queue()
self._workers = []
for _ in range(max_workers):
for _ in range(self._max_workers):
self._workers.append(Worker(self._work_queue))
for worker in self._workers:
worker.start()
Expand All @@ -204,15 +204,18 @@ def __repr__(self):
)

@property
def max_workers(self):
def max_workers(self) -> int:
"""
The number of workers.
The maximum number of workers.
"""
return len(self._workers)
return len(self._max_workers)
lobis marked this conversation as resolved.
Show resolved Hide resolved

@property
def num_workers(self):
return self.max_workers
def num_workers(self) -> int:
"""
The number of workers.
"""
return len(self._workers)

@property
def workers(self):
Expand Down Expand Up @@ -267,6 +270,9 @@ def __init__(self, task):
self._notify = None

def _set_notify(self, notify):
"""
Set the ``notify`` function that is called when this task is complete.
"""
self._notify = notify

def _set_excinfo(self, excinfo):
Expand Down