diff --git a/slo_generator/backends/cloud_monitoring_mql.py b/slo_generator/backends/cloud_monitoring_mql.py index 56ea49ba..1e934f98 100644 --- a/slo_generator/backends/cloud_monitoring_mql.py +++ b/slo_generator/backends/cloud_monitoring_mql.py @@ -17,19 +17,19 @@ """ import logging import pprint -import re import typing import warnings from collections import OrderedDict -from typing import List, Tuple +from datetime import datetime +from typing import List, Optional, Tuple from google.api.distribution_pb2 import Distribution +from google.cloud.monitoring_v3 import QueryTimeSeriesRequest from google.cloud.monitoring_v3.services.query_service import QueryServiceClient from google.cloud.monitoring_v3.services.query_service.pagers import ( QueryTimeSeriesPager, ) -from google.cloud.monitoring_v3.types import metric_service -from google.cloud.monitoring_v3.types.metric import TimeSeriesData +from google.cloud.monitoring_v3.types.metric import TimeSeries from slo_generator.constants import NO_DATA @@ -56,7 +56,7 @@ def __init__(self, project_id: str, client: QueryServiceClient = None): def good_bad_ratio( self, - timestamp: int, # pylint: disable=unused-argument + timestamp: int, window: int, slo_config: dict, ) -> Tuple[int, int]: @@ -73,22 +73,20 @@ def good_bad_ratio( """ measurement: dict = slo_config["spec"]["service_level_indicator"] filter_good: str = measurement["filter_good"] - filter_bad: typing.Optional[str] = measurement.get("filter_bad") - filter_valid: typing.Optional[str] = measurement.get("filter_valid") + filter_bad: Optional[str] = measurement.get("filter_bad") + filter_valid: Optional[str] = measurement.get("filter_valid") # Query 'good events' timeseries - good_ts: List[TimeSeriesData] = self.query(query=filter_good, window=window) + good_ts: List[TimeSeries] = self.query(timestamp, window, filter_good) good_event_count: int = CM.count(good_ts) # Query 'bad events' timeseries bad_event_count: int if filter_bad: - bad_ts: List[TimeSeriesData] = self.query(query=filter_bad, window=window) + bad_ts: List[TimeSeries] = self.query(timestamp, window, filter_bad) bad_event_count = CM.count(bad_ts) elif filter_valid: - valid_ts: List[TimeSeriesData] = self.query( - query=filter_valid, window=window - ) + valid_ts: List[TimeSeries] = self.query(timestamp, window, filter_valid) bad_event_count = CM.count(valid_ts) - good_event_count else: raise Exception("One of `filter_bad` or `filter_valid` is required.") @@ -124,7 +122,7 @@ def distribution_cut( ) # Query 'valid' events - series = self.query(query=filter_valid, window=window) + series = self.query(timestamp, window, filter_valid) if not series: return NO_DATA, NO_DATA # no timeseries @@ -193,38 +191,66 @@ def query_sli( """ measurement: dict = slo_config["spec"]["service_level_indicator"] query: str = measurement["query"] - series: List[TimeSeriesData] = self.query(query=query, window=window) + series: List[TimeSeries] = self.query(timestamp, window, query) sli_value: float = series[0].point_data[0].values[0].double_value LOGGER.debug(f"SLI value: {sli_value}") return sli_value - def query(self, query: str, window: int) -> List[TimeSeriesData]: + def query(self, timestamp: float, window: int, query: str) -> List[TimeSeries]: """Query timeseries from Cloud Monitoring using MQL. Args: - query (str): MQL query. + timestamp (float): Current timestamp. window (int): Window size (in seconds). - + query (str): MQL query. Returns: list: List of timeseries objects. """ - # Enrich query to aggregate and reduce the time series over the - # desired window. - formatted_query: str = self._fmt_query(query, window) - request = metric_service.QueryTimeSeriesRequest( - {"name": self.parent, "query": formatted_query} + # Enrich query to aggregate and reduce time series over target window. + query_with_time_horizon_and_period: str = ( + self.enrich_query_with_time_horizon_and_period(timestamp, window, query) + ) + request = QueryTimeSeriesRequest( + {"name": self.parent, "query": query_with_time_horizon_and_period} ) # fmt: off timeseries_pager: QueryTimeSeriesPager = ( self.client.query_time_series(request) # type: ignore[union-attr] ) # fmt: on - timeseries: list = list(timeseries_pager) # convert pager to flat list + timeseries: List[TimeSeries] = list(timeseries_pager) LOGGER.debug(pprint.pformat(timeseries)) return timeseries @staticmethod - def count(timeseries: List[TimeSeriesData]) -> int: + def enrich_query_with_time_horizon_and_period( + timestamp: float, + window: int, + query: str, + ) -> str: + """Enrich MQL query with time period and horizon. + Args: + timestamp (float): UNIX timestamp. + window (int): Query window (in seconds). + query (str): Base query in YAML config. + Returns: + str: Enriched query. + """ + # Python uses floating point numbers to represent time in seconds since the + # epoch, in UTC, with decimal part representing nanoseconds. + # MQL expects dates formatted like "%Y/%m/%d %H:%M:%S" or "%Y/%m/%d-%H:%M:%S". + # Reference: https://cloud.google.com/monitoring/mql/reference#lexical-elements + end_time_str: str = datetime.fromtimestamp(timestamp).strftime( + "%Y/%m/%d %H:%M:%S" + ) + query_with_time_horizon_and_period: str = ( + query + + f"| group_by [] | within {window}s, d'{end_time_str}' | every {window}s" + ) + return query_with_time_horizon_and_period + + @staticmethod + def count(timeseries: List[TimeSeries]) -> int: """Count events in time series assuming it was aligned with ALIGN_SUM and reduced with REDUCE_SUM (default). @@ -240,43 +266,5 @@ def count(timeseries: List[TimeSeriesData]) -> int: LOGGER.debug(exception, exc_info=True) return NO_DATA # no events in timeseries - @staticmethod - def _fmt_query(query: str, window: int) -> str: - """Format MQL query: - - * If the MQL expression has a `window` placeholder, replace it by the - current window. Otherwise, append it to the expression. - - * If the MQL expression has a `every` placeholder, replace it by the - current window. Otherwise, append it to the expression. - - * If the MQL expression has a `group_by` placeholder, replace it. - Otherwise, append it to the expression. - - Args: - query (str): Original query in YAMLconfig. - window (int): Query window (in seconds). - - Returns: - str: Formatted query. - """ - formatted_query: str = query.strip() - if "group_by" in formatted_query: - formatted_query = re.sub( - r"\|\s+group_by\s+\[.*\]\s*", "| group_by [] ", formatted_query - ) - else: - formatted_query += "| group_by [] " - for mql_time_interval_keyword in ["within", "every"]: - if mql_time_interval_keyword in formatted_query: - formatted_query = re.sub( - rf"\|\s+{mql_time_interval_keyword}\s+\w+\s*", - f"| {mql_time_interval_keyword} {window}s ", - formatted_query, - ) - else: - formatted_query += f"| {mql_time_interval_keyword} {window}s " - return formatted_query.strip() - CM = CloudMonitoringMqlBackend diff --git a/tests/unit/backends/test_cloud_monitoring_mql.py b/tests/unit/backends/test_cloud_monitoring_mql.py index 9e5e9d2e..addb6ef3 100644 --- a/tests/unit/backends/test_cloud_monitoring_mql.py +++ b/tests/unit/backends/test_cloud_monitoring_mql.py @@ -20,42 +20,28 @@ class TestCloudMonitoringMqlBackend(unittest.TestCase): - def test_fmt_query(self): - queries = [ - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code] | within 1h """, - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code, response_code_class] - | within 1h - | every 1h """, - """ fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [metric.response_code,response_code_class] - | within 1h - | every 1h """, - ] + def test_enrich_query_with_time_horizon_and_period(self): + timestamp: float = 1666995015.5144777 # = 2022/10/28 22:10:15.5144777 + window: int = 3600 # in seconds + query: str = """fetch gae_app +| metric 'appengine.googleapis.com/http/server/response_count' +| filter resource.project_id == 'slo-generator-demo' +| filter + metric.response_code == 429 + || metric.response_code == 200 +""" - formatted_query = """fetch gae_app - | metric 'appengine.googleapis.com/http/server/response_count' - | filter resource.project_id == '${GAE_PROJECT_ID}' - | filter - metric.response_code == 429 - || metric.response_code == 200 - | group_by [] | within 3600s | every 3600s""" + enriched_query = """fetch gae_app +| metric 'appengine.googleapis.com/http/server/response_count' +| filter resource.project_id == 'slo-generator-demo' +| filter + metric.response_code == 429 + || metric.response_code == 200 +| group_by [] | within 3600s, d'2022/10/28 22:10:15' | every 3600s""" - for query in queries: - assert CloudMonitoringMqlBackend._fmt_query(query, 3600) == formatted_query + assert ( + CloudMonitoringMqlBackend.enrich_query_with_time_horizon_and_period( + timestamp, window, query + ) + == enriched_query + )