-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsolution.py
398 lines (319 loc) · 12.9 KB
/
solution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# Look for #IMPLEMENT tags in this file. These tags indicate what has
# to be implemented to complete the warehouse domain.
# You may add only standard python imports---i.e., ones that are automatically
# available on TEACH.CS
# You may not remove any imports.
# You may not import or otherwise source any of your own files
import os #for time functions
import sys
from search import * #for search engines
from search import _BEST_FIRST, _CUSTOM
from sokoban import SokobanState, Direction, PROBLEMS #for Sokoban specific classes and problems
from collections import deque
import hungarian as h
# Per-box status codes used by valid() / is_blocked():
#   UNVISITED - not classified yet
#   VISITED   - classified as not blocked
#   BAD       - classified as deadlocked
#   GOOD      - box already sits on a storage point
#   SKETCH    - not blocked by itself, but robot reachability must be checked
UNVISITED, VISITED, BAD, GOOD, SKETCH = range(5)

# Module-level caches read and written by heur_alternate(); both start unset.
bs_dist_cache = box_cache = None
def sokoban_goal_state(state):
    '''
    Goal test for a Sokoban state.

    @param state: object exposing ``boxes`` and ``storage`` collections.
    @return: True when every box position is also a storage position
             (vacuously True when there are no boxes), otherwise False.
    '''
    return all(box in state.storage for box in state.boxes)
def heur_manhattan_distance(state):
    #IMPLEMENT
    '''Admissible sokoban puzzle heuristic: manhattan distance.

    INPUT: a sokoban state.
    OUTPUT: a numeric value that serves as an estimate of the distance of
            the state to the goal.

    The estimate is the value find_total_dist() computes from the state's
    boxes and storage points: per-box Manhattan distance to the nearest
    storage point, summed, with obstacles ignored.  The assignment asks for
    exactly this heuristic, so no refinements are applied here.
    '''
    box_positions = state.boxes
    storage_positions = state.storage
    return find_total_dist(box_positions, storage_positions)
# Manhattan
def dist(b, s):
    '''Return the Manhattan (L1) distance between 2-D points b and s.'''
    dx = abs(s[0] - b[0])
    dy = abs(s[1] - b[1])
    return dx + dy
def find_total_dist(boxes, storage):
    '''Sum, over all boxes, the Manhattan distance to the nearest storage point.

    Obstacles and other boxes are ignored, and several boxes may be matched
    to the same storage point.

    @param boxes: collection of (x, y) box positions.
    @param storage: collection of (x, y) storage positions.
    @return: 0 when there are no boxes; sys.maxsize when there are boxes but
             no storage points; otherwise the summed nearest distances.
    '''
    if not boxes:
        return 0
    if not storage:
        # No place to put the boxes: report the same "infinite" sentinel
        # (sys.maxsize) that the rest of this module uses.
        return sys.maxsize
    # min() keeps a genuine distance of 0 (box already on a storage point);
    # a truthiness test on the running minimum would discard it.
    return sum(
        min(abs(s[0] - b[0]) + abs(s[1] - b[1]) for s in storage)
        for b in boxes
    )
#SOKOBAN HEURISTICS
def trivial_heuristic(state):
    '''Trivial admissible sokoban heuristic.

    INPUT: a sokoban state.
    OUTPUT: the number of boxes whose position is not a storage position,
            used as an estimate of the moves still required.
    '''
    return sum(1 for box in state.boxes if box not in state.storage)
