Skip to content

Commit

Permalink
WIP: Build common interface for waypoint geometry constraints.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanAlbert committed Aug 2, 2023
1 parent a0ab46a commit 55ba330
Show file tree
Hide file tree
Showing 4 changed files with 584 additions and 0 deletions.
59 changes: 59 additions & 0 deletions game/flightplan/waypointsolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING

from dcs import Point

if TYPE_CHECKING:
from .waypointstrategy import WaypointStrategy


class WaypointSolver:
def __init__(self) -> None:
self.strategies: list[WaypointStrategy] = []
self.debug_output_directory: Path | None = None

def add_strategy(self, strategy: WaypointStrategy) -> None:
self.strategies.append(strategy)

def set_debug_output_directory(self, path: Path) -> None:
self.debug_output_directory = path

def dump_debug_info(self) -> None:
path = self.debug_output_directory
if path is None:
return

for idx, strategy in enumerate(self.strategies):
strategy_path = path / f"{idx}.json"
with strategy_path.open("w", encoding="utf-8") as strategy_debug_file:
json.dump(
{
"strategy_name": strategy.__class__.__name__,
"geojson": {
"type": "FeatureCollection",
"features": [
d.to_geojson() for d in strategy.iter_debug_info()
],
},
},
strategy_debug_file,
)

def solve(self) -> Point:
if not self.strategies:
raise ValueError(
"WaypointSolver.solve() called before any strategies were added"
)

for strategy in self.strategies:
if (point := strategy.find()) is not None:
return point

self.dump_debug_info()
debug_details = "No debug output directory set"
if (debug_path := self.debug_output_directory) is not None:
debug_details = f"Debug details written to {debug_path}"
raise RuntimeError(f"No solutions found for waypoint. {debug_details}")
235 changes: 235 additions & 0 deletions game/flightplan/waypointstrategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
from __future__ import annotations

import math
from abc import abstractmethod, ABC
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Any

from dcs.mapping import heading_between_points
from shapely import to_geojson
from shapely.geometry import Point, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry as Geometry
from shapely.ops import nearest_points

from game.utils import Distance, nautical_miles, Heading


def angle_between_points(a: Point, b: Point) -> float:
return heading_between_points(a.x, a.y, b.x, b.y)


def point_at_heading(p: Point, heading: Heading, distance: Distance) -> Point:
rad_heading = heading.radians
return Point(
p.x + math.cos(rad_heading) * distance.meters,
p.y + math.sin(rad_heading) * distance.meters,
)


class Prerequisite(ABC):
@abstractmethod
def is_satisfied(self) -> bool:
...


class DistancePrerequisite(Prerequisite):
def __init__(self, a: Point, b: Point, min_range: Distance) -> None:
self.a = a
self.b = b
self.min_range = min_range

def is_satisfied(self) -> bool:
return self.a.distance(self.b) >= self.min_range.meters


class SafePrerequisite(Prerequisite):
def __init__(self, point: Point, threat_zones: MultiPolygon) -> None:
self.point = point
self.threat_zones = threat_zones

def is_satisfied(self) -> bool:
return not self.point.intersects(self.threat_zones)


class PrerequisiteBuilder:
def __init__(
self, subject: Point, threat_zones: MultiPolygon, strategy: WaypointStrategy
) -> None:
self.subject = subject
self.threat_zones = threat_zones
self.strategy = strategy

def safe(self) -> None:
self.strategy.add_prerequisite(
SafePrerequisite(self.subject, self.threat_zones)
)

def min_distance_from(self, target: Point, distance: Distance) -> None:
self.strategy.add_prerequisite(
DistancePrerequisite(self.subject, target, distance)
)


@dataclass(frozen=True)
class ThreatTolerance:
target: Point
target_buffer: Distance
tolerance: Distance


class RequirementBuilder:
def __init__(self, threat_zones: MultiPolygon, strategy: WaypointStrategy) -> None:
self.threat_zones = threat_zones
self.strategy = strategy

def safe(self) -> None:
self.strategy.exclude_threat_zone()

def at_least(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, min_distance=distance)

def at_most(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, max_distance=distance)

def maximum_turn_to(
self, turn_point: Point, next_point: Point, turn_limit: Heading
) -> None:

