Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug/4915-moving-to-the-next-video-using-the-date-range-filter-or-the-video-control-does-not-work #506

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 9 additions & 48 deletions OTAnalytics/application/datastore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable, Optional, Sequence, Tuple

Expand Down Expand Up @@ -29,58 +28,19 @@
TrackListObserver,
TrackRepository,
)
from OTAnalytics.domain.video import Video, VideoListObserver, VideoRepository
from OTAnalytics.domain.video import (
Video,
VideoListObserver,
VideoMetadata,
VideoRepository,
)


@dataclass(frozen=True)
class DetectionMetadata:
detection_classes: frozenset[str]


@dataclass(frozen=True)
class VideoMetadata:
path: str
recorded_start_date: datetime
expected_duration: Optional[timedelta]
recorded_fps: float
actual_fps: Optional[float]
number_of_frames: int

@property
def start(self) -> datetime:
return self.recorded_start_date

@property
def end(self) -> datetime:
return self.start + self.duration

@property
def duration(self) -> timedelta:
if self.expected_duration:
return self.expected_duration
return timedelta(seconds=self.number_of_frames / self.recorded_fps)

@property
def fps(self) -> float:
if self.actual_fps:
return self.actual_fps
return self.recorded_fps

def to_dict(self) -> dict:
return {
"path": self.path,
"recorded_start_date": self.recorded_start_date.timestamp(),
"expected_duration": (
self.expected_duration.total_seconds()
if self.expected_duration
else None
),
"recorded_fps": self.recorded_fps,
"actual_fps": self.actual_fps,
"number_of_frames": self.number_of_frames,
}


