Skip to content

Commit

Permalink
WIP: Build common interface for waypoint geometry constraints.
Browse files Browse the repository at this point in the history
  • Loading branch information
DanAlbert committed Aug 3, 2023
1 parent f376e6d commit 7ea686b
Show file tree
Hide file tree
Showing 4 changed files with 721 additions and 0 deletions.
123 changes: 123 additions & 0 deletions game/flightplan/waypointsolver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from __future__ import annotations

import json
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any

from dcs import Point
from dcs.mapping import Point as DcsPoint
from dcs.terrain import Terrain
from numpy import float64, array
from numpy._typing import NDArray
from shapely import transform, to_geojson
from shapely.geometry.base import BaseGeometry

if TYPE_CHECKING:
from .waypointstrategy import WaypointStrategy


class NoSolutionsError(RuntimeError):
pass


class WaypointSolver:
def __init__(self) -> None:
self.strategies: list[WaypointStrategy] = []
self.debug_output_directory: Path | None = None
self._terrain: Terrain | None = None

def add_strategy(self, strategy: WaypointStrategy) -> None:
self.strategies.append(strategy)

def set_debug_properties(self, path: Path, terrain: Terrain) -> None:
self.debug_output_directory = path
self._terrain = terrain

def to_geojson(self, geometry: BaseGeometry) -> dict[str, Any]:
if geometry.is_empty:
return json.loads(to_geojson(geometry))

assert self._terrain is not None
origin = DcsPoint(0, 0, self._terrain)

def xy_to_ll(points: NDArray[float64]) -> NDArray[float64]:
ll_points = []
for point in points:
p = origin.new_in_same_map(point[0], point[1])
latlng = p.latlng()
ll_points.append([latlng.lat, latlng.lng])
return array(ll_points)

transformed = transform(geometry, xy_to_ll)
return json.loads(to_geojson(transformed))

def describe_metadata(self) -> dict[str, Any]:
return {}

def describe_inputs(self) -> Iterator[tuple[str, BaseGeometry]]:
yield from []

def describe_debug(self) -> dict[str, Any]:
assert self._terrain is not None
metadata = {"name": self.__class__.__name__, "terrain": self._terrain.name}
metadata.update(self.describe_metadata())
return {
"metadata": metadata,
"geojson": {
"type": "FeatureCollection",
"features": list(self.describe_features()),
},
}

def describe_features(self) -> Iterator[dict[str, Any]]:
for description, geometry in self.describe_inputs():
yield {
"type": "Feature",
"properties": {
"description": description,
},
"geometry": self.to_geojson(geometry),
}

def dump_debug_info(self) -> None:
path = self.debug_output_directory
if path is None:
return

inputs_path = path / "solver.json"
with inputs_path.open("w", encoding="utf-8") as inputs_file:
json.dump(self.describe_debug(), inputs_file)

for idx, strategy in enumerate(self.strategies):
strategy_path = path / f"{idx}.json"
with strategy_path.open("w", encoding="utf-8") as strategy_debug_file:
json.dump(
{
"strategy_name": strategy.__class__.__name__,
"geojson": {
"type": "FeatureCollection",
"features": [
d.to_geojson(self.to_geojson)
for d in strategy.iter_debug_info()
],
},
},
strategy_debug_file,
)

def solve(self) -> Point:
if not self.strategies:
raise ValueError(
"WaypointSolver.solve() called before any strategies were added"
)

for strategy in self.strategies:
if (point := strategy.find()) is not None:
return point

self.dump_debug_info()
debug_details = "No debug output directory set"
if (debug_path := self.debug_output_directory) is not None:
debug_details = f"Debug details written to {debug_path}"
raise NoSolutionsError(f"No solutions found for waypoint. {debug_details}")
236 changes: 236 additions & 0 deletions game/flightplan/waypointstrategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from __future__ import annotations

import math
from abc import abstractmethod, ABC
from collections.abc import Iterator, Callable
from dataclasses import dataclass
from typing import Any

from dcs.mapping import heading_between_points
from shapely.geometry import Point, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry as Geometry, BaseGeometry
from shapely.ops import nearest_points

from game.utils import Distance, nautical_miles, Heading


def angle_between_points(a: Point, b: Point) -> float:
return heading_between_points(a.x, a.y, b.x, b.y)


def point_at_heading(p: Point, heading: Heading, distance: Distance) -> Point:
rad_heading = heading.radians
return Point(
p.x + math.cos(rad_heading) * distance.meters,
p.y + math.sin(rad_heading) * distance.meters,
)


class Prerequisite(ABC):
@abstractmethod
def is_satisfied(self) -> bool:
...


class DistancePrerequisite(Prerequisite):
def __init__(self, a: Point, b: Point, min_range: Distance) -> None:
self.a = a
self.b = b
self.min_range = min_range

def is_satisfied(self) -> bool:
return self.a.distance(self.b) >= self.min_range.meters


class SafePrerequisite(Prerequisite):
def __init__(self, point: Point, threat_zones: MultiPolygon) -> None:
self.point = point
self.threat_zones = threat_zones

def is_satisfied(self) -> bool:
return not self.point.intersects(self.threat_zones)


class PrerequisiteBuilder:
def __init__(
self, subject: Point, threat_zones: MultiPolygon, strategy: WaypointStrategy
) -> None:
self.subject = subject
self.threat_zones = threat_zones
self.strategy = strategy