def heur_alternate(state):
    #IMPLEMENT
    '''A better heuristic: box/storage assignment cost plus robot approach cost.

    INPUT: a sokoban state.
    OUTPUT: a numeric value that serves as an estimate of the distance of
            the state to the goal; sys.maxsize when valid(state) rejects
            the box layout.

    The assignment cost is obtained from the hungarian helper module
    (h.build_dist_table + h.Hungarian(...).compute()), intended to pair
    boxes with storage points so that no two boxes share a storage point.
    Because that cost is computed from the boxes and storage only, it is
    cached in the module globals box_cache / bs_dist_cache and recomputed
    only when the boxes change.  min_rb_not_s_dist(state) is added on every
    call since it depends on the robots.
    '''
    global box_cache
    global bs_dist_cache

    if state.action == "START":
        # A new search is starting: anything cached belongs to another
        # problem and must not be reused.
        box_cache = None
        bs_dist_cache = None

    # "is None" rather than truthiness, so a cached cost of 0 still counts
    # as a cache hit.
    if bs_dist_cache is None or box_cache != state.boxes:
        box_cache = state.boxes
        if valid(state):
            dist_table = h.build_dist_table(state.boxes, state.storage)
            bs_dist_cache, _bs_pairs = h.Hungarian(dist_table).compute()
        else:
            # Remember the rejection together with the layout it belongs
            # to, so the two caches never describe different layouts.
            # NOTE(review): valid() also looks at robot positions through
            # reachable(); caching per box layout assumes that verdict does
            # not change while the boxes stay put - confirm.
            bs_dist_cache = sys.maxsize

    if bs_dist_cache == sys.maxsize:
        return sys.maxsize
    return bs_dist_cache + min_rb_not_s_dist(state)
def min_rb_not_s_dist(state):
    '''Smallest Manhattan distance between any robot and any unstored box.

    @param state: object exposing ``robots`` (iterable of (x, y)) and
                  ``boxes`` / ``storage`` (``boxes`` must support
                  ``.difference``).
    @return: the minimum robot-to-box distance over boxes that are not on a
             storage point, or 0 when there is no such robot/box pair.
    '''
    not_stored_boxes = state.boxes.difference(state.storage)
    # min(..., default=0) covers the "no robots / no unstored boxes" case and
    # does not mistake a real minimum of 0 for "nothing found yet".
    return min(
        (
            abs(b[0] - r[0]) + abs(b[1] - r[1])
            for r in state.robots
            for b in not_stored_boxes
        ),
        default=0,
    )
# A DFS flood-fill algorithm to search if box is blocked
def valid(state):
    '''Return False when the box layout is judged unsolvable, else True.

    Every box is classified with is_blocked(); the state is rejected as
    soon as one box comes back BAD.  Afterwards, each box whose recorded
    status is SKETCH must additionally pass reachable(); the first one that
    does not causes rejection.
    '''
    visiting = dict.fromkeys(state.boxes, False)
    blocked = dict.fromkeys(state.boxes, UNVISITED)

    for box in state.boxes:
        if is_blocked(state, box, visiting, blocked) == BAD:
            return False

    return all(
        reachable(state, box)
        for box in state.boxes
        if blocked[box] == SKETCH
    )
# BFS from the box outwards, looking for a robot
def reachable(state, box):
    '''Return True if a breadth-first search starting at ``box`` finds a robot.

    The search moves between the cells returned by get_neighbors().  Cells
    in state.obstacles are impassable, and so are the boxes directly
    adjacent to ``box``; other boxes are not treated as obstacles.

    @param state: sokoban state (uses ``obstacles`` and ``robots``).
    @param box: (x, y) position the search starts from.
    @return: True when some cell reached by the search is in state.robots,
             otherwise False.
    '''
    impassable = set(state.obstacles)
    impassable.update(
        cell for cell in get_neighbors(state, box) if is_box(state, cell)
    )

    # Mark cells when they are enqueued (not when they are dequeued) so each
    # cell enters the queue at most once; marking on dequeue lets the same
    # cell be queued and re-expanded once per path that leads to it.
    seen = {box}
    queue = deque([box])
    while queue:
        cell = queue.popleft()
        if cell in state.robots:
            return True
        for nei in get_neighbors(state, cell):
            if nei not in seen and nei not in impassable:
                seen.add(nei)
                queue.append(nei)
    return False
