-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcartctl.py
253 lines (221 loc) · 8.21 KB
/
cartctl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python3
"""
Cart Controller in a factory
- fixed busy control
"""
import enum
import sched
import cart
import jarvisenv
# Main scheduler: the controller below queues its callbacks here via s.enter().
# Its time and delay functions are taken from jarvisenv (time / sleep).
s = sched.scheduler(jarvisenv.time, jarvisenv.sleep)
def reset_scheduler():
    """Reset the jarvisenv clock and replace the module scheduler.

    Calls ``jarvisenv.reset_time()`` and then rebinds the module-level ``s``
    to a brand new ``sched.scheduler``; events queued on the previous
    scheduler are not carried over.

    Returns:
        The newly created scheduler (the same object as the global ``s``).
    """
    global s
    # Rebinding is enough to drop the old scheduler.  An explicit ``del s``
    # beforehand would leave the module without ``s`` at all if
    # ``jarvisenv.reset_time()`` raised.
    jarvisenv.reset_time()
    s = sched.scheduler(jarvisenv.time, jarvisenv.sleep)
    return s
class Status(enum.Enum):
    """Operating mode of the cart controller."""

    Normal = 0  # normal behaviour
    UnloadOnly = 1  # must unload only
    Idle = 2  # controller does not know what to do at the moment
