Skip to content

Commit

Permalink
fix: compute the time horizon of MQL requests more accurately so they…
Browse files Browse the repository at this point in the history
… return the same results as MQF requests (#290)
  • Loading branch information
lvaylet authored Nov 2, 2022
1 parent c6d769f commit 41b814b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 99 deletions.
112 changes: 50 additions & 62 deletions slo_generator/backends/cloud_monitoring_mql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
"""
import logging
import pprint
import re
import typing
import warnings
from collections import OrderedDict
from typing import List, Tuple
from datetime import datetime
from typing import List, Optional, Tuple

from google.api.distribution_pb2 import Distribution
from google.cloud.monitoring_v3 import QueryTimeSeriesRequest
from google.cloud.monitoring_v3.services.query_service import QueryServiceClient
from google.cloud.monitoring_v3.services.query_service.pagers import (
QueryTimeSeriesPager,
)
from google.cloud.monitoring_v3.types import metric_service
from google.cloud.monitoring_v3.types.metric import TimeSeriesData
from google.cloud.monitoring_v3.types.metric import TimeSeries

from slo_generator.constants import NO_DATA

Expand All @@ -56,7 +56,7 @@ def __init__(self, project_id: str, client: QueryServiceClient = None):

def good_bad_ratio(
self,
timestamp: int, # pylint: disable=unused-argument
timestamp: int,
window: int,
slo_config: dict,
) -> Tuple[int, int]:
Expand All @@ -73,22 +73,20 @@ def good_bad_ratio(
"""
measurement: dict = slo_config["spec"]["service_level_indicator"]
filter_good: str = measurement["filter_good"]
filter_bad: typing.Optional[str] = measurement.get("filter_bad")
filter_valid: typing.Optional[str] = measurement.get("filter_valid")
filter_bad: Optional[str] = measurement.get("filter_bad")
filter_valid: Optional[str] = measurement.get("filter_valid")

# Query 'good events' timeseries
good_ts: List[TimeSeriesData] = self.query(query=filter_good, window=window)
good_ts: List[TimeSeries] = self.query(timestamp, window, filter_good)
good_event_count: int = CM.count(good_ts)

# Query 'bad events' timeseries
bad_event_count: int
if filter_bad:
bad_ts: List[TimeSeriesData] = self.query(query=filter_bad, window=window)
bad_ts: List[TimeSeries] = self.query(timestamp, window, filter_bad)
bad_event_count = CM.count(bad_ts)
elif filter_valid:
valid_ts: List[TimeSeriesData] = self.query(
query=filter_valid, window=window
)
valid_ts: List[TimeSeries] = self.query(timestamp, window, filter_valid)
bad_event_count = CM.count(valid_ts) - good_event_count
else:
raise Exception("One of `filter_bad` or `filter_valid` is required.")
Expand Down Expand Up @@ -124,7 +122,7 @@ def distribution_cut(
)

# Query 'valid' events
series = self.query(query=filter_valid, window=window)
series = self.query(timestamp, window, filter_valid)

if not series:
return NO_DATA, NO_DATA # no timeseries
Expand Down Expand Up @@ -193,38 +191,66 @@ def query_sli(
"""
measurement: dict = slo_config["spec"]["service_level_indicator"]
query: str = measurement["query"]
series: List[TimeSeriesData] = self.query(query=query, window=window)
series: List[TimeSeries] = self.query(timestamp, window, query)
sli_value: float = series[0].point_data[0].values[0].double_value
LOGGER.debug(f"SLI value: {sli_value}")
return sli_value

def query(self, query: str, window: int) -> List[TimeSeriesData]:
def query(self, timestamp: float, window: int, query: str) -> List[TimeSeries]:
"""Query timeseries from Cloud Monitoring using MQL.
Args:
query (str): MQL query.
timestamp (float): Current timestamp.
window (int): Window size (in seconds).
query (str): MQL query.
Returns:
list: List of timeseries objects.
"""
# Enrich query to aggregate and reduce the time series over the
# desired window.
formatted_query: str = self._fmt_query(query, window)
request = metric_service.QueryTimeSeriesRequest(
{"name": self.parent, "query": formatted_query}
# Enrich query to aggregate and reduce time series over target window.
query_with_time_horizon_and_period: str = (
self.enrich_query_with_time_horizon_and_period(timestamp, window, query)
)
request = QueryTimeSeriesRequest(
{"name": self.parent, "query": query_with_time_horizon_and_period}
)
# fmt: off
timeseries_pager: QueryTimeSeriesPager = (
self.client.query_time_series(request) # type: ignore[union-attr]
)
# fmt: on
timeseries: list = list(timeseries_pager) # convert pager to flat list
timeseries: List[TimeSeries] = list(timeseries_pager)
LOGGER.debug(pprint.pformat(timeseries))
return timeseries

@staticmethod
def count(timeseries: List[TimeSeriesData]) -> int:
def enrich_query_with_time_horizon_and_period(
    timestamp: float,
    window: int,
    query: str,
) -> str:
    """Enrich an MQL query with a time period and horizon.

    Args:
        timestamp (float): UNIX timestamp (seconds since the epoch) marking
            the end of the query window.
        window (int): Query window (in seconds).
        query (str): Base query from the YAML config.

    Returns:
        str: Enriched query.
    """
    # Local import keeps the helper self-contained: the module only imports
    # `datetime` at the top level, not `timezone`.
    from datetime import datetime, timezone

    # Python represents time as floating-point seconds since the epoch, with
    # the decimal part holding fractional seconds.
    # MQL expects dates formatted like "%Y/%m/%d %H:%M:%S" or "%Y/%m/%d-%H:%M:%S".
    # Reference: https://cloud.google.com/monitoring/mql/reference#lexical-elements
    # Convert in UTC (not the server's local timezone) so the generated query
    # is identical wherever slo-generator runs; MQL interprets date literals
    # without an explicit timezone as UTC.
    end_time_str: str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(
        "%Y/%m/%d %H:%M:%S"
    )
    query_with_time_horizon_and_period: str = (
        query
        + f"| group_by [] | within {window}s, d'{end_time_str}' | every {window}s"
    )
    return query_with_time_horizon_and_period

@staticmethod
def count(timeseries: List[TimeSeries]) -> int:
"""Count events in time series assuming it was aligned with ALIGN_SUM
and reduced with REDUCE_SUM (default).
Expand All @@ -240,43 +266,5 @@ def count(timeseries: List[TimeSeriesData]) -> int:
LOGGER.debug(exception, exc_info=True)
return NO_DATA # no events in timeseries

@staticmethod
def _fmt_query(query: str, window: int) -> str:
"""Format MQL query:
* If the MQL expression has a `window` placeholder, replace it by the
current window. Otherwise, append it to the expression.
* If the MQL expression has a `every` placeholder, replace it by the
current window. Otherwise, append it to the expression.
* If the MQL expression has a `group_by` placeholder, replace it.
Otherwise, append it to the expression.
Args:
query (str): Original query in YAMLconfig.
window (int): Query window (in seconds).
Returns:
str: Formatted query.
"""
formatted_query: str = query.strip()
if "group_by" in formatted_query:
formatted_query = re.sub(
r"\|\s+group_by\s+\[.*\]\s*", "| group_by [] ", formatted_query
)
else:
formatted_query += "| group_by [] "
for mql_time_interval_keyword in ["within", "every"]:
if mql_time_interval_keyword in formatted_query:
formatted_query = re.sub(
rf"\|\s+{mql_time_interval_keyword}\s+\w+\s*",
f"| {mql_time_interval_keyword} {window}s ",
formatted_query,
)
else:
formatted_query += f"| {mql_time_interval_keyword} {window}s "
return formatted_query.strip()


CM = CloudMonitoringMqlBackend
60 changes: 23 additions & 37 deletions tests/unit/backends/test_cloud_monitoring_mql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,28 @@


class TestCloudMonitoringMqlBackend(unittest.TestCase):
def test_fmt_query(self):
queries = [
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code] | within 1h """,
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code, response_code_class]
| within 1h
| every 1h """,
""" fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [metric.response_code,response_code_class]
| within 1h
| every 1h """,
]
def test_enrich_query_with_time_horizon_and_period(self):
timestamp: float = 1666995015.5144777 # = 2022/10/28 22:10:15.5144777
window: int = 3600 # in seconds
query: str = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == 'slo-generator-demo'
| filter
metric.response_code == 429
|| metric.response_code == 200
"""

formatted_query = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == '${GAE_PROJECT_ID}'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [] | within 3600s | every 3600s"""
enriched_query = """fetch gae_app
| metric 'appengine.googleapis.com/http/server/response_count'
| filter resource.project_id == 'slo-generator-demo'
| filter
metric.response_code == 429
|| metric.response_code == 200
| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s"""

for query in queries:
assert CloudMonitoringMqlBackend._fmt_query(query, 3600) == formatted_query
assert (
CloudMonitoringMqlBackend.enrich_query_with_time_horizon_and_period(
timestamp, window, query
)
== enriched_query
)

0 comments on commit 41b814b

Please sign in to comment.