Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance drop due to internal data structure changes of PandasTrackDataset #475

Merged
merged 7 commits into from
Mar 6, 2024
4 changes: 2 additions & 2 deletions OTAnalytics/domain/track_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,11 @@ def add_all(self, tracks: Iterable[Track]) -> "TrackGeometryDataset":
raise NotImplementedError

@abstractmethod
def remove(self, ids: Iterable[TrackId]) -> "TrackGeometryDataset":
def remove(self, ids: Sequence[str]) -> "TrackGeometryDataset":
"""Remove track geometries with given ids from dataset.

Args:
ids (Iterable[TrackId]): the track geometries to remove.
ids (Sequence[str]): the track geometries to remove.

Returns:
TrackGeometryDataset: the dataset with tracks removed.
Expand Down
7 changes: 4 additions & 3 deletions OTAnalytics/plugin_datastore/python_track_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ def empty(self) -> bool:
def __init__(
self,
values: Optional[dict[TrackId, Track]] = None,
geometry_datasets: dict[RelativeOffsetCoordinate, TrackGeometryDataset]
| None = None,
geometry_datasets: (
dict[RelativeOffsetCoordinate, TrackGeometryDataset] | None
) = None,
calculator: TrackClassificationCalculator = ByMaxConfidence(),
track_geometry_factory: TRACK_GEOMETRY_FACTORY = (
PygeosTrackGeometryDataset.from_track_dataset
Expand Down Expand Up @@ -449,7 +450,7 @@ def _remove_from_geometry_datasets(
) -> dict[RelativeOffsetCoordinate, TrackGeometryDataset]:
updated = {}
for offset, geometry_dataset in self._geometry_datasets.items():
updated[offset] = geometry_dataset.remove(track_ids)
updated[offset] = geometry_dataset.remove([_id.id for _id in track_ids])
return updated

def clear(self) -> "PythonTrackDataset":
Expand Down
24 changes: 8 additions & 16 deletions OTAnalytics/plugin_datastore/track_geometry_store/pygeos_store.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from bisect import bisect
from collections import defaultdict
from itertools import chain
from typing import Any, Iterable, Literal
from typing import Any, Iterable, Literal, Sequence

from pandas import DataFrame, concat
from pygeos import (
Expand All @@ -28,11 +28,7 @@
TrackDataset,
TrackGeometryDataset,
)
from OTAnalytics.plugin_datastore.track_store import (
LEVEL_CLASSIFICATION,
LEVEL_TRACK_ID,
PandasTrackDataset,
)
from OTAnalytics.plugin_datastore.track_store import LEVEL_TRACK_ID, PandasTrackDataset

TRACK_ID = "track_id"
GEOMETRY = "geom"
Expand Down Expand Up @@ -188,9 +184,7 @@ def __create_entries_from_dataframe(
track_size_mask = track_dataset._dataset.groupby(
level=LEVEL_TRACK_ID
).transform("size")
filtered_tracks = track_dataset._dataset[track_size_mask > 1].reset_index(
level=LEVEL_CLASSIFICATION, drop=True
)
filtered_tracks = track_dataset._dataset[track_size_mask > 1]

if offset == BASE_GEOMETRY:
new_x = filtered_tracks[track.X]
Expand Down Expand Up @@ -232,10 +226,8 @@ def add_all(self, tracks: Iterable[Track]) -> TrackGeometryDataset:

return PygeosTrackGeometryDataset(self._offset, new_dataset)

def remove(self, ids: Iterable[TrackId]) -> TrackGeometryDataset:
updated = self._dataset.drop(
index=[track_id.id for track_id in ids], errors="ignore"
)
def remove(self, ids: Sequence[str]) -> TrackGeometryDataset:
updated = self._dataset.drop(index=ids, errors="ignore")
return PygeosTrackGeometryDataset(self._offset, updated)

def get_for(self, track_ids: Iterable[str]) -> "TrackGeometryDataset":
Expand Down Expand Up @@ -315,9 +307,9 @@ def _next_event(
def contained_by_sections(
self, sections: list[Section]
) -> dict[TrackId, list[tuple[SectionId, list[bool]]]]:
contains_result: dict[
TrackId, list[tuple[SectionId, list[bool]]]
] = defaultdict(list)
contains_result: dict[TrackId, list[tuple[SectionId, list[bool]]]] = (
defaultdict(list)
)
for _section in sections:
section_geom = area_section_to_pygeos(_section)

Expand Down
69 changes: 29 additions & 40 deletions OTAnalytics/plugin_datastore/track_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
class PandasDetection(Detection):
def __init__(self, track_id: str, data: Series):
self._track_id = track_id
self._occurrence: Any = data.name[1] # data.name is tuple
self._occurrence: Any = data.name # data.name is tuple
self._data = data

@property
Expand Down Expand Up @@ -118,7 +118,7 @@ def id(self) -> TrackId:

@property
def classification(self) -> str:
return self._data.index.get_level_values(LEVEL_CLASSIFICATION).values[0]
return self._data[track.TRACK_CLASSIFICATION].iloc[0]

@property
def detections(self) -> list[Detection]:
Expand Down Expand Up @@ -178,7 +178,6 @@ def calculate(self, detections: DataFrame) -> DataFrame:

DEFAULT_CLASSIFICATOR = PandasByMaxConfidence()
INDEX_NAMES = [track.TRACK_ID, track.OCCURRENCE]
LEVEL_CLASSIFICATION = track.TRACK_CLASSIFICATION
LEVEL_TRACK_ID = track.TRACK_ID
LEVEL_OCCURRENCE = track.OCCURRENCE
CUT_INDICES = "CUT_INDICES"
Expand Down Expand Up @@ -238,7 +237,7 @@ def last_occurrence(self) -> datetime | None:
def classifications(self) -> frozenset[str]:
if not len(self):
return frozenset()
return frozenset(self._dataset.index.unique(LEVEL_CLASSIFICATION))
return frozenset(self._dataset[track.TRACK_CLASSIFICATION].unique())

@property
def empty(self) -> bool:
Expand All @@ -248,8 +247,9 @@ def __init__(
self,
track_geometry_factory: TRACK_GEOMETRY_FACTORY,
dataset: DataFrame | None = None,
geometry_datasets: dict[RelativeOffsetCoordinate, TrackGeometryDataset]
| None = None,
geometry_datasets: (
dict[RelativeOffsetCoordinate, TrackGeometryDataset] | None
) = None,
calculator: PandasTrackClassificationCalculator = DEFAULT_CLASSIFICATOR,
):
if dataset is not None:
Expand Down Expand Up @@ -292,37 +292,29 @@ def from_list(
def from_dataframe(
tracks: DataFrame,
track_geometry_factory: TRACK_GEOMETRY_FACTORY,
geometry_dataset: dict[RelativeOffsetCoordinate, TrackGeometryDataset]
| None = None,
geometry_dataset: (
dict[RelativeOffsetCoordinate, TrackGeometryDataset] | None
) = None,
calculator: PandasTrackClassificationCalculator = DEFAULT_CLASSIFICATOR,
) -> "PandasTrackDataset":
if tracks.empty:
return PandasTrackDataset(track_geometry_factory)

classified_tracks = _assign_track_classification(tracks, calculator)
newly_indexed_tracks = classified_tracks.set_index(
track.TRACK_CLASSIFICATION, append=True
).reorder_levels([LEVEL_CLASSIFICATION, LEVEL_TRACK_ID, LEVEL_OCCURRENCE])

return PandasTrackDataset(
track_geometry_factory,
newly_indexed_tracks,
classified_tracks,
geometry_datasets=geometry_dataset,
)

def add_all(self, other: Iterable[Track]) -> "PandasTrackDataset":
new_tracks = self.__get_tracks(other)
if new_tracks.empty:
return self
if LEVEL_CLASSIFICATION in new_tracks.index.names:
new_tracks = new_tracks.reset_index(level=LEVEL_CLASSIFICATION, drop=True)
if self._dataset.empty:
return PandasTrackDataset.from_dataframe(
new_tracks, self.track_geometry_factory, calculator=self.calculator
)
updated_dataset = pandas.concat(
[self._dataset.reset_index(LEVEL_CLASSIFICATION, drop=True), new_tracks]
).sort_index()
updated_dataset = pandas.concat([self._dataset, new_tracks]).sort_index()
new_track_ids = new_tracks.index.unique(LEVEL_TRACK_ID)
new_dataset = updated_dataset.loc[new_track_ids]
updated_geometry_dataset = self._add_to_geometry_dataset(
Expand Down Expand Up @@ -362,26 +354,24 @@ def clear(self) -> "PandasTrackDataset":
return PandasTrackDataset(self.track_geometry_factory)

def remove(self, track_id: TrackId) -> "PandasTrackDataset":
remaining_tracks = self._dataset.drop(
track_id.id, level=LEVEL_TRACK_ID, errors="ignore"
)
updated_geometry_datasets = self._remove_from_geometry_dataset({track_id})
remaining_tracks = self._dataset.drop(track_id.id, errors="ignore")
updated_geometry_datasets = self._remove_from_geometry_dataset([track_id.id])
return PandasTrackDataset.from_dataframe(
remaining_tracks, self.track_geometry_factory, updated_geometry_datasets
)

def remove_multiple(self, track_ids: set[TrackId]) -> "PandasTrackDataset":
track_ids_primitive = [track_id.id for track_id in track_ids]
remaining_tracks = self._dataset.drop(
track_ids_primitive, level=LEVEL_TRACK_ID, errors="ignore"
remaining_tracks = self._dataset.drop(track_ids_primitive, errors="ignore")
updated_geometry_datasets = self._remove_from_geometry_dataset(
track_ids_primitive
)
updated_geometry_datasets = self._remove_from_geometry_dataset(track_ids)
return PandasTrackDataset.from_dataframe(
remaining_tracks, self.track_geometry_factory, updated_geometry_datasets
)

def _remove_from_geometry_dataset(
self, track_ids: Iterable[TrackId]
self, track_ids: Sequence[str]
) -> dict[RelativeOffsetCoordinate, TrackGeometryDataset]:
updated_dataset = {}
for offset, geometry_dataset in self._geometry_datasets.items():
Expand All @@ -399,7 +389,7 @@ def as_list(self) -> list[Track]:
return [self.__create_track_flyweight(current) for current in track_ids]

def __create_track_flyweight(self, track_id: str) -> Track:
track_frame = self._dataset.loc[:, track_id, :]
track_frame = self._dataset.loc[track_id, :]
return PandasTrack(track_id, track_frame)

def as_dataframe(self) -> DataFrame:
Expand All @@ -411,7 +401,7 @@ def split(self, batches: int) -> Sequence["PandasTrackDataset"]:

new_batches = []
for batch_ids in batched(self.get_track_ids_as_string(), batch_size):
batch_dataset = self._dataset.loc[:, list(batch_ids), :]
batch_dataset = self._dataset.loc[list(batch_ids), :]
batch_geometries = self._get_geometries_for(batch_ids)
new_batches.append(
PandasTrackDataset.from_dataframe(
Expand Down Expand Up @@ -446,7 +436,7 @@ def filter_by_min_detection_length(self, length: int) -> "PandasTrackDataset":
filtered_ids = detection_counts_per_track[
detection_counts_per_track >= length
].index
filtered_dataset = self._dataset.loc[:, filtered_ids, :]
filtered_dataset = self._dataset.loc[filtered_ids]
return PandasTrackDataset(
self.track_geometry_factory, filtered_dataset, calculator=self.calculator
)
Expand Down Expand Up @@ -505,9 +495,7 @@ def get_first_segments(self) -> TrackSegmentDataset:
return PandasTrackSegmentDataset(first_segments)

def __create_segments(self) -> DataFrame:
data: DataFrame = self._dataset.reset_index(
level=[LEVEL_CLASSIFICATION, LEVEL_OCCURRENCE]
)
data: DataFrame = self._dataset.reset_index(level=[LEVEL_OCCURRENCE])
first_detections = data.groupby(level=LEVEL_TRACK_ID, group_keys=True)
data[START_X] = first_detections[track.X].shift(1)
data[START_Y] = first_detections[track.Y].shift(1)
Expand Down Expand Up @@ -571,9 +559,9 @@ def cut_with_section(
for track_id, intersection_points in intersection_points.items()
}
tracks_to_cut = list(cut_indices.keys())
cut_tracks_df = self._dataset.loc[:, tracks_to_cut, :].copy()
cut_tracks_df = self._dataset.loc[tracks_to_cut].copy()
index_as_df = cut_tracks_df.index.to_frame(
name=[LEVEL_CLASSIFICATION, LEVEL_TRACK_ID, LEVEL_OCCURRENCE]
name=[track.TRACK_ID, track.OCCURRENCE]
)
index_as_df["cumcount"] = index_as_df.groupby(level=LEVEL_TRACK_ID).transform(
"cumcount"
Expand All @@ -582,7 +570,7 @@ def cut_with_section(
lambda row: self._create_cut_track_id(row, cut_indices), axis=1
)
cut_tracks_df.index = MultiIndex.from_frame(
index_as_df[[LEVEL_CLASSIFICATION, LEVEL_TRACK_ID, LEVEL_OCCURRENCE]]
index_as_df[[track.TRACK_ID, track.OCCURRENCE]]
)
return PandasTrackDataset(self.track_geometry_factory, cut_tracks_df), set(
intersection_points.keys()
Expand Down Expand Up @@ -662,11 +650,12 @@ def _get_dataset_with_classes(self, classes: list[str]) -> PandasTrackDataset:
if self._other.empty:
return self._other
dataset = self._other.as_dataframe()
filtered_df = dataset.loc[classes]
mask = dataset[track.TRACK_CLASSIFICATION].isin(classes)
filtered_df = dataset[mask]
tracks_to_keep = filtered_df.index.get_level_values(LEVEL_TRACK_ID).unique()
tracks_to_remove = tracks_to_keep.symmetric_difference(
self._other.get_track_ids_as_string()
).map(lambda _id: TrackId(_id))
)
updated_geometry_datasets = self._other._remove_from_geometry_dataset(
tracks_to_remove
)
Expand Down Expand Up @@ -721,8 +710,8 @@ def _assign_track_classification(


def _drop_track_classification(data: DataFrame) -> DataFrame:
if LEVEL_CLASSIFICATION in data.index.names:
return data.reset_index(level=LEVEL_CLASSIFICATION, drop=True)
if track.TRACK_CLASSIFICATION in data.columns:
return data.drop(columns=[track.TRACK_CLASSIFICATION])
return data


Expand Down
14 changes: 3 additions & 11 deletions OTAnalytics/plugin_filter/dataframe_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ def apply(self, iterable: Iterable[DataFrame]) -> Iterable[DataFrame]:
return iterable


INDEX_LEVEL_CLASSIFICATION = 0
INDEX_LEVEL_OCCURRENCE = 2


class DataFrameStartsAtOrAfterDate(DataFramePredicate):
"""Checks if the DataFrame rows start at or after date.

Expand All @@ -93,7 +89,7 @@ def test(self, to_test: DataFrame) -> DataFrame:
"must be index of DataFrame for filtering to work."
)
return to_test[
to_test.index.get_level_values(INDEX_LEVEL_OCCURRENCE) >= self._start_date
to_test.index.get_level_values(track.OCCURRENCE) >= self._start_date
]


Expand All @@ -120,7 +116,7 @@ def test(self, to_test: DataFrame) -> DataFrame:
"must be index of DataFrame for filtering to work."
)
return to_test[
to_test.index.get_level_values(INDEX_LEVEL_OCCURRENCE) <= self._end_date
to_test.index.get_level_values(track.OCCURRENCE) <= self._end_date
]


Expand All @@ -141,11 +137,7 @@ def __init__(
self._classifications = classifications

def test(self, to_test: DataFrame) -> DataFrame:
return to_test.loc[
to_test.index.get_level_values(INDEX_LEVEL_CLASSIFICATION).intersection(
self._classifications, sort=True
)
]
return to_test.loc[to_test[self._column_name].isin(self._classifications)]


class DataFrameFilterBuilder(FilterBuilder[DataFrame, DataFrame]):
Expand Down
Loading
Loading