fix: compute the time horizon of MQL requests more accurately so they return the same results as MQF requests #290

Merged
112 changes: 50 additions & 62 deletions slo_generator/backends/cloud_monitoring_mql.py
@@ -17,19 +17,19 @@
"""
import logging
import pprint
import re
import typing
import warnings
from collections import OrderedDict
from typing import List, Tuple
from datetime import datetime
from typing import List, Optional, Tuple

from google.api.distribution_pb2 import Distribution
from google.cloud.monitoring_v3 import QueryTimeSeriesRequest
from google.cloud.monitoring_v3.services.query_service import QueryServiceClient
from google.cloud.monitoring_v3.services.query_service.pagers import (
QueryTimeSeriesPager,
)
from google.cloud.monitoring_v3.types import metric_service
from google.cloud.monitoring_v3.types.metric import TimeSeriesData
from google.cloud.monitoring_v3.types.metric import TimeSeries

from slo_generator.constants import NO_DATA

@@ -56,7 +56,7 @@ def __init__(self, project_id: str, client: QueryServiceClient = None):

def good_bad_ratio(
self,
timestamp: int, # pylint: disable=unused-argument
timestamp: int,
window: int,
slo_config: dict,
) -> Tuple[int, int]:
@@ -73,22 +73,20 @@
"""
measurement: dict = slo_config["spec"]["service_level_indicator"]
filter_good: str = measurement["filter_good"]
filter_bad: typing.Optional[str] = measurement.get("filter_bad")
filter_valid: typing.Optional[str] = measurement.get("filter_valid")
filter_bad: Optional[str] = measurement.get("filter_bad")
filter_valid: Optional[str] = measurement.get("filter_valid")

# Query 'good events' timeseries
good_ts: List[TimeSeriesData] = self.query(query=filter_good, window=window)
good_ts: List[TimeSeries] = self.query(timestamp, window, filter_good)
good_event_count: int = CM.count(good_ts)

# Query 'bad events' timeseries
bad_event_count: int
if filter_bad:
bad_ts: List[TimeSeriesData] = self.query(query=filter_bad, window=window)
bad_ts: List[TimeSeries] = self.query(timestamp, window, filter_bad)
bad_event_count = CM.count(bad_ts)
elif filter_valid:
valid_ts: List[TimeSeriesData] = self.query(
query=filter_valid, window=window
)
valid_ts: List[TimeSeries] = self.query(timestamp, window, filter_valid)
bad_event_count = CM.count(valid_ts) - good_event_count
else:
raise Exception("One of `filter_bad` or `filter_valid` is required.")
@@ -124,7 +122,7 @@ def distribution_cut(
)

# Query 'valid' events
series = self.query(query=filter_valid, window=window)
series = self.query(timestamp, window, filter_valid)

if not series:
return NO_DATA, NO_DATA # no timeseries
@@ -193,38 +191,66 @@ def query_sli(
"""
measurement: dict = slo_config["spec"]["service_level_indicator"]
query: str = measurement["query"]
series: List[TimeSeriesData] = self.query(query=query, window=window)
series: List[TimeSeries] = self.query(timestamp, window, query)
sli_value: float = series[0].point_data[0].values[0].double_value
LOGGER.debug(f"SLI value: {sli_value}")
return sli_value

def query(self, query: str, window: int) -> List[TimeSeriesData]:
def query(self, timestamp: float, window: int, query: str) -> List[TimeSeries]:
"""Query timeseries from Cloud Monitoring using MQL.

Args:
query (str): MQL query.
timestamp (float): Current timestamp.
window (int): Window size (in seconds).

query (str): MQL query.
Returns:
list: List of timeseries objects.
"""
# Enrich query to aggregate and reduce the time series over the
# desired window.
formatted_query: str = self._fmt_query(query, window)
request = metric_service.QueryTimeSeriesRequest(
{"name": self.parent, "query": formatted_query}
# Enrich query to aggregate and reduce time series over target window.
query_with_time_horizon_and_period: str = (
self.enrich_query_with_time_horizon_and_period(timestamp, window, query)
)
request = QueryTimeSeriesRequest(
{"name": self.parent, "query": query_with_time_horizon_and_period}
)
# fmt: off
timeseries_pager: QueryTimeSeriesPager = (
self.client.query_time_series(request) # type: ignore[union-attr]
)
# fmt: on
timeseries: list = list(timeseries_pager) # convert pager to flat list
timeseries: List[TimeSeries] = list(timeseries_pager)
LOGGER.debug(pprint.pformat(timeseries))
return timeseries

@staticmethod
def count(timeseries: List[TimeSeriesData]) -> int:
def enrich_query_with_time_horizon_and_period(
timestamp: float,
window: int,
query: str,
) -> str:
"""Enrich MQL query with time period and horizon.
Args:
timestamp (float): UNIX timestamp.
window (int): Query window (in seconds).
query (str): Base query in YAML config.
Returns:
str: Enriched query.
"""
# Python uses floating point numbers to represent time in seconds since the
# epoch, in UTC, with decimal part representing nanoseconds.
# MQL expects dates formatted like "%Y/%m/%d %H:%M:%S" or "%Y/%m/%d-%H:%M:%S".
# Reference: https://cloud.google.com/monitoring/mql/reference#lexical-elements
end_time_str: str = datetime.fromtimestamp(timestamp).strftime(
"%Y/%m/%d %H:%M:%S"
)
query_with_time_horizon_and_period: str = (
query
+ f"| group_by [] | within {window}s, d'{end_time_str}' | every {window}s"
)
return query_with_time_horizon_and_period

@staticmethod
def count(timeseries: List[TimeSeries]) -> int:
"""Count events in time series assuming it was aligned with ALIGN_SUM
and reduced with REDUCE_SUM (default).

@@ -240,43 +266,5 @@ def count(timeseries: List[TimeSeries]) -> int:
LOGGER.debug(exception, exc_info=True)
return NO_DATA # no events in timeseries

@staticmethod
def _fmt_query(query: str, window: int) -> str:
"""Format MQL query:

* If the MQL expression has a `window` placeholder, replace it by the
current window. Otherwise, append it to the expression.

* If the MQL expression has a `every` placeholder, replace it by the
current window. Otherwise, append it to the expression.

* If the MQL expression has a `group_by` placeholder, replace it.
Otherwise, append it to the expression.

Args:
query (str): Original query in YAMLconfig.
window (int): Query window (in seconds).

Returns:
str: Formatted query.
"""
formatted_query: str = query.strip()
if "group_by" in formatted_query:
formatted_query = re.sub(
r"\|\s+group_by\s+\[.*\]\s*", "| group_by [] ", formatted_query
)
else:
formatted_query += "| group_by [] "
for mql_time_interval_keyword in ["within", "every"]:
if mql_time_interval_keyword in formatted_query:
formatted_query = re.sub(
rf"\|\s+{mql_time_interval_keyword}\s+\w+\s*",
f"| {mql_time_interval_keyword} {window}s ",
formatted_query,
)
else:
formatted_query += f"| {mql_time_interval_keyword} {window}s "
return formatted_query.strip()


CM = CloudMonitoringMqlBackend
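To make the new behaviour concrete, here is a standalone sketch of the enrichment step, using the same timestamp and window as the unit test below; the fetch expression is illustrative and only the standard library is used.

```python
from datetime import datetime

# Illustrative inputs; any MQL fetch expression is enriched the same way.
query = "fetch gae_app | metric 'appengine.googleapis.com/http/server/response_count'"
timestamp = 1666995015.5144777  # UNIX timestamp, seconds since the epoch
window = 3600                   # rolling window, in seconds

# Same logic as enrich_query_with_time_horizon_and_period() above.
end_time_str = datetime.fromtimestamp(timestamp).strftime("%Y/%m/%d %H:%M:%S")
enriched = query + f"| group_by [] | within {window}s, d'{end_time_str}' | every {window}s"

print(enriched)
# ...response_count'| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s
# (the date literal reflects the local timezone, because datetime.fromtimestamp()
#  converts the UNIX timestamp to local time; the value shown assumes UTC)
```

Unlike the removed `_fmt_query`, which only appended `| within {window}s | every {window}s`, the enriched query anchors the window to an explicit end time with `d'...'`, so the MQL request evaluates the same time horizon as the corresponding MQF request.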
60 changes: 23 additions & 37 deletions tests/unit/backends/test_cloud_monitoring_mql.py
@@ -20,42 +20,28 @@


class TestCloudMonitoringMqlBackend(unittest.TestCase):
def test_fmt_query(self):
queries = [
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code] | within 1h """,
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code, response_code_class]
| within 1h
| every 1h """,
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code,response_code_class]
| within 1h
| every 1h """,
]
def test_enrich_query_with_time_horizon_and_period(self):
timestamp: float = 1666995015.5144777 # = 2022/10/28 22:10:15.5144777
window: int = 3600 # in seconds
query: str = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == 'slo-generator-demo'
| filter
metric.response_code == 429
|| metric.response_code == 200
"""

formatted_query = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [] | within 3600s | every 3600s"""
enriched_query = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == 'slo-generator-demo'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s"""

for query in queries:
assert CloudMonitoringMqlBackend._fmt_query(query, 3600) == formatted_query
assert (
CloudMonitoringMqlBackend.enrich_query_with_time_horizon_and_period(
timestamp, window, query
)
== enriched_query
)
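For a quick manual check of the assertion above, the helper can also be called directly, since it is a static method; the snippet below is a hypothetical interactive session, not part of the test file.

```python
from slo_generator.backends.cloud_monitoring_mql import CloudMonitoringMqlBackend

result = CloudMonitoringMqlBackend.enrich_query_with_time_horizon_and_period(
    1666995015.5144777,  # timestamp used by the test above
    3600,                # window, in seconds
    "fetch gae_app",     # any MQL fetch expression
)
print(result)
# fetch gae_app| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s
# (the d'...' literal comes out in local time; the test's expected value assumes UTC)
```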