def is_safe(self) -> None:
self.strategy.add_prerequisite(
SafePrerequisite(self.subject, self.threat_zones)
)

def min_distance_from(self, target: Point, distance: Distance) -> None:
self.strategy.add_prerequisite(
DistancePrerequisite(self.subject, target, distance)
)


@dataclass(frozen=True)
class ThreatTolerance:
target: Point
target_buffer: Distance
tolerance: Distance


class RequirementBuilder:
def __init__(self, threat_zones: MultiPolygon, strategy: WaypointStrategy) -> None:
self.threat_zones = threat_zones
self.strategy = strategy

def safe(self) -> None:
self.strategy.exclude_threat_zone()

def at_least(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, min_distance=distance)

def at_most(self, distance: Distance) -> DistanceRequirementBuilder:
return DistanceRequirementBuilder(self.strategy, max_distance=distance)

def maximum_turn_to(
self, turn_point: Point, next_point: Point, turn_limit: Heading
) -> None:

large_distance = nautical_miles(400)
next_heading = Heading.from_degrees(
angle_between_points(next_point, turn_point)
)
limit_ccw = point_at_heading(
turn_point, next_heading - turn_limit, large_distance
)
limit_cw = point_at_heading(
turn_point, next_heading + turn_limit, large_distance
)

allowed_wedge = Polygon([turn_point, limit_ccw, limit_cw])
self.strategy.exclude(
f"restrict turn from {turn_point} to {next_point} to {turn_limit}",
turn_point.buffer(large_distance.meters).difference(allowed_wedge),
)


class DistanceRequirementBuilder:
def __init__(
self,
strategy: WaypointStrategy,
min_distance: Distance | None = None,
max_distance: Distance | None = None,
) -> None:
if min_distance is None and max_distance is None:
raise ValueError
self.strategy = strategy
self.min_distance = min_distance
self.max_distance = max_distance

def away_from(self, target: Point) -> None:
if self.min_distance is not None:
self.strategy.exclude(
f"at least {self.min_distance} away from {target}",
target.buffer(self.min_distance.meters),
)
if self.max_distance is not None:
self.strategy.exclude_beyond(
f"at most {self.min_distance} away from {target}",
target.buffer(self.max_distance.meters),
)


@dataclass(frozen=True)
class WaypointDebugInfo:
description: str
geometry: BaseGeometry

def to_geojson(
self, to_geojson: Callable[[BaseGeometry], dict[str, Any]]
) -> dict[str, Any]:
return {
"type": "Feature",
"properties": {
"description": self.description,
},
"geometry": to_geojson(self.geometry),
}


class WaypointStrategy:
def __init__(self, threat_zones: MultiPolygon) -> None:
self.threat_zones = threat_zones
self.prerequisites: list[Prerequisite] = []
self.allowed_area: Polygon = Point(0, 0).buffer(1_000_000)
self.debug_infos: list[WaypointDebugInfo] = []
self._threat_tolerance: ThreatTolerance | None = None
self.point_for_nearest_solution: Point | None = None

def add_prerequisite(self, prerequisite: Prerequisite) -> None:
self.prerequisites.append(prerequisite)

def prerequisite(self, subject: Point) -> PrerequisiteBuilder:
return PrerequisiteBuilder(subject, self.threat_zones, self)

def exclude(self, description: str, geometry: Geometry) -> None:
self.debug_infos.append(WaypointDebugInfo(description, geometry))
self.allowed_area = self.allowed_area.difference(geometry)

def exclude_beyond(self, description: str, geometry: Geometry) -> None:
self.exclude(description, self.allowed_area.difference(geometry))

def exclude_threat_zone(self) -> None:
if (tolerance := self._threat_tolerance) is not None:
description = (
f"safe with a {tolerance.tolerance} tolerance to a "
f"{tolerance.target_buffer} radius about {tolerance.target}"
)
else:
description = "safe"
self.exclude(description, self.threat_zones)

def prerequisites_are_satisfied(self) -> bool:
for prereq in self.prerequisites:
if not prereq.is_satisfied():
return False
return True

def require(self) -> RequirementBuilder:
return RequirementBuilder(self.threat_zones, self)

def threat_tolerance(
self, target: Point, target_size: Distance, wiggle: Distance
) -> None:
if self.threat_zones.is_empty:
return

min_distance_from_threat_to_target_buffer = target.buffer(
target_size.meters
).distance(self.threat_zones.boundary)
threat_mask = self.threat_zones.buffer(
-min_distance_from_threat_to_target_buffer - wiggle.meters
)
self._threat_tolerance = ThreatTolerance(target, target_size, wiggle)
self.threat_zones = self.threat_zones.difference(threat_mask)

def nearest(self, point: Point) -> None:
if self.point_for_nearest_solution is not None:
raise RuntimeError("WaypointStrategy.nearest() called more than once")
self.point_for_nearest_solution = point

def find(self) -> Point | None:
if self.point_for_nearest_solution is None:
raise RuntimeError(
"Must call WaypointStrategy.nearest() before WaypointStrategy.find()"
)

if not self.prerequisites_are_satisfied():
return None

try:
return nearest_points(self.allowed_area, self.point_for_nearest_solution)[0]
except ValueError:
# No solutions.
return None

def iter_debug_info(self) -> Iterator[WaypointDebugInfo]:
yield from self.debug_infos
Loading

0 comments on commit 7ea686b

Please sign in to comment.