-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel_runner.py
75 lines (63 loc) · 2.63 KB
/
parallel_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# parallel_runner.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from api_client import query_vllm
from tqdm import tqdm
from collections import defaultdict
import itertools
import threading
import logging
import psutil
import time
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def log_system_resources():
    """Log a one-shot DEBUG snapshot of thread count, CPU, and memory usage.

    Uses psutil for CPU/memory and threading.active_count() for the number
    of live threads in this process. Intended to be called periodically
    from a background logging thread.
    """
    cpu_usage = psutil.cpu_percent()
    memory_info = psutil.virtual_memory()
    active_threads = threading.active_count()
    # Bug fix: the original logged threading.active_count() a second time
    # labelled "Total available threads" — the two numbers were always
    # identical, making the message misleading. Log the count once.
    logging.debug(f"Active threads: {active_threads}")
    logging.debug(f"CPU Usage: {cpu_usage}%")
    logging.debug(f"Memory Usage: {memory_info.percent}%")
def periodic_logging(interval):
    """Emit a system-resource snapshot every ``interval`` seconds, forever.

    Never returns on its own — run it on a daemon thread so it dies with
    the process.
    """
    # itertools.count() gives an endless iterator; equivalent to `while True`.
    for _ in itertools.count():
        log_system_resources()
        time.sleep(interval)
def run_tests_in_parallel(endpoints, payload, iterations, concurrency, thread_multiplier):
    """Fire `iterations * concurrency` query_vllm requests at each endpoint in parallel.

    Args:
        endpoints: iterable of endpoint identifiers passed to query_vllm.
        payload: request payload forwarded unchanged to every call.
        iterations: number of rounds of requests per endpoint.
        concurrency: requests per round per endpoint.
        thread_multiplier: scales the thread pool size
            (len(endpoints) * concurrency * thread_multiplier workers).

    Returns:
        List of result dicts. Successful results are query_vllm's return
        value with an added "endpoint" key; failures are recorded as
        {"endpoint": ..., "error": str(exception)} instead of raising.
    """
    results = []
    progress_bars = {}
    # Fixed: dropped the unused `endpoint_progress` / `futures_to_endpoint` locals.

    # One worker per potential in-flight request per endpoint, scaled up.
    num_threads = len(endpoints) * concurrency * thread_multiplier

    # Fixed: the original started a bare daemon thread running forever, so
    # every call to this function leaked one logging thread. An Event lets
    # us stop the logger when this run completes.
    stop_logging = threading.Event()

    def _log_periodically(interval=5):
        # Log a resource snapshot every `interval` seconds until stopped;
        # Event.wait doubles as an interruptible sleep.
        while not stop_logging.is_set():
            log_system_resources()
            stop_logging.wait(interval)

    logging_thread = threading.Thread(target=_log_periodically, daemon=True)
    logging_thread.start()

    try:
        with ThreadPoolExecutor(max_workers=num_threads) as executor:
            # Fixed: submit directly instead of building an intermediate
            # `tasks` list whose tuples redundantly carried query_vllm and
            # payload (and shadowed the imported name when unpacked).
            futures = {
                executor.submit(query_vllm, endpoint, payload): endpoint
                for endpoint in endpoints
                for _ in range(iterations * concurrency)
            }

            # One progress bar per endpoint, sized to its total request count.
            for endpoint in endpoints:
                progress_bars[endpoint] = tqdm(total=iterations * concurrency, desc=f"{endpoint}", unit="request")

            for future in as_completed(futures):
                endpoint = futures[future]
                try:
                    result = future.result()
                    result["endpoint"] = endpoint
                except Exception as e:
                    # Record the failure instead of aborting the whole run.
                    result = {
                        "endpoint": endpoint,
                        "error": str(e)
                    }
                results.append(result)
                progress_bars[endpoint].update(1)
    finally:
        # Always stop the resource logger and close the bars, even if the
        # collection loop raises (e.g. KeyboardInterrupt).
        stop_logging.set()
        logging_thread.join(timeout=1)
        for bar in progress_bars.values():
            bar.close()

    return results