Skip to content

Commit

Permalink
Fixed minor bugs and added yml on zip
Browse files Browse the repository at this point in the history
  • Loading branch information
fpiedrah committed Oct 12, 2024
1 parent 62eb429 commit 1e6dd60
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 28 deletions.
Binary file modified dataset-config.zip
Binary file not shown.
117 changes: 89 additions & 28 deletions dataset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ def mutate(string: str) -> str:
return "The goal is for the rover to transmit data from all objectives, each captured with a different camera mode, and to relay data from all soil and rock samples to the lander. Additionally, the rover must be positioned at the same waypoint as the lander."

def grid_navigate(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1696,6 +1697,7 @@ def grid_navigate(
rover_waypoint = waypoints[rover_position[0] * grid_size_y + rover_position[1]]
predicates.append(Predicate("at_rover", rover_waypoint))

lander_position = kwargs.get("lander_position")
lander_waypoint = waypoints[lander_position[0] * grid_size_y + lander_position[1]]
predicates.append(Predicate("at_lander", lander_waypoint))

Expand All @@ -1705,6 +1707,7 @@ def grid_navigate(
return predicates

def line_navigate(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1744,6 +1747,7 @@ def line_navigate(
return predicates

def circle_navigate(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1783,6 +1787,7 @@ def circle_navigate(
return predicates

def star_navigate(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1817,6 +1822,7 @@ def star_navigate(
return predicates

def partitioned_navigate(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1844,11 +1850,15 @@ def partitioned_navigate(
predicates.append(Predicate("visible", waypoint, waypoint))

rover_position = kwargs.get("rover_position")
rover_waypoint = waypoints[rover_position]
side, pos = rover_position
waypoint_idx = pos if side == 0 else num_l + pos
rover_waypoint = waypoints[waypoint_idx]
predicates.append(Predicate("at_rover", rover_waypoint))

lander_position = kwargs.get("lander_position")
lander_waypoint = waypoints[lander_position]
side, pos = lander_position
waypoint_idx = pos if side == 0 else num_l + pos
lander_waypoint = waypoints[waypoint_idx]
predicates.append(Predicate("at_lander", lander_waypoint))

predicates.append(Predicate("channel_free"))
Expand All @@ -1857,6 +1867,7 @@ def partitioned_navigate(
return predicates

def _set_capacities(
self,
stores,
cameras,
modes,
Expand All @@ -1873,6 +1884,7 @@ def _set_capacities(
return predicates

def grid_analysis(
self,
waypoints,
objectives,
stores,
Expand All @@ -1895,7 +1907,7 @@ def grid_analysis(

grid_size_x, grid_size_y = kwargs.get("grid_size")

objective_positions = kwargs.get("objective_positions")
objective_positions = kwargs.get("objective_visible_positions")
for obj, (x, y) in zip(objectives, objective_positions):
waypoint_idx = x * grid_size_y + y
waypoint = waypoints[waypoint_idx]
Expand All @@ -1917,7 +1929,8 @@ def grid_analysis(

return predicates

def _linear_set(
def _linear_set(
self,
waypoints,
objectives,
stores,
Expand All @@ -1929,7 +1942,7 @@ def _linear_set(
):
predicates = []

objective_positions = kwargs.get("objective_positions")
objective_positions = kwargs.get("objective_visible_positions")
for obj, waypoint_idx in zip(objectives, objective_positions):
waypoint = waypoints[waypoint_idx]
predicates.append(Predicate("visible_from", obj, waypoint))
Expand All @@ -1947,6 +1960,7 @@ def _linear_set(
return predicates

def line_analysis(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -1985,6 +1999,7 @@ def line_analysis(
return predicates

def circle_analysis(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -2023,6 +2038,7 @@ def circle_analysis(
return predicates

def star_analysis(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -2061,6 +2077,7 @@ def star_analysis(
return predicates

def partitioned_analysis(
self,
waypoints,
objectives,
stores,
Expand All @@ -2081,7 +2098,7 @@ def partitioned_analysis(
**kwargs
)

objective_positions = kwargs.get("objective_positions")
objective_positions = kwargs.get("objective_visible_positions")
for obj, (side, pos) in zip(objectives, objective_positions):
waypoint_idx = pos if side == 0 else num_l + pos
waypoint = waypoints[waypoint_idx]
Expand All @@ -2103,7 +2120,25 @@ def partitioned_analysis(

return predicates

def _get_position(self, position, n_waypoints, initial_state, **kwargs):
if "grid" in initial_state:
x, y = position
waypoint_idx = x * kwargs.get("grid_size")[1] + y
elif "line" in initial_state:
waypoint_idx = position
elif "circle" in initial_state:
waypoint_idx = position % n_waypoints
elif "star" in initial_state:
waypoint_idx = position if position == 0 else position - 1
elif "partitioned" in initial_state:
side, pos = position
num_l = kwargs.get("partition_sizes")[0]
waypoint_idx = pos if side == 0 else num_l + pos

return waypoint_idx

def rover_in_lander(
self,
waypoints,
objectives,
stores,
Expand All @@ -2116,30 +2151,17 @@ def rover_in_lander(
predicates = []

lander_position = kwargs.get("lander_position")
lander_waypoint = waypoints[lander_position]
waypoint_idx = self._get_position(
lander_position, len(waypoints), init_task, **kwargs
)
lander_waypoint = waypoints[waypoint_idx]
predicates.append(Predicate("at_lander", lander_waypoint))
predicates.append(Predicate("at_rover", lander_waypoint))

return predicates

def _get_position(position, n_waypoints, initial_state, **kwargs):
if initial_state == "grid":
x, y = position
waypoint_idx = x * kwargs.get("grid_size")[1] + y
elif initial_state == "line":
waypoint_idx = position
elif initial_state == "circle":
waypoint_idx = position % n_waypoints
elif initial_state == "star":
waypoint_idx = position if position == 0 else position + 1
elif initial_state == "partitioned":
side, pos = position
num_l = kwargs.get("partition_sizes")[0]
waypoint_idx = pos if side == 0 else num_l + pos

return waypoint_idx

def transmit_rocks(
self,
waypoints,
objectives,
stores,
Expand All @@ -2150,6 +2172,10 @@ def transmit_rocks(
**kwargs
):
predicates = []

if not kwargs.get("rock_positions"):
return predicates

rock_positions = kwargs.get("rock_positions")

for position in rock_positions:
Expand All @@ -2162,6 +2188,7 @@ def transmit_rocks(
return predicates

def transmit_soil(
self,
waypoints,
objectives,
stores,
Expand All @@ -2172,9 +2199,13 @@ def transmit_soil(
**kwargs
):
predicates = []
rock_positions = kwargs.get("soil_positions")

for position in rock_positions:
if not kwargs.get("rock_positions"):
return predicates

soil_positions = kwargs.get("soil_positions")

for position in soil_positions:
waypoint_idx = self._get_position(
position, len(waypoints), init_task, **kwargs
)
Expand All @@ -2184,6 +2215,7 @@ def transmit_soil(
return predicates

def transmit_objectives(
self,
waypoints,
objectives,
stores,
Expand All @@ -2195,13 +2227,17 @@ def transmit_objectives(
):
predicates = []

if not objectives:
return predicates

for obj in objectives:
for mode in modes:
predicates.append(Predicate("communicated_image_data", obj, mode))

return predicates

def transmit_all(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -2233,6 +2269,7 @@ def transmit_all(
return predicates

def transmit_all_and_in_lander(
self,
waypoints,
objectives,
stores,
Expand Down Expand Up @@ -2274,7 +2311,7 @@ def get_task(
objectives = [Constant(f"objective{i + 1}", type_tag="objective") for i in range(n_objectives)]
stores = [Constant(f"store{i + 1}", type_tag="store") for i in range(n_stores)]
cameras = [Constant(f"cameras{i + 1}", type_tag="camera") for i in range(n_stores)]
modes = [Constant(f"mode{i + 1}", type_tag="mode") for i in range]
modes = [Constant(f"mode{i + 1}", type_tag="mode") for i in range(n_modes)]
constants = waypoints + objectives + stores + cameras + modes

init_predicates = getattr(self, init)(
Expand Down Expand Up @@ -2313,12 +2350,24 @@ def get_task(
init,
is_init=True,
n_waypoints=n_waypoints,
n_objectives=n_objectives,
n_rocks=n_rocks,
n_soil=n_soil,
n_cameras=n_cameras,
n_modes=n_modes,
n_stores=n_stores,
**kwargs,
),
"explicit": self.explicit_description(
init_predicates,
is_init=True,
n_waypoints=n_waypoints,
n_objectives=n_objectives,
n_rocks=n_rocks,
n_soil=n_soil,
n_cameras=n_cameras,
n_modes=n_modes,
n_stores=n_stores,
randomize=randomize,
),
},
Expand All @@ -2327,12 +2376,24 @@ def get_task(
goal,
is_init=False,
n_waypoints=n_waypoints,
n_objectives=n_objectives,
n_rocks=n_rocks,
n_soil=n_soil,
n_cameras=n_cameras,
n_modes=n_modes,
n_stores=n_stores,
**kwargs,
),
"explicit": self.explicit_description(
goal_predicates,
is_init=False,
n_waypoints=n_waypoints,
n_objectives=n_objectives,
n_rocks=n_rocks,
n_soil=n_soil,
n_cameras=n_cameras,
n_modes=n_modes,
n_stores=n_stores,
randomize=randomize,
),
},
Expand Down Expand Up @@ -2755,7 +2816,7 @@ def get_robot_grid_string(
num_cols,
kwargs.get("robot_data"),
)
return f"You have {n_robots} robots, {n_colors} colors, and {n_tiles}unpainted tiles arranged in a grid with {num_rows} rows and {num_cols} columns.{grid_string}All colors are available."
return f"You have {n_robots} robots, {n_colors} colors, and {n_tiles} unpainted tiles arranged in a grid with {num_rows} rows and {num_cols} columns.{grid_string}All colors are available."

case ("rings", True):
grid_size_x, grid_size_y = kwargs.get("grid_size")
Expand Down

0 comments on commit 1e6dd60

Please sign in to comment.