large_distance = nautical_miles(400)
next_heading = Heading.from_degrees(
angle_between_points(next_point, turn_point)
)
limit_ccw = point_at_heading(
turn_point, next_heading - turn_limit, large_distance
)
limit_cw = point_at_heading(
turn_point, next_heading + turn_limit, large_distance
)

allowed_wedge = Polygon([turn_point, limit_ccw, limit_cw])
self.strategy.exclude(
f"restrict turn from {turn_point} to {next_point} to {turn_limit}",
turn_point.buffer(large_distance.meters).difference(allowed_wedge),
)


class DistanceRequirementBuilder:
def __init__(
self,
strategy: WaypointStrategy,
min_distance: Distance | None = None,
max_distance: Distance | None = None,
) -> None:
if min_distance is None and max_distance is None:
raise ValueError
self.strategy = strategy
self.min_distance = min_distance
self.max_distance = max_distance

def away_from(self, target: Point) -> None:
if self.min_distance is not None:
self.strategy.exclude(
f"at least {self.min_distance} away from {target}",
target.buffer(self.min_distance.meters),
)
if self.max_distance is not None:
self.strategy.exclude_beyond(
f"at most {self.min_distance} away from {target}",
target.buffer(self.max_distance.meters),
)


@dataclass(frozen=True)
class WaypointDebugInfo:
description: str
geometry: Geometry

def to_geojson(self) -> dict[str, Any]:
return {
"type": "Feature",
"properties": {
"description": self.description,
},
"geometry": to_geojson(self.geometry),
}


class WaypointStrategy:
def __init__(self, threat_zones: MultiPolygon) -> None:
self.threat_zones = threat_zones
self.prerequisites: list[Prerequisite] = []
self.allowed_area: Polygon = Point(0, 0).buffer(1_000_000)
self.debug_infos: list[WaypointDebugInfo] = []
self._threat_tolerance: ThreatTolerance | None = None
self.point_for_nearest_solution: Point | None = None

def add_prerequisite(self, prerequisite: Prerequisite) -> None:
self.prerequisites.append(prerequisite)

def prerequisite(self, subject: Point) -> PrerequisiteBuilder:
return PrerequisiteBuilder(subject, self.threat_zones, self)

def exclude(self, description: str, geometry: Geometry) -> None:
self.debug_infos.append(WaypointDebugInfo(description, geometry))
self.allowed_area = self.allowed_area.difference(geometry)

def exclude_beyond(self, description: str, geometry: Geometry) -> None:
self.exclude(description, self.allowed_area.difference(geometry))

def exclude_threat_zone(self) -> None:
if (tolerance := self._threat_tolerance) is not None:
description = (
f"safe with a {tolerance.tolerance} tolerance to a "
f"{tolerance.target_buffer} radius about {tolerance.target}"
)
else:
description = "safe"
self.exclude(description, self.threat_zones)

def prerequisites_are_satisfied(self) -> bool:
for prereq in self.prerequisites:
if not prereq.is_satisfied():
return False
return True

def require(self) -> RequirementBuilder:
return RequirementBuilder(self.threat_zones, self)

def threat_tolerance(
self, target: Point, target_size: Distance, wiggle: Distance
) -> None:
if self.threat_zones.is_empty:
return

min_distance_from_threat_to_target_buffer = target.buffer(
target_size.meters
).distance(self.threat_zones.boundary)
threat_mask = self.threat_zones.buffer(
-min_distance_from_threat_to_target_buffer - wiggle.meters
)
self._threat_tolerance = ThreatTolerance(target, target_size, wiggle)
self.threat_zones = self.threat_zones.difference(threat_mask)

def nearest(self, point: Point) -> None:
if self.point_for_nearest_solution is not None:
raise RuntimeError("WaypointStrategy.nearest() called more than once")
self.point_for_nearest_solution = point

def find(self) -> Point | None:
if self.point_for_nearest_solution is None:
raise RuntimeError(
"Must call WaypointStrategy.nearest() before WaypointStrategy.find()"
)

if not self.prerequisites_are_satisfied():
return None

try:
return nearest_points(self.allowed_area, self.point_for_nearest_solution)[0]
except ValueError:
# No solutions.
return None

def iter_debug_info(self) -> Iterator[WaypointDebugInfo]:
yield from self.debug_infos
Loading

0 comments on commit 55ba330

Please sign in to comment.