Skip to content

Commit

Permalink
ML-2956: Make all aggregations return NaN when there is no data exc…
Browse files Browse the repository at this point in the history
…ept for count (#461)

* ML-2956: aggregations return NaN when there is no data except for COUNT

* More fixes

* Update storey/table.py

Co-authored-by: Gal Topper <gal.topper@gmail.com>

* Deduplicate _hidden_raw_aggregations

---------

Co-authored-by: Alex Toker <alex_toker@mckinsey.com>
Co-authored-by: Gal Topper <gal.topper@gmail.com>
  • Loading branch information
3 people authored Oct 8, 2023
1 parent 30abd95 commit 9d62d65
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 13 deletions.
6 changes: 3 additions & 3 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,9 +1094,9 @@ def test_aggregate_and_query_with_different_fixed_windows(setup_teardown_test, p
{
"col1": 10,
"number_of_stuff_avg_1h": math.nan,
"number_of_stuff_max_1h": -math.inf,
"number_of_stuff_min_1h": math.inf,
"number_of_stuff_sum_1h": 0.0,
"number_of_stuff_max_1h": math.nan,
"number_of_stuff_min_1h": math.nan,
"number_of_stuff_sum_1h": math.nan,
"time": datetime.datetime(2020, 7, 22, 2, 15, tzinfo=datetime.timezone.utc),
},
]
Expand Down
30 changes: 26 additions & 4 deletions storey/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ def __init__(
windows = {}
for aggregation_metadata in aggregates:
for meta in aggregation_metadata.aggregations:
for aggr, is_hidden in get_all_raw_aggregates_with_hidden([meta]).items():
meta_aggregates = get_all_raw_aggregates_with_hidden([meta]).items()
if not any(ag[0] == "count" for ag in meta_aggregates):
meta_aggregates = list(meta_aggregates)
meta_aggregates.append(("count", True))
for aggr, is_hidden in meta_aggregates:
if (
aggregation_metadata.name,
aggr,
Expand Down Expand Up @@ -629,8 +633,13 @@ def aggregate(self, data, timestamp):
def get_features(self, timestamp):
result = {}
for aggregation_bucket in self.aggregation_buckets.values():
if isinstance(aggregation_bucket, VirtualAggregationBuckets) or aggregation_bucket.explicit_windows:
if isinstance(aggregation_bucket, VirtualAggregationBuckets):
result.update(aggregation_bucket.get_features(timestamp))
elif aggregation_bucket.explicit_windows:
count_features = self.aggregation_buckets[f"{aggregation_bucket.name}_count"].get_features(
timestamp, aggregation_bucket.explicit_windows.windows
)
result.update(aggregation_bucket.get_features(timestamp, count_features=count_features))

return result

Expand Down Expand Up @@ -832,7 +841,7 @@ def get_aggregation_for_aggregation(self):
return "sum"
return self.aggregation

def get_features(self, timestamp, windows=None):
def get_features(self, timestamp, windows=None, count_features=None):
result = {}
if not windows:
if self.explicit_windows:
Expand All @@ -852,7 +861,15 @@ def get_features(self, timestamp, windows=None):
# In case our pre aggregates already have the answer
for win in windows:
result[f"{self.name}_{self.aggregation}_{win[1]}"] = self._current_aggregate_values[win].value

if (
self.aggregation != "count"
and count_features
and count_features.get(f"{self.name}_count_{win[1]}", 0) == 0
):
value = math.nan
else:
value = self._current_aggregate_values[win].value
result[f"{self.name}_{self.aggregation}_{win[1]}"] = value
return result

def calculate_features(self, timestamp, windows):
Expand Down Expand Up @@ -1408,6 +1425,8 @@ def __init__(
self.name = name
self._explicit_raw_aggregations = explicit_raw_aggregations
self._hidden_raw_aggregations = hidden_raw_aggregations
self._hidden_raw_aggregations.append("count")
self._hidden_raw_aggregations = list(set(self._hidden_raw_aggregations))
self._all_raw_aggregates = self._explicit_raw_aggregations.copy()
for hidden_aggr in self._hidden_raw_aggregations:
if hidden_aggr not in self._all_raw_aggregates:
Expand Down Expand Up @@ -1655,8 +1674,11 @@ def get_features(self, timestamp):
for aggregation_name in self._explicit_raw_aggregations:
for (window_millis, window_str) in self.explicit_windows.windows:
value = self._current_aggregate_values[(aggregation_name, window_millis)].value
count_value = self._current_aggregate_values[("count", window_millis)].value
if value == math.inf or value == -math.inf:
value = math.nan
if count_value == 0 and aggregation_name != "count":
value = math.nan
result[f"{self.name}_{aggregation_name}_{window_str}"] = value

self.augment_virtual_features(result)
Expand Down
12 changes: 6 additions & 6 deletions tests/test_aggregate_by_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,9 +492,9 @@ def test_sliding_window_sparse_data():
"number_of_stuff2_min_1h": math.nan,
"number_of_stuff2_min_24h": math.nan,
"number_of_stuff2_min_2h": math.nan,
"number_of_stuff2_sum_1h": 0,
"number_of_stuff2_sum_24h": 0,
"number_of_stuff2_sum_2h": 0,
"number_of_stuff2_sum_1h": math.nan,
"number_of_stuff2_sum_24h": math.nan,
"number_of_stuff2_sum_2h": math.nan,
"time": datetime(2020, 7, 21, 21, 40, tzinfo=timezone.utc),
},
{
Expand Down Expand Up @@ -1096,9 +1096,9 @@ def test_sliding_window_sparse_data_uneven_feature_occurrence():
"number_of_stuff2_min_1h": math.nan,
"number_of_stuff2_min_24h": math.nan,
"number_of_stuff2_min_2h": math.nan,
"number_of_stuff2_sum_1h": 0.0,
"number_of_stuff2_sum_24h": 0.0,
"number_of_stuff2_sum_2h": 0.0,
"number_of_stuff2_sum_1h": math.nan,
"number_of_stuff2_sum_24h": math.nan,
"number_of_stuff2_sum_2h": math.nan,
"time": datetime(2020, 7, 21, 21, 40, tzinfo=timezone.utc),
},
{
Expand Down

0 comments on commit 9d62d65

Please sign in to comment.