-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtools.py
64 lines (59 loc) · 2.36 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
def parallel_process(array,
                     function,
                     n_jobs=16,
                     use_kwargs=False,
                     front_num=3,
                     tqdm=tqdm):
    """
    A parallel version of the map function with a progress bar.

    This function was copied from here:
    http://danshiebler.com/2016-09-14-parallel-progress-bar/

    Args:
        array (array-like): A sliceable sequence to iterate over.
        function (function):
            A python function to apply to the elements of array
        n_jobs (int, default=16): The number of worker processes to use.
            With n_jobs == 1 everything runs serially in this process.
        use_kwargs (boolean, default=False):
            Whether to consider the elements of array as dictionaries of
            keyword arguments to function
        front_num (int, default=3): The number of iterations to run serially
            before kicking off the parallel job.
            Useful for catching bugs. Values <= 0 disable the serial
            warm-up.
        tqdm (callable, default=tqdm.tqdm): Progress-bar wrapper, called as
            tqdm(iterable, **kwargs); it must return an iterable over the
            same items.
    Returns:
        [function(array[0]), function(array[1]), ...]

        Exceptions raised by serial calls (the first front_num elements,
        or every element when n_jobs == 1) propagate to the caller.
        For elements run in the process pool, a raised exception is
        stored in the result list at that element's position instead.
    """
    # A zero or negative front_num means "no serial warm-up". Clamping also
    # keeps the two slices below complementary (a negative index would make
    # them overlap / skip elements).
    front_num = max(front_num, 0)

    # We run the first few iterations serially to catch bugs. `front` is
    # always bound (possibly empty) so the concatenations below never hit
    # an undefined name.
    front = [function(**a) if use_kwargs else function(a)
             for a in array[:front_num]]
    remaining = array[front_num:]

    # If we set n_jobs to 1, just run a list comprehension. This is useful for
    # benchmarking and debugging.
    if n_jobs == 1:
        return front + [function(**a) if use_kwargs else function(a)
                        for a in tqdm(remaining)]

    # Assemble the workers
    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        # Pass the elements of array into function
        if use_kwargs:
            futures = [pool.submit(function, **a) for a in remaining]
        else:
            futures = [pool.submit(function, a) for a in remaining]
        kwargs = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True
        }
        # Print out the progress as tasks complete
        for _ in tqdm(as_completed(futures), **kwargs):
            pass

    out = []
    # Get the results from the futures, in submission order so the output
    # lines up with the input.
    for future in tqdm(futures):
        try:
            out.append(future.result())
        except Exception as e:
            # Deliberate best-effort: hand the failure back in place of the
            # result rather than aborting the whole batch.
            out.append(e)
    return front + out