Skip to content

Commit

Permalink
Merge pull request #380 from OpenTrafficCam/feature/3266-calculate-ge…
Browse files Browse the repository at this point in the history
…ometry-once-per-used-offset-group-section-by-offset-before-creating-intersection-objects

Feature/3266 calculate geometry once per used offset group section by offset before creating intersection objects
  • Loading branch information
randy-seng authored Nov 9, 2023
2 parents 4f6b3e1 + 0482560 commit 9659788
Show file tree
Hide file tree
Showing 36 changed files with 1,900 additions and 2,073 deletions.
68 changes: 1 addition & 67 deletions OTAnalytics/application/eventlist.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,10 @@
from typing import Iterable

from OTAnalytics.domain.event import (
Event,
EventType,
SceneEventBuilder,
SectionEventBuilder,
)
from OTAnalytics.domain.event import Event, EventType, SceneEventBuilder
from OTAnalytics.domain.geometry import calculate_direction_vector
from OTAnalytics.domain.intersect import Intersector
from OTAnalytics.domain.section import Section
from OTAnalytics.domain.track import Track


class SectionActionDetector:
"""Detect when a track enters or leaves a section and generate events.
A track enters or leaves a section when they intersect.
Args:
intersector (Intersector): the intersector
section_event_builder (SectionEventBuilder): the section event builder
"""

def __init__(
self, intersector: Intersector, section_event_builder: SectionEventBuilder
) -> None:
self.intersector = intersector
self.section_event_builder = section_event_builder

def detect(
self,
sections: list[Section],
tracks: list[Track],
) -> list[Event]:
"""Detect section events.
Args:
sections (list[Section]): the sections
tracks (list[Track]): the tracks
Returns:
list[Event]: the events if tracks intersect with any of the sections.
Otherwise return empty list.
"""
event_list: list[Event] = []
for section in sections:
for track in tracks:
enter_event = self._detect(section, track)
if enter_event:
event_list.extend(enter_event)

return event_list

def _detect(self, section: Section, track: Track) -> list[Event]:
"""Detect when a track enters a section.
Args:
sections (Section): the section
track (Track): the track
Returns:
list[Event]: the event if a track enters a section.
Otherwise return empty list.
"""
self.section_event_builder.add_section_id(section.id)
self.section_event_builder.add_event_type(EventType.SECTION_ENTER)
events: list[Event] = self.intersector.intersect(
track, event_builder=self.section_event_builder
)
return events


class SceneActionDetector:
"""Detect when a road user enters or leaves the scene.
Expand Down
23 changes: 23 additions & 0 deletions OTAnalytics/application/geometry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from abc import ABC, abstractmethod
from functools import singledispatchmethod
from typing import Generic, Iterable, TypeVar

from OTAnalytics.domain.geometry import (
Coordinate,
Line,
Expand Down Expand Up @@ -26,3 +30,22 @@ def build(self, track: Track, offset: RelativeOffsetCoordinate) -> Line:
for detection in track.detections
]
return Line(coordinates)


LINE = TypeVar("LINE")
AREA = TypeVar("AREA")


class GeometryBuilder(ABC, Generic[LINE, AREA]):
@singledispatchmethod
@abstractmethod
def create_section(self) -> LINE | AREA:
raise NotImplementedError

@abstractmethod
def create_track(self, track: Track, offset: RelativeOffsetCoordinate) -> LINE:
raise NotImplementedError

@abstractmethod
def create_line_segments(self, geometry: LINE) -> Iterable[LINE]:
raise NotImplementedError
25 changes: 24 additions & 1 deletion OTAnalytics/application/use_cases/track_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from OTAnalytics.domain.track import (
RemoveMultipleTracksError,
Track,
TrackDataset,
TrackFileRepository,
TrackId,
TrackRepository,
Expand Down Expand Up @@ -41,8 +42,14 @@ def __call__(self) -> list[Track]:
Returns:
list[Track]: tracks with at least two detections.
"""
return self.as_list()

def as_list(self) -> list[Track]:
return self.as_dataset().as_list()

def as_dataset(self) -> TrackDataset:
tracks = self._track_repository.get_all()
return [track for track in tracks.as_list() if len(track.detections) > 1]
return tracks.filter_by_min_detection_length(1)


class GetAllTrackIds:
Expand Down Expand Up @@ -147,3 +154,19 @@ def __call__(self, track_ids: Iterable[TrackId]) -> Iterable[Track]:
tracks.append(track)

return tracks


class GetTracksAsBatches:
def __init__(self, track_repository: TrackRepository) -> None:
self._track_repository: TrackRepository = track_repository

