Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/4639 export vehicle flow assignment #501

Merged
merged 18 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions OTAnalytics/adapter_ui/view_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,7 @@ def get_skip_frames(self) -> int:
@abstractmethod
def set_video_control_frame(self, frame: AbstractFrame) -> None:
raise NotImplementedError

@abstractmethod
def export_road_user_assignments(self) -> None:
raise NotImplementedError
15 changes: 12 additions & 3 deletions OTAnalytics/application/analysis/traffic_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,15 @@ class RoadUserAssignments:
Represents a group of RoadUserAssignment objects.
"""

@property
def road_user_ids(self) -> list[str]:
"""Returns a sorted list of all road user ids within this group of assignments.

Returns:
list[str]: the road user ids.
"""
return sorted([assignment.road_user for assignment in self._assignments])

def __init__(self, assignments: list[RoadUserAssignment]) -> None:
self._assignments = assignments.copy()

Expand Down Expand Up @@ -610,9 +619,9 @@ def __group_flows_by_sections(
dict[tuple[SectionId, SectionId], list[Flow]]: flows grouped by start and
end section
"""
flows_by_start_and_end: dict[
tuple[SectionId, SectionId], list[Flow]
] = defaultdict(list)
flows_by_start_and_end: dict[tuple[SectionId, SectionId], list[Flow]] = (
defaultdict(list)
)
for current in flows:
flows_by_start_and_end[(current.start, current.end)].append(current)
return flows_by_start_and_end
Expand Down
14 changes: 14 additions & 0 deletions OTAnalytics/application/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from OTAnalytics.application.use_cases.quick_save_configuration import (
QuickSaveConfiguration,
)
from OTAnalytics.application.use_cases.road_user_assignment_export import (
ExportRoadUserAssignments,
ExportSpecification,
)
from OTAnalytics.application.use_cases.save_otflow import SaveOtflow
from OTAnalytics.application.use_cases.section_repository import (
AddSection,
Expand Down Expand Up @@ -123,6 +127,7 @@ def __init__(
quick_save_configuration: QuickSaveConfiguration,
load_otconfig: LoadOtconfig,
config_has_changed: ConfigHasChanged,
export_road_user_assignments: ExportRoadUserAssignments,
) -> None:
self._datastore: Datastore = datastore
self.track_state: TrackState = track_state
Expand Down Expand Up @@ -161,6 +166,7 @@ def __init__(
self._quick_save_configuration = quick_save_configuration
self._load_otconfig = load_otconfig
self._config_has_changed = config_has_changed
self._export_road_user_assignments = export_road_user_assignments

def connect_observers(self) -> None:
"""
Expand Down Expand Up @@ -622,6 +628,14 @@ def quick_save_configuration(self) -> None:
def config_has_changed(self) -> bool:
return self._config_has_changed.has_changed()

def export_road_user_assignments(self, specification: ExportSpecification) -> None:
self._export_road_user_assignments.export(specification)

def get_road_user_export_formats(
self,
) -> Iterable[ExportFormat]:
return self._export_road_user_assignments.get_supported_formats()


class MissingTracksError(Exception):
pass
Empty file.
23 changes: 23 additions & 0 deletions OTAnalytics/application/export_formats/event_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from OTAnalytics.domain import event

ROAD_USER_ID = event.ROAD_USER_ID
ROAD_USER_TYPE = event.ROAD_USER_TYPE
HOSTNAME = event.HOSTNAME
OCCURRENCE = event.OCCURRENCE
OCCURRENCE_DATE = "occurrence_day"
OCCURRENCE_TIME = "occurrence_time"
FRAME_NUMBER = event.FRAME_NUMBER
SECTION_ID = event.SECTION_ID
SECTION_NAME = "section_name"
EVENT_COORDINATE = event.EVENT_COORDINATE
EVENT_COORDINATE_X = "coordinate_px_x"
EVENT_COORDINATE_Y = "coordinate_px_y"
EVENT_TYPE = event.EVENT_TYPE
DIRECTION_VECTOR = event.DIRECTION_VECTOR
DIRECTION_VECTOR_X = "vector_px_x"
DIRECTION_VECTOR_Y = "vector_px_y"
VIDEO_NAME = event.VIDEO_NAME

DATE_FORMAT = "%Y-%m-%d"
TIME_FORMAT = "%H:%M:%S.%f"
DATE_TIME_FORMAT = f"{DATE_FORMAT} {TIME_FORMAT}"
46 changes: 46 additions & 0 deletions OTAnalytics/application/export_formats/road_user_assignments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from OTAnalytics.application.export_formats import event_list

START_PREFIX = "start"
END_PREFIX = "end"


def _prepend_start(key: str) -> str:
return f"{START_PREFIX}_{key}"


def _prepend_end(key: str) -> str:
return f"{END_PREFIX}_{key}"


FLOW_ID = "flow_id"
FLOW_NAME = "flow_name"
ROAD_USER_ID = event_list.ROAD_USER_ID
ROAD_USER_TYPE = event_list.ROAD_USER_TYPE
MAX_CONFIDENCE = "max_confidence"
START_OCCURRENCE = _prepend_start(event_list.OCCURRENCE)
END_OCCURRENCE = _prepend_end(event_list.OCCURRENCE)
START_OCCURRENCE_DATE = _prepend_start(event_list.OCCURRENCE_DATE)
END_OCCURRENCE_DATE = _prepend_end(event_list.OCCURRENCE_DATE)
START_OCCURRENCE_TIME = _prepend_start(event_list.OCCURRENCE_TIME)
END_OCCURRENCE_TIME = _prepend_end(event_list.OCCURRENCE_TIME)
START_FRAME = _prepend_start(event_list.FRAME_NUMBER)
END_FRAME = _prepend_end(event_list.FRAME_NUMBER)
START_VIDEO_NAME = _prepend_start(event_list.VIDEO_NAME)
END_VIDEO_NAME = _prepend_end(event_list.VIDEO_NAME)
START_SECTION_ID = _prepend_start(event_list.SECTION_ID)
END_SECTION_ID = _prepend_end(event_list.SECTION_ID)
START_SECTION_NAME = _prepend_start(event_list.SECTION_NAME)
END_SECTION_NAME = _prepend_end(event_list.SECTION_NAME)
START_EVENT_COORDINATE_X = _prepend_start(event_list.EVENT_COORDINATE_X)
START_EVENT_COORDINATE_Y = _prepend_start(event_list.EVENT_COORDINATE_Y)
END_EVENT_COORDINATE_X = _prepend_end(event_list.EVENT_COORDINATE_X)
END_EVENT_COORDINATE_Y = _prepend_end(event_list.EVENT_COORDINATE_Y)
START_DIRECTION_VECTOR_X = _prepend_start(event_list.DIRECTION_VECTOR_X)
START_DIRECTION_VECTOR_Y = _prepend_start(event_list.DIRECTION_VECTOR_Y)
END_DIRECTION_VECTOR_X = _prepend_end(event_list.DIRECTION_VECTOR_X)
END_DIRECTION_VECTOR_Y = _prepend_end(event_list.DIRECTION_VECTOR_Y)
HOSTNAME = event_list.HOSTNAME

DATE_FORMAT = event_list.DATE_FORMAT
TIME_FORMAT = event_list.TIME_FORMAT
DATE_TIME_FORMAT = event_list.DATE_TIME_FORMAT
224 changes: 224 additions & 0 deletions OTAnalytics/application/use_cases/road_user_assignment_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Protocol, Self

from OTAnalytics.application.analysis.traffic_counting import (
RoadUserAssigner,
RoadUserAssignment,
RoadUserAssignments,
)
from OTAnalytics.application.analysis.traffic_counting_specification import ExportFormat
from OTAnalytics.application.export_formats import road_user_assignments as ras
from OTAnalytics.application.use_cases.create_events import CreateEvents
from OTAnalytics.application.use_cases.track_repository import GetAllTracks
from OTAnalytics.domain.event import EventRepository
from OTAnalytics.domain.flow import FlowRepository
from OTAnalytics.domain.section import Section, SectionId, SectionRepository

MaxConfidenceLookupTable = dict[str, float]
MaxConfidenceProvider = Callable[[list[str]], MaxConfidenceLookupTable]


class RoadUserAssignmentBuildError(Exception):
pass


class RoadUserAssignmentBuilder:
def __init__(self) -> None:
self._start_section: Section | None = None
self._end_section: Section | None = None
self._max_confidence: float | None = None

def add_start_section(self, start: Section) -> Self:
self._start_section = start
return self

def add_end_section(self, end: Section) -> Self:
self._end_section = end
return self

def add_max_confidence(self, max_confidence: float) -> Self:
self._max_confidence = max_confidence
return self

def build(self, assignment: RoadUserAssignment) -> dict:
result = self.__create(assignment)
self.reset()
return result

def __create(self, assignment: RoadUserAssignment) -> dict:
if self._start_section is None:
raise RoadUserAssignmentBuildError("Start section not set")
if self._end_section is None:
raise RoadUserAssignmentBuildError("End section not set")
if self._max_confidence is None:
raise RoadUserAssignmentBuildError("Max confidence not set")
assigned_flow = assignment.assignment
start = assignment.events.start
end = assignment.events.end
return {
ras.FLOW_ID: assigned_flow.id.id,
ras.FLOW_NAME: assigned_flow.name,
ras.ROAD_USER_ID: assignment.road_user,
ras.MAX_CONFIDENCE: self._max_confidence,
ras.START_OCCURRENCE: start.occurrence.strftime(ras.DATE_TIME_FORMAT),
ras.START_OCCURRENCE_DATE: start.occurrence.strftime(ras.DATE_FORMAT),
ras.START_OCCURRENCE_TIME: start.occurrence.strftime(ras.TIME_FORMAT),
ras.END_OCCURRENCE: end.occurrence.strftime(ras.DATE_TIME_FORMAT),
ras.END_OCCURRENCE_DATE: end.occurrence.strftime(ras.DATE_FORMAT),
ras.END_OCCURRENCE_TIME: end.occurrence.strftime(ras.TIME_FORMAT),
ras.START_FRAME: start.frame_number,
ras.END_FRAME: end.frame_number,
ras.START_VIDEO_NAME: start.video_name,
ras.END_VIDEO_NAME: end.video_name,
ras.START_SECTION_ID: self._start_section.id.id,
ras.END_SECTION_ID: self._end_section.id.id,
ras.START_SECTION_NAME: self._start_section.name,
ras.END_SECTION_NAME: self._end_section.name,
ras.START_EVENT_COORDINATE_X: start.event_coordinate.x,
ras.START_EVENT_COORDINATE_Y: start.event_coordinate.y,
ras.END_EVENT_COORDINATE_X: end.event_coordinate.x,
ras.END_EVENT_COORDINATE_Y: end.event_coordinate.y,
ras.START_DIRECTION_VECTOR_X: start.direction_vector.x1,
ras.START_DIRECTION_VECTOR_Y: start.direction_vector.x2,
ras.END_DIRECTION_VECTOR_X: end.direction_vector.x1,
ras.END_DIRECTION_VECTOR_Y: end.direction_vector.x2,
ras.HOSTNAME: start.hostname,
}

def reset(self) -> None:
self._start_section = None
self._end_section = None
self._max_confidence = None


class RoadUserAssignmentExportError(Exception):
pass


class RoadUserAssignmentExporter(ABC):
@property
@abstractmethod
def format(self) -> ExportFormat:
raise NotImplementedError

def __init__(
self,
section_repository: SectionRepository,
get_all_tracks: GetAllTracks,
builder: RoadUserAssignmentBuilder,
output_file: Path,
) -> None:
self._section_repository = section_repository
self._get_all_tracks = get_all_tracks
self._builder = builder
self._outputfile = output_file

def export(self, assignments: RoadUserAssignments) -> None:
dtos = self._convert(assignments)
self._serialize(dtos)

@abstractmethod
def _serialize(self, dtos: list[dict]) -> None:
"""Hook for implementations to serialize in their respective save format.

Args:
dtos (list[dict]): the vehicle flow assignments as dtos.
"""
raise NotImplementedError

def _convert(self, assignments: RoadUserAssignments) -> list[dict]:
vehicle_flow_assignments = []
look_up_table = self._get_max_conf_lookup_table_for(assignments)
for assignment in assignments.as_list():
start_section = self._get_section_by_id(assignment.assignment.start)
end_section = self._get_section_by_id(assignment.assignment.end)
max_confidence = look_up_table[assignment.road_user]
vehicle_flow_assignments.append(
self._builder.add_start_section(start_section)
.add_end_section(end_section)
.add_max_confidence(max_confidence)
.build(assignment)
)
return vehicle_flow_assignments

def _get_max_conf_lookup_table_for(
self, assignments: RoadUserAssignments
) -> MaxConfidenceLookupTable:
return self._get_all_tracks.as_dataset().get_max_confidences_for(
assignments.road_user_ids
)

def _get_section_by_id(self, section_id: SectionId) -> Section:
result = self._section_repository.get(section_id)
if not result:
raise RoadUserAssignmentExportError(
f"No section found with id '{section_id.id}'"
)
return result


@dataclass(frozen=True)
class ExportSpecification:
save_path: Path
format: str


class RoadUserAssignmentExporterFactory(Protocol):
def get_supported_formats(self) -> Iterable[ExportFormat]:
"""
Returns an iterable of the supported export formats.

Returns:
Iterable[ExportFormat]: supported export formats.
"""
...

def create(self, specification: ExportSpecification) -> RoadUserAssignmentExporter:
"""
Create the exporter for the given road user assignment export specification.

Args:
specification (ExportSpecification): specification of the Exporter.

Returns:
RoadUserAssignmentExporter: Exporter to export road user assignments.
"""
...


class ExportRoadUserAssignments:
"""Use case to export_formats vehicle flow assignments."""

def __init__(
self,
event_repository: EventRepository,
flow_repository: FlowRepository,
create_events: CreateEvents,
assigner: RoadUserAssigner,
exporter_factory: RoadUserAssignmentExporterFactory,
) -> None:
self._event_repository = event_repository
self._flow_repository = flow_repository
self._create_events = create_events
self._assigner = assigner
self._exporter_factory = exporter_factory

def export(self, specification: ExportSpecification) -> None:
if self._event_repository.is_empty():
self._create_events()
events = self._event_repository.get_all()
flows = self._flow_repository.get_all()
road_user_assignments = self._assigner.assign(events, flows)
exporter = self._exporter_factory.create(specification)
exporter.export(road_user_assignments)

def get_supported_formats(self) -> Iterable[ExportFormat]:
"""
Returns an iterable of the supported export formats.

Returns:
Iterable[ExportFormat]: supported export formats
"""
return self._exporter_factory.get_supported_formats()
Loading