def get_neighbors(state, box):
    '''List the in-bounds cells orthogonally adjacent to ``box``.

    Candidates are considered in the order (x, y-1), (x, y+1), (x+1, y),
    (x-1, y); each is kept only if the coordinate that changed stays inside
    [0, state.height) or [0, state.width) respectively.
    '''
    x, y = box[0], box[1]
    guarded_cells = (
        (y - 1 >= 0, (x, y - 1)),
        (y + 1 < state.height, (x, y + 1)),
        (x + 1 < state.width, (x + 1, y)),
        (x - 1 >= 0, (x - 1, y)),
    )
    return [cell for in_bounds, cell in guarded_cells if in_bounds]
def is_valid_location(state, location):
    '''Return True when ``location`` is inside the grid and not an obstacle.

    Inside the grid means 0 <= x < state.width and 0 <= y < state.height.
    '''
    col, row = location[0], location[1]
    if not (0 <= col < state.width):
        return False
    if not (0 <= row < state.height):
        return False
    return all(location != obstacle for obstacle in state.obstacles)
# DP approach to if box is blocked
def is_blocked(state,box,visiting,blocked):
    '''Classify ``box`` with one of the module status codes.

    @param state: sokoban state, consulted through is_storage(), is_box()
                  and is_valid_location().
    @param box: (x, y) position of the box being classified.
    @param visiting: dict box -> bool, True while that box is on the current
                     recursion path.  Mutated here.
    @param blocked: dict box -> status code, used as the memo table.
                    Mutated here.
    @return: GOOD if the box is on a storage point (nothing is recorded in
             ``blocked`` in that case); the memoised status if one exists;
             BAD if the box is judged stuck; otherwise VISITED.

    Side effect: a neighbouring box that came back VISITED while wedging
    this box against an invalid cell is re-marked SKETCH in ``blocked``.
    NOTE(review): on every early BAD return visiting[box] is left True;
    only the fall-through path at the bottom resets it.
    '''
    if is_storage(state,box): return GOOD
    # Already classified on an earlier call: reuse the stored status.
    if blocked[box] != UNVISITED: return blocked[box]
    if visiting[box]:
        # deadlock
        # The recursion has come back to a box that is still being
        # evaluated, i.e. the boxes depend on each other in a cycle.
        blocked[box] = BAD
        return blocked[box]
    # The four orthogonal neighbours of the box.
    n = (box[0],box[1]-1)
    s = (box[0],box[1]+1)
    e = (box[0]+1,box[1])
    w = (box[0]-1,box[1])
    visiting[box] = True
    # depth first backtracking search
    # Each pair is two neighbours that meet at one corner of the box.
    for loc1,loc2 in [ (n,e), (e,s), (s,w), (w,n) ]:
        #if is_valid_location(state,loc1) or is_valid_location(state,loc2): continue
        if not is_box(state,loc1) and not is_box(state,loc2):
            # No boxes at this corner: stuck only if both cells are invalid
            # (out of bounds or an obstacle, per is_valid_location).
            if not is_valid_location(state,loc1) and not is_valid_location(state,loc2):
                blocked[box]=BAD
                return blocked[box]
        elif is_box(state,loc1) and not is_box(state,loc2):
            # loc1 is a box; it only matters when loc2 is an invalid cell.
            if not is_valid_location(state,loc2):
                t = is_blocked(state,loc1,visiting,blocked)
                if t == VISITED: blocked[loc1] = SKETCH # make sure loc1 is reachable by robot
                if t == BAD:
                    blocked[box]=BAD
                    return blocked[box]
        elif is_box(state,loc2) and not is_box(state,loc1):
            # Mirror image of the previous case with loc1/loc2 swapped.
            if not is_valid_location(state,loc1):
                t = is_blocked(state,loc2,visiting,blocked)
                if t == VISITED: blocked[loc2] = SKETCH
                elif t == BAD:
                    blocked[box]=BAD
                    return blocked[box]
        # both are boxes
        else:
            # Stuck only if both neighbouring boxes are themselves BAD.
            t1 = is_blocked(state, loc1, visiting,blocked)
            t2 = is_blocked(state,loc2,visiting,blocked)
            if t1 == BAD and t2 == BAD:
                blocked[box]=BAD
                return blocked[box]
    # backtrack
    visiting[box] = False
    # 1 is not blocked
    # No corner produced a BAD verdict: record VISITED (value 1).
    blocked[box]=VISITED
    return blocked[box]
