Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Toker committed Sep 28, 2023
1 parent 2d62175 commit debf572
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
6 changes: 3 additions & 3 deletions integration/test_aggregation_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,9 +1094,9 @@ def test_aggregate_and_query_with_different_fixed_windows(setup_teardown_test, p
{
"col1": 10,
"number_of_stuff_avg_1h": math.nan,
"number_of_stuff_max_1h": -math.inf,
"number_of_stuff_min_1h": math.inf,
"number_of_stuff_sum_1h": 0.0,
"number_of_stuff_max_1h": math.nan,
"number_of_stuff_min_1h": math.nan,
"number_of_stuff_sum_1h": math.nan,
"time": datetime.datetime(2020, 7, 22, 2, 15, tzinfo=datetime.timezone.utc),
},
]
Expand Down
21 changes: 17 additions & 4 deletions storey/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ def __init__(
windows = {}
for aggregation_metadata in aggregates:
for meta in aggregation_metadata.aggregations:
for aggr, is_hidden in get_all_raw_aggregates_with_hidden([meta]).items():
meta_aggregates = get_all_raw_aggregates_with_hidden([meta]).items()
if not any(ag[0] == "count" for ag in meta_aggregates):
meta_aggregates = list(meta_aggregates)
meta_aggregates.append(("count", True))
for aggr, is_hidden in meta_aggregates:
if (
aggregation_metadata.name,
aggr,
Expand Down Expand Up @@ -629,8 +633,13 @@ def aggregate(self, data, timestamp):
def get_features(self, timestamp):
result = {}
for aggregation_bucket in self.aggregation_buckets.values():
if isinstance(aggregation_bucket, VirtualAggregationBuckets) or aggregation_bucket.explicit_windows:
if isinstance(aggregation_bucket, VirtualAggregationBuckets):
result.update(aggregation_bucket.get_features(timestamp))
elif aggregation_bucket.explicit_windows:
count_features = self.aggregation_buckets[f"{aggregation_bucket.name}_count"].get_features(
timestamp, aggregation_bucket.explicit_windows.windows
)
result.update(aggregation_bucket.get_features(timestamp, count_features=count_features))

return result

Expand Down Expand Up @@ -832,7 +841,7 @@ def get_aggregation_for_aggregation(self):
return "sum"
return self.aggregation

def get_features(self, timestamp, windows=None):
def get_features(self, timestamp, windows=None, count_features=None):
result = {}
if not windows:
if self.explicit_windows:
Expand All @@ -852,7 +861,11 @@ def get_features(self, timestamp, windows=None):
# In case our pre aggregates already have the answer
for win in windows:
result[f"{self.name}_{self.aggregation}_{win[1]}"] = self._current_aggregate_values[win].value

if (
self.aggregation != "count"
and (count_features.get(f"{self.name}_count_{win[1]}", 0) if count_features else 1) == 0
):
result[f"{self.name}_{self.aggregation}_{win[1]}"] = math.nan
return result

def calculate_features(self, timestamp, windows):
Expand Down

0 comments on commit debf572

Please sign in to comment.