@dataclass(frozen=True)
class TrackParseResult:
tracks: TrackDataset
Expand All @@ -104,7 +64,7 @@ def serialize(

class VideoParser(ABC):
@abstractmethod
def parse(self, file: Path, start_date: Optional[datetime]) -> Video:
def parse(self, file: Path, metadata: Optional[VideoMetadata]) -> Video:
pass

@abstractmethod
Expand Down Expand Up @@ -179,14 +139,15 @@ class TrackVideoParser(ABC):

@abstractmethod
def parse(
self, file: Path, track_ids: list[TrackId]
self, file: Path, track_ids: list[TrackId], metadata: VideoMetadata
) -> Tuple[list[TrackId], list[Video]]:
"""
Parse the given file in ottrk format and retrieve video information from it

Args:
file (Path): file in ottrk format
track_ids (list[TrackId]): track ids to get videos for
metadata (VideoMetadata): the video metadata

Returns:
Tuple[list[TrackId], list[Video]]: track ids and the corresponding videos
Expand Down
141 changes: 84 additions & 57 deletions OTAnalytics/application/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Callable, Generic, Optional

from OTAnalytics.application.config import DEFAULT_TRACK_OFFSET
from OTAnalytics.application.datastore import Datastore, VideoMetadata
from OTAnalytics.application.datastore import Datastore
from OTAnalytics.application.playback import SkipTime
from OTAnalytics.application.use_cases.section_repository import GetSectionsById
from OTAnalytics.domain.date import DateRange
Expand All @@ -29,7 +29,7 @@
TrackRepositoryEvent,
TrackSubject,
)
from OTAnalytics.domain.video import Video, VideoListObserver
from OTAnalytics.domain.video import Video, VideoListObserver, VideoMetadata

FIRST_DETECTION_OCCURRENCE: str = "first_detection_occurrence"
LAST_DETECTION_OCCURRENCE: str = "last_detection_occurrence"
Expand Down Expand Up @@ -242,10 +242,70 @@ def plot(self) -> Optional[TrackImage]:
pass


class VideosMetadata:
def __init__(self) -> None:
self._metadata: dict[datetime, VideoMetadata] = {}
self._first_video_start: Optional[datetime] = None
self._last_video_end: Optional[datetime] = None

def update(self, metadata: VideoMetadata) -> None:
"""
Update the stored metadata.
"""
if metadata.start in self._metadata.keys():
raise ValueError(
f"metadata with start date {metadata.start} already exists."
)
self._metadata[metadata.start] = metadata
self._metadata = dict(sorted(self._metadata.items()))
self._update_start_end_by(metadata)

def _update_start_end_by(self, metadata: VideoMetadata) -> None:
if (not self._first_video_start) or metadata.start < self._first_video_start:
self._first_video_start = metadata.start
if (not self._last_video_end) or metadata.end > self._last_video_end:
self._last_video_end = metadata.end

def get_metadata_for(self, current: datetime) -> Optional[VideoMetadata]:
"""
Find the metadata for the given datetime. If the datetime matches exactly a
start time of a video, the corresponding VideoMetadata is returned. Otherwise,
the metadata of the video containing the datetime will be returned.
"""
if current in self._metadata:
return self._metadata[current]
keys = list(self._metadata.keys())
key = bisect.bisect_left(keys, current) - 1
metadata = self._metadata[keys[key]]
if metadata.start <= current <= metadata.end:
return metadata
return None

@property
def first_video_start(self) -> Optional[datetime]:
return self._first_video_start

@property
def last_video_end(self) -> Optional[datetime]:
return self._last_video_end

def to_dict(self) -> dict:
return {
key.timestamp(): metadata.to_dict()
for key, metadata in self._metadata.items()
}


class SelectedVideoUpdate(TrackListObserver, VideoListObserver):
def __init__(self, datastore: Datastore, track_view_state: TrackViewState) -> None:
def __init__(
self,
datastore: Datastore,
track_view_state: TrackViewState,
videos_metadata: VideosMetadata,
) -> None:
self._datastore = datastore
self._track_view_state = track_view_state
self._videos_metadata = videos_metadata

def notify_tracks(self, track_event: TrackRepositoryEvent) -> None:
all_tracks = self._datastore.get_all_tracks()
Expand All @@ -258,6 +318,27 @@ def notify_videos(self, videos: list[Video]) -> None:
if videos:
self._track_view_state.selected_videos.set([videos[0]])

def on_filter_element_change(self, filter_element: FilterElement) -> None:
start_date = filter_element.date_range.start_date
end_date = filter_element.date_range.end_date
video_repository = self._datastore._video_repository

if not start_date and not end_date:
if first_video_start := self._videos_metadata.first_video_start:
first_video = video_repository.get_by_date(first_video_start)
self._track_view_state.selected_videos.set(first_video)
return

if not end_date:
self._track_view_state.selected_videos.set([])
return

if not (videos := video_repository.get_by_date(end_date)):
self._track_view_state.selected_videos.set([])
return

self._track_view_state.selected_videos.set(videos)


class SectionState(SectionListObserver):
"""
Expand Down Expand Up @@ -419,60 +500,6 @@ def _update_image(self) -> None:
self._track_view_state.background_image.set(self._plotter.plot())


class VideosMetadata:
def __init__(self) -> None:
self._metadata: dict[datetime, VideoMetadata] = {}
self._first_video_start: Optional[datetime] = None
self._last_video_end: Optional[datetime] = None

def update(self, metadata: VideoMetadata) -> None:
"""
Update the stored metadata.
"""
if metadata.start in self._metadata.keys():
raise ValueError(
f"metadata with start date {metadata.start} already exists."
)
self._metadata[metadata.start] = metadata
self._metadata = dict(sorted(self._metadata.items()))
self._update_start_end_by(metadata)

def _update_start_end_by(self, metadata: VideoMetadata) -> None:
if (not self._first_video_start) or metadata.start < self._first_video_start:
self._first_video_start = metadata.start
if (not self._last_video_end) or metadata.end > self._last_video_end:
self._last_video_end = metadata.end

def get_metadata_for(self, current: datetime) -> Optional[VideoMetadata]:
"""
Find the metadata for the given datetime. If the datetime matches exactly a
start time of a video, the corresponding VideoMetadata is returned. Otherwise,
the metadata of the video containing the datetime will be returned.
"""
if current in self._metadata:
return self._metadata[current]
keys = list(self._metadata.keys())
key = bisect.bisect_left(keys, current) - 1
metadata = self._metadata[keys[key]]
if metadata.start <= current <= metadata.end:
return metadata
return None

@property
def first_video_start(self) -> Optional[datetime]:
return self._first_video_start

@property
def last_video_end(self) -> Optional[datetime]:
return self._last_video_end

def to_dict(self) -> dict:
return {
key.timestamp(): metadata.to_dict()
for key, metadata in self._metadata.items()
}


class TracksMetadata(TrackListObserver):
"""Contains relevant information on the currently loaded tracks.

Expand Down
4 changes: 3 additions & 1 deletion OTAnalytics/application/use_cases/load_track_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def load(self, file: Path) -> None:
"""
parse_result = self._track_parser.parse(file)
track_ids = list(parse_result.tracks.track_ids)
track_ids, videos = self._track_video_parser.parse(file, track_ids)
track_ids, videos = self._track_video_parser.parse(
file, track_ids, parse_result.video_metadata
)
self._video_repository.add_all(videos)
self._track_to_video_repository.add_all(track_ids, videos)
self._track_repository.add_all(parse_result.tracks)
Expand Down
65 changes: 63 additions & 2 deletions OTAnalytics/domain/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def get_frame(self, index: int) -> TrackImage:
def get_frame_number_for(self, date: datetime) -> int:
raise NotImplementedError

@abstractmethod
def contains(self, date: datetime) -> bool:
raise NotImplementedError

@abstractmethod
def to_dict(
self,
Expand All @@ -83,6 +87,53 @@ class DifferentDrivesException(Exception):
pass


@dataclass(frozen=True)
class VideoMetadata:
path: str
recorded_start_date: datetime
expected_duration: Optional[timedelta]
recorded_fps: float
actual_fps: Optional[float]
number_of_frames: int

@property
def start(self) -> datetime:
return self.recorded_start_date

@property
def end(self) -> datetime:
return self.start + self.duration

@property
def duration(self) -> timedelta:
if self.expected_duration:
return self.expected_duration
return timedelta(seconds=self.number_of_frames / self.recorded_fps)

@property
def fps(self) -> float:
if self.actual_fps:
return self.actual_fps
return self.recorded_fps

def to_dict(self) -> dict:
return {
"path": self.path,
"recorded_start_date": self.recorded_start_date.timestamp(),
"expected_duration": (
self.expected_duration.total_seconds()
if self.expected_duration
else None
),
"recorded_fps": self.recorded_fps,
"actual_fps": self.actual_fps,
"number_of_frames": self.number_of_frames,
}

def contains(self, date: datetime) -> bool:
return self.start <= date < self.end


@dataclass
class SimpleVideo(Video):
"""Represents a video file.
Expand All @@ -97,12 +148,14 @@ class SimpleVideo(Video):

video_reader: VideoReader
path: Path
_start_date: Optional[datetime]
metadata: Optional[VideoMetadata]
_fps: Optional[int] = None

@property
def start_date(self) -> Optional[datetime]:
return self._start_date
if self.metadata:
return self.metadata.recorded_start_date
return None

@property
def fps(self) -> float:
Expand Down Expand Up @@ -155,6 +208,11 @@ def __build_relative_path(self, relative_to: Path) -> str:
)
return path.relpath(self.path, relative_to)

def contains(self, date: datetime) -> bool:
if self.metadata:
return self.metadata.contains(date)
return False


class VideoListObserver(ABC):
"""
Expand Down Expand Up @@ -250,3 +308,6 @@ def clear(self) -> None:
"""
self._videos.clear()
self._observers.notify([])

def get_by_date(self, date: datetime) -> list[Video]:
return [video for video in self._videos.values() if video.contains(date)]
Loading