forked from xiph/rd_tool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.py
50 lines (47 loc) · 1.7 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python3
from utility import get_time
import threading
from time import sleep
import sys
def run(work_items, slots):
retries = 0
max_retries = 5000
free_slots = slots
taken_slots = []
work_done = []
total_num_of_jobs = len(work_items)
while(1):
for slot in taken_slots:
if slot.busy == False:
if slot.work.failed == False:
work_done.append(slot.work)
print(get_time(),len(work_done),'out of',total_num_of_jobs,'finished.')
elif retries >= max_retries:
break
else:
retries = retries + 1
print(get_time(),'Retrying work...',retries,'of',max_retries,'retries.')
work_items.append(slot.work)
taken_slots.remove(slot)
free_slots.append(slot)
#have we finished all the work?
if len(work_items) == 0:
if len(taken_slots) == 0:
print(get_time(),'All work finished.')
break
elif retries >= max_retries:
print(get_time(),'Max number of failed retries reached!')
sys.exit(1)
else:
if len(free_slots) != 0:
slot = free_slots.pop()
work = work_items.pop()
slot.work = work
print(get_time(),'Encoding',work.get_name(),'on',slot.machine.host)
work_thread = threading.Thread(target=slot.execute, args=(work,))
work_thread.daemon = True
slot.busy = True
work_thread.start()
taken_slots.append(slot)
sleep(0.2)
return work_done