def get(self, batches: int) -> Iterable[TrackDataset]:
"""Get tracks in the repository as batches.
Args:
batches (int): the number of batches.
Returns:
Iterable[TrackDataset]: the batches of tracks.
"""
return self._track_repository.split(batches)
19 changes: 19 additions & 0 deletions OTAnalytics/domain/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,22 @@ def calculate_direction_vector(
DirectionVector2D: the two dimensional direction vector
"""
return DirectionVector2D(x1=y1 - x1, x2=y2 - x2)


def apply_offset(
x: float, y: float, w: float, h: float, offset: RelativeOffsetCoordinate
) -> tuple[float, float]:
"""Apply offset to coordinates.
Args:
x (float): x coordinate.
y (float): y coordinate.
w (float): width.
h (float): height.
offset (RelativeOffsetCoordinate): the offset to include in the selection
of the coordinate.
Returns:
tuple[float, float]: the coordinate with the offset applied.
"""
return x + w * offset.x, y + h * offset.y
125 changes: 20 additions & 105 deletions OTAnalytics/domain/intersect.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
from abc import ABC, abstractmethod
from typing import Callable, Generic, Iterable, TypeVar
from typing import Callable, Iterable, Sequence

from OTAnalytics.domain.event import Event, EventBuilder
from OTAnalytics.domain.geometry import (
Coordinate,
DirectionVector2D,
Line,
Polygon,
RelativeOffsetCoordinate,
calculate_direction_vector,
)
from OTAnalytics.domain.section import Area, LineSection, Section
from OTAnalytics.domain.track import Detection, Track
from OTAnalytics.domain.geometry import Coordinate, Line, Polygon
from OTAnalytics.domain.section import Section
from OTAnalytics.domain.track import Track


class IntersectImplementation(ABC):
Expand Down Expand Up @@ -100,21 +93,24 @@ def are_coordinates_within_polygon(


class IntersectParallelizationStrategy(ABC):
@property
@abstractmethod
def num_processes(self) -> int:
raise NotImplementedError

@abstractmethod
def execute(
self,
intersect: Callable[[Track, Iterable[Section]], Iterable[Event]],
tracks: Iterable[Track],
sections: Iterable[Section],
intersect: Callable[[Iterable[Track], Iterable[Section]], Iterable[Event]],
tasks: Sequence[tuple[Iterable[Track], Iterable[Section]]],
) -> list[Event]:
"""Executes the intersection of tracks with sections with the implemented
parallelization strategy.
Args:
intersect (Callable[[Track, Iterable[Section]], Iterable[Event]]): the
function to be executed on an iterable of tracks and sections.
tracks (Iterable[Track]): the tracks to be processed.
sections (Iterable[Section]): the sections to be processed.
tasks (tuple[Iterable[Track], Iterable[Section])
Returns:
Iterable[Event]: the generated events.
Expand All @@ -131,106 +127,25 @@ def set_num_processes(self, value: int) -> None:
raise NotImplementedError


T = TypeVar("T")


class Intersector(ABC, Generic[T]):
class Intersector(ABC):
"""
Defines an interface to implement a family of algorithms to intersect tracks
with sections.
Args:
implementation (IntersectImplementation): the intersection implementation
"""

@abstractmethod
def __init__(self, implementation: IntersectImplementation) -> None:
self.implementation = implementation

@abstractmethod
def intersect(self, track: T, event_builder: EventBuilder) -> list[Event]:
def intersect(
self, tracks: Iterable[Track], section: Section, event_builder: EventBuilder
) -> list[Event]:
"""Intersect tracks with sections and generate events if they intersect.
Args:
track (T): the track
tracks (Iterable[Track]): the tracks to be intersected with.
section (Section): the section to be intersected with.
event_builder (EventBuilder): builder to generate events
Returns:
list[Event]: the events if the track intersects with the section.
Otherwise return empty list.
Otherwise, return empty list.
"""
pass

@staticmethod
def _select_coordinate_in_detection(
detection: Detection, offset: RelativeOffsetCoordinate
) -> Coordinate:
"""Select a coordinate within the bounding box of a detection.
A coordinate within the bounding box of a detection is selected by applying the
offset.
Args:
detection (Detection): the detection containing the bounding box dimensions
offset (RelativeOffsetCoordinate): the offset to include in the selection
of the coordinate
Returns:
Coordinate: the coordinate
"""
return Coordinate(
x=detection.x + detection.w * offset.x,
y=detection.y + detection.h * offset.y,
)

@staticmethod
def _calculate_direction_vector(
first: Coordinate, second: Coordinate
) -> DirectionVector2D:
"""Calculate direction vector from two coordinates.
Args:
first (Coordinate): the first coordinate
second (Coordinate): the second coordinate
Returns:
DirectionVector2D: the direction vector
"""
result = calculate_direction_vector(first.x, first.y, second.x, second.y)
return result


class LineSectionIntersector(Intersector[T]):
"""Determines whether a line section intersects with a track.
Args:
implementation (IntersectorImplementation): the intersection implementation
line (LineSection): the line to intersect with
"""

@abstractmethod
def __init__(
self,
implementation: IntersectImplementation,
line_section: LineSection,
) -> None:
super().__init__(implementation)
self._line_section = line_section


class AreaIntersector(Intersector[T]):
"""Determines whether an area intersects with a track.
Args:
implementation (IntersectorImplementation): the intersection implementation.
area (Area): the area to intersect with.
detection (Detection): the detection to intersect with.
"""

def __init__(
self,
implementation: IntersectImplementation,
area: Area,
) -> None:
super().__init__(implementation)
self._area = area
raise NotImplementedError
Loading

0 comments on commit 9659788

Please sign in to comment.