def is_box(state, location):
    '''Return True when ``location`` equals the position of some box in state.boxes.'''
    return any(candidate == location for candidate in state.boxes)
def is_storage(state, box):
    '''Return True when ``box`` equals one of the positions in state.storage.'''
    return any(box == spot for spot in state.storage)
def heur_zero(state):
    '''Constant-zero heuristic.

    Ignores ``state`` and always returns 0; can be used to make A* search
    perform uniform cost search.
    '''
    return 0
def fval_function(sN, weight):
    #IMPLEMENT
    """
    Custom f-value for Anytime Weighted A star: g + weight * h.

    The result decides a node's position on the frontier during a 'custom'
    search, so the search engine must be created as a 'custom' engine when
    this function is supplied.  (For comparison: UCS orders by gval alone,
    best-first search by hval alone.)

    @param sNode sN: A search node (containing a SokobanState); its ``gval``
                     and ``hval`` attributes are read.
    @param float weight: Weight given by Anytime Weighted A star
    @rtype: float
    """
    weighted_heuristic = weight * sN.hval
    return sN.gval + weighted_heuristic
def anytime_weighted_astar(initial_state, heur_fn, weight=1., timebound = 10):
    #IMPLEMENT
    '''Anytime weighted A* over sokoban states, as described in the HW1 handout.

    @param initial_state: sokoban state the search starts from.
    @param heur_fn: callable mapping a state to its heuristic value.
    @param weight: NOTE(review): never read in this body; the search always
                   starts from a hard-coded weight of 20.
    @param timebound: budget in seconds, measured with os.times()[0].
    @return: the state of the best goal node found, or None when no goal
             was found.  NOTE(review): the OUTPUT line below says False;
             the code returns None.
    '''
    '''INPUT: a sokoban state that represents the start state and a timebound (number of seconds)'''
    '''OUTPUT: A goal state (if a goal is found), else False'''
    '''implementation of weighted astar algorithm'''
    # Working weight; lowered by 0.0001 on every expansion while it would
    # stay above 1 (see "decrease weight" below).
    w = 20
    # NOTE(review): the lambda closes over the variable w, not its current
    # value, so it evaluates with whatever w is at call time.
    wrapped_fval_function = ( lambda sN: fval_function(sN, w) )
    frontier = Open(_CUSTOM)
    node = sNode(initial_state, heur_fn(initial_state), wrapped_fval_function)
    # os.times()[0] is the process's user CPU time, so the bound is on CPU
    # time rather than wall-clock time.
    stop_time = os.times()[0] + timebound
    # for cycle checking
    # Maps hashable_state() -> the gval recorded when the state was queued.
    ht = dict()
    ht[initial_state.hashable_state()] = initial_state.gval
    frontier.insert(node)
    best_node, best_fval = None, None
    while not frontier.empty():
        node = frontier.extract()
        if sokoban_goal_state(node.state):
            # update best_fval
            # The search does not stop at a goal: the node is recorded if it
            # beats the best so far and processing continues below.
            if not best_fval or node.fval_function(node) < best_fval:
                best_fval = node.fval_function(node)
                best_node = node
        # prune this state if hval == infinity
        if node.hval == sys.maxsize: continue
        # Time check happens once per extracted node.
        if os.times()[0] > stop_time:
            if not best_node:
                print("TRACE: Search has exceeeded the time bound provided.")
                print("Final weight: ", w)
                return None
            print("Times up. Returning best node")
            print("Final weight: ", w)
            return best_node.state
        # Skip the node if a cheaper path to the same state was recorded
        # after this node was queued.
        if ht[node.state.hashable_state()] < node.gval: continue
        successors = node.state.successors()
        # decrease weight
        if w-0.0001 > 1: w -= 0.0001
        wrapped_fval_function = (lambda sN: fval_function(sN, w))
        for succ in successors:
            hash_state = succ.hashable_state()
            hval = heur_fn(succ)
            succ_node = sNode(succ, hval, wrapped_fval_function)
            # prune if gval > best_gval
            # (the comparison actually made is on the weighted f-value)
            if best_fval and succ_node.fval_function(succ_node) > best_fval: continue
            #
            # Drop the successor if the same state is already known with a
            # strictly smaller gval.
            if hash_state in ht and succ.gval > ht[hash_state]: continue
            # path checking
            if succ.has_path_cycle(): continue
            ht[hash_state] = succ.gval
            # pass heur_fn as input
            frontier.insert(succ_node)
    # Frontier exhausted before the time bound.
    if not best_node: return None
    return best_node.state