class CartCtl:
    """Cart controller.

    Drives one cart device over a track network.  Every action (load,
    unload, move) is started immediately and completed by a callback queued
    on the module-level scheduler ``s``; each completion callback queues
    ``heartbeat`` again, which decides the next action.
    """

    # A request waiting at least this long (in jarvisenv time units) is
    # switched to priority by update_prio_requests().
    PRIORITY_AFTER = 60
    # Delay between starting and finishing a load or an unload.
    HANDLING_DELAY = 2

    def __init__(self, cart_device, tracks):
        """Bind the controller to a cart and a track network.

        If the cart has no position yet and ``tracks`` is truthy, the cart is
        placed at the first station reported by ``tracks.stations()``.
        """
        self.cart = cart_device
        self.tracks = tracks
        if self.cart.pos is None and self.tracks:
            self.cart.pos = list(self.tracks.stations())[0]
        self.requests = []
        self.inprogress = []
        self.plan = []
        self.status = Status.Idle
        self.busy = False
        self.only_unload = False

    def request(self, new_load: "cart.Load"):
        """Enqueue a new transfer request and stamp its birth time.

        An idle controller is woken up by queueing an immediate heartbeat.
        """
        self.requests.append(new_load)
        new_load.born = jarvisenv.time()
        if self.status == Status.Idle:
            s.enter(0, 0, self.heartbeat)

    def sched_unload(self, slot):
        """Start unloading ``slot`` and queue its completion."""
        # NOTE: the guard is an assert, so it disappears under ``python -O``.
        assert (not self.busy)
        self.busy = True
        self.cart.start_unloading(slot)
        s.enter(self.HANDLING_DELAY, 0, self.perform_unload)

    def perform_unload(self):
        """Finish the unload in progress and hand control back to heartbeat."""
        self.cart.finish_unloading()
        s.enter(0, 0, self.heartbeat)
        self.busy = False

    def try_unload_here_single(self):
        """Schedule unloading of the first load destined for this station.

        Returns:
            The slot index being unloaded, or -1 if no carried load has the
            current cart position as its destination.
        """
        for slot, load in enumerate(self.cart.slots):
            if load is None:
                continue
            if load.dst == self.cart.pos:
                self.sched_unload(slot)
                return slot
        return -1

    def sched_load(self, slot, request_idx):
        """Start loading ``self.requests[request_idx]`` into ``slot``."""
        assert (not self.busy)
        self.busy = True
        self.cart.start_loading(self.requests[request_idx], slot)
        s.enter(self.HANDLING_DELAY, 0, self.perform_load)

    def perform_load(self):
        """Finish the load in progress and drop it from the waiting requests."""
        load = self.cart.finish_loading()
        self.requests.remove(load)
        s.enter(0, 0, self.heartbeat)
        self.busy = False

    def _free_room(self):
        """Return ``(free_slot, free_capacity)``, or ``(-1, 0)`` if nothing fits."""
        free_slot = self.cart.get_free_slot()
        if free_slot == -1:
            # no free slot
            return -1, 0
        free_cap = self.cart.load_capacity - self.cart.load_sum()
        if free_cap <= 0:
            # no free capacity
            return -1, 0
        return free_slot, free_cap

    def _matches_mode(self, request):
        """Tell whether ``request`` may be taken in the current mode.

        In UnloadOnly mode only priority requests qualify; in every other
        mode only non-priority requests do.
        """
        return (self.status == Status.UnloadOnly) == bool(request.prio)

    def try_load_here_single(self):
        """Schedule loading of one request waiting at the current station.

        The first request (in ``self.requests`` order) that starts here, fits
        the remaining capacity and matches the current mode is chosen.

        Returns:
            The slot index being loaded, or -1 if nothing was scheduled.
        """
        free_slot, free_cap = self._free_room()
        if free_slot == -1:
            return -1
        for request_idx, request in enumerate(self.requests):
            if request.src != self.cart.pos or request.weight > free_cap:
                continue
            if self._matches_mode(request):
                self.sched_load(free_slot, request_idx)
                return free_slot
        return -1

    def sched_move(self, track):
        """Start moving along ``track``; arrival is queued after ``track.cost``."""
        assert (not self.busy)
        self.busy = True
        self.cart.start_moving(track.dst)
        s.enter(track.cost, 0, self.perform_move)

    def perform_move(self):
        """Finish the move in progress and hand control back to heartbeat."""
        self.cart.finish_moving()
        s.enter(0, 0, self.heartbeat)
        self.busy = False

    def find_load_there_single(self, priority=False):
        """Find a free slot and a waiting request that fit together.

        Args:
            priority: when true, only a priority request is accepted, and only
                while the controller is in UnloadOnly mode; otherwise any
                request matching the current mode is accepted.

        Returns:
            A pair ``(free_slot_idx, request_idx)``, or ``(-1, -1)`` if there
            is no free slot, no free capacity, or no suitable request.
        """
        free_slot, free_cap = self._free_room()
        if free_slot == -1:
            return -1, -1
        for request_idx, request in enumerate(self.requests):
            if request.weight > free_cap:
                continue
            if priority:
                wanted = self.status == Status.UnloadOnly and request.prio
            else:
                wanted = self._matches_mode(request)
            if wanted:
                return free_slot, request_idx
        return -1, -1

    def sort_requests(self):
        """Sort the waiting requests by priority flag and birth time.

        A tuple key keeps the two priority groups apart for any ``born``
        value (an arithmetic key offset by 10**6 would mix them once
        timestamps reach that offset).
        """
        # NOTE(review): reverse=True is kept from the original, so the list
        # starts with non-priority requests, most recently born first, and
        # ends with priority ones - confirm that oldest-first was not intended.
        self.requests.sort(
            key=lambda request: (not request.prio, request.born),
            reverse=True,
        )

    def update_prio_requests(self):
        """Promote requests that waited too long, then re-sort the queue."""
        curr_time = jarvisenv.time()
        for load in self.requests:
            if curr_time - load.born >= self.PRIORITY_AFTER:
                load.set_priority()
        self.sort_requests()

    def find_prio_request(self):
        """Return the first prioritized load waiting in requests, or None."""
        for load in self.requests:
            if load.prio:
                return load
        return None

    def _sched_move_to_request(self, priority):
        """Schedule one step towards the source of a suitable request.

        Returns:
            True if a move was scheduled, False if no request qualifies or
            ``tracks.get_path`` gave no path to its source.
        """
        free_slot, request_idx = self.find_load_there_single(priority)
        if free_slot < 0:
            return False
        request = self.requests[request_idx]
        path = self.tracks.get_path(self.cart.pos, request.src)
        if not path:
            return False
        self.sched_move(path[0])
        return True

    def heartbeat(self):
        """Main controlling loop: pick and schedule the next single action.

        Order of preference: unload here, load here, step towards a priority
        request, step towards the cheapest destination of a carried load,
        step towards any suitable request.  If none applies, the controller
        becomes Idle until ``request`` wakes it up again.
        """
        # something else is already planned?
        if self.busy:
            return

        # update environment
        self.update_prio_requests()

        # a waiting priority request switches the controller to UnloadOnly
        if self.find_prio_request():
            self.status = Status.UnloadOnly
        else:
            self.status = Status.Normal

        if self.try_unload_here_single() != -1:
            # scheduled for unloading
            return

        # Loading is attempted in every mode: try_load_here_single() itself
        # restricts UnloadOnly mode to priority requests.  Skipping it while
        # in UnloadOnly would make priority requests impossible to load, and
        # the controller would stay in UnloadOnly for good.
        if self.try_load_here_single() != -1:
            # scheduled for loading
            return

        # nothing to load or unload at this station
        if self._sched_move_to_request(True):
            # one step towards a waiting priority request
            return

        # try to move carried load towards its destination
        if not self.cart.empty():
            paths = self.evaluate_all_paths()
            fast_candidate_idx = CartCtl.find_fastest_slot(paths)
            # a non-empty cart is expected to have a path for some load
            assert fast_candidate_idx is not None
            self.sched_move(paths[fast_candidate_idx][0])
            return

        # try to move towards the source of a waiting request
        if self._sched_move_to_request(False):
            return

        # nothing to do at the moment
        self.status = Status.Idle

    def evaluate_all_paths(self):
        """Return, per slot, the path to the load's destination or None."""
        return [
            self.tracks.get_path(self.cart.pos, load.dst) if load else None
            for load in self.cart.slots
        ]

    @staticmethod
    def eval_cost(path):
        """Return the sum of ``cost`` over all tracks of ``path``."""
        return sum(track.cost for track in path)

    @staticmethod
    def find_fastest_slot(paths):
        """Return the index of the slot with the cheapest path.

        Entries that are None or empty are skipped; on a tie the lowest index
        wins.  Returns None when no entry has a usable path.
        """
        costs = {
            idx: CartCtl.eval_cost(path)
            for idx, path in enumerate(paths)
            if path
        }
        if not costs:
            return None
        # dict keys iterate in ascending index order, so min() keeps the
        # first of equally cheap slots
        return min(costs, key=costs.get)