def anytime_gbfs(initial_state, heur_fn, timebound = 10):
    #IMPLEMENT
    '''Anytime greedy best-first search over sokoban states, as described in the HW1 handout.

    @param initial_state: sokoban state the search starts from.
    @param heur_fn: callable mapping a state to its heuristic value.
    @param timebound: budget in seconds, measured with os.times()[0].
    @return: the state of the goal node with the smallest gval found, or
             None when no goal was found.  NOTE(review): the OUTPUT line
             below says False; the code returns None.
    '''
    '''INPUT: a sokoban state that represents the start state and a timebound (number of seconds)'''
    '''OUTPUT: A goal state (if a goal is found), else False'''
    '''implementation of weighted astar algorithm'''
    # Frontier created in _BEST_FIRST mode; nodes carry no custom f-value
    # function (third sNode argument is None).
    frontier = Open(_BEST_FIRST)
    node = sNode(initial_state, heur_fn(initial_state), None)
    # os.times()[0] is the process's user CPU time, so the bound is on CPU
    # time rather than wall-clock time.
    stop_time = os.times()[0] + timebound
    # for cycle checking
    # Maps hashable_state() -> the gval recorded when the state was queued.
    ht = dict()
    ht[initial_state.hashable_state()] = initial_state.gval
    frontier.insert(node)
    best_node, best_gval = None, None
    while not frontier.empty():
        node = frontier.extract()
        if sokoban_goal_state(node.state):
            # update best_gval
            # The search does not stop at a goal: the node is recorded if it
            # is cheaper than the best so far and processing continues below.
            if not best_gval or node.gval < best_gval:
                best_gval = node.gval
                best_node = node
        # prune this state if hval == infinity
        if node.hval == sys.maxsize: continue
        # Time check happens once per extracted node.
        if os.times()[0] > stop_time:
            if not best_node:
                #print("TRACE: Search has exceeeded the time bound provided.")
                return None
            #print("Times up. Returning best node")
            return best_node.state
        # Skip the node if a cheaper path to the same state was recorded
        # after this node was queued.
        if ht[node.state.hashable_state()] < node.gval: continue
        successors = node.state.successors()
        for succ in successors:
            hash_state = succ.hashable_state()
            # prune if gval > best_gval
            if best_gval and succ.gval > best_gval: continue
            #
            # Drop the successor if the same state is already known with a
            # strictly smaller gval.
            if hash_state in ht and succ.gval > ht[hash_state]: continue
            # path checking
            if succ.has_path_cycle(): continue
            ht[hash_state] = succ.gval
            # The heuristic is evaluated only for successors that survived
            # the pruning checks above.
            hval = heur_fn(succ)
            # pass heur_fn as input
            frontier.insert(sNode(succ, hval, None))
    # Frontier exhausted before the time bound.
    if not best_node: return None
    return best_node.state