From 04a11a6110e8ba646b1c0d4f6a5fb3d5c30889bb Mon Sep 17 00:00:00 2001 From: Lester Szeto Date: Tue, 7 Jan 2025 22:14:20 -0800 Subject: [PATCH] chore: Add Custom OpenTelemetry Exporter in for Service Metrics (#1273) * chore: Add Custom OpenTelemetry Exporter in for Service Metrics * Updated copyright dates to 2025 --------- Co-authored-by: rahul2393 --- google/cloud/spanner_v1/metrics/README.md | 19 + google/cloud/spanner_v1/metrics/constants.py | 63 +++ .../spanner_v1/metrics/metrics_exporter.py | 392 ++++++++++++++ setup.py | 1 + testing/constraints-3.10.txt | 1 + testing/constraints-3.11.txt | 1 + testing/constraints-3.12.txt | 1 + testing/constraints-3.13.txt | 1 + testing/constraints-3.7.txt | 1 + testing/constraints-3.8.txt | 1 + testing/constraints-3.9.txt | 1 + tests/unit/test_metric_exporter.py | 488 ++++++++++++++++++ 12 files changed, 970 insertions(+) create mode 100644 google/cloud/spanner_v1/metrics/README.md create mode 100644 google/cloud/spanner_v1/metrics/constants.py create mode 100644 google/cloud/spanner_v1/metrics/metrics_exporter.py create mode 100644 tests/unit/test_metric_exporter.py diff --git a/google/cloud/spanner_v1/metrics/README.md b/google/cloud/spanner_v1/metrics/README.md new file mode 100644 index 0000000000..9619715c85 --- /dev/null +++ b/google/cloud/spanner_v1/metrics/README.md @@ -0,0 +1,19 @@ +# Custom Metric Exporter +The custom metric exporter, as defined in [metrics_exporter.py](./metrics_exporter.py), is designed to work in conjunction with OpenTelemetry and the Spanner client. It converts data into its protobuf equivalent and sends it to Google Cloud Monitoring. + +## Filtering Criteria +The exporter filters metrics based on the following conditions, utilizing values defined in [constants.py](./constants.py): + +* Metrics with a scope set to `gax-python`. 
+* Metrics with one of the following predefined names: + * `attempt_latencies` + * `attempt_count` + * `operation_latencies` + * `operation_count` + * `gfe_latency` (name defined in `constants.py` but not yet listed in `METRIC_NAMES`, so currently dropped by the exporter) + * `gfe_missing_header_count` (name defined in `constants.py` but not yet listed in `METRIC_NAMES`, so currently dropped by the exporter) + +## Service Endpoint +The exporter sends metrics to the Google Cloud Monitoring [service endpoint](https://cloud.google.com/python/docs/reference/monitoring/latest/google.cloud.monitoring_v3.services.metric_service.MetricServiceClient#google_cloud_monitoring_v3_services_metric_service_MetricServiceClient_create_service_time_series), distinct from the regular client endpoint. This service endpoint operates under a different quota limit than the user endpoint and features an additional server-side filter that only permits a predefined set of metrics to pass through. + +When introducing new service metrics, it is essential to ensure they are allowed through by the server-side filter as well. diff --git a/google/cloud/spanner_v1/metrics/constants.py b/google/cloud/spanner_v1/metrics/constants.py new file mode 100644 index 0000000000..5eca1fa83d --- /dev/null +++ b/google/cloud/spanner_v1/metrics/constants.py @@ -0,0 +1,63 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+# Instrumentation-scope (meter) name; the exporter skips every scope whose name differs.
+BUILT_IN_METRICS_METER_NAME = "gax-python"
+# Prefix joined with "/<metric name>" to form the Cloud Monitoring metric type.
+NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
+# Monitored-resource type set on every exported time series.
+SPANNER_RESOURCE_TYPE = "spanner_instance_client"
+
+# Monitored resource labels
+# Data-point attributes with these keys are copied onto the MonitoredResource.
+MONITORED_RES_LABEL_KEY_PROJECT = "project_id"
+MONITORED_RES_LABEL_KEY_INSTANCE = "instance_id"
+MONITORED_RES_LABEL_KEY_INSTANCE_CONFIG = "instance_config"
+MONITORED_RES_LABEL_KEY_LOCATION = "location"
+MONITORED_RES_LABEL_KEY_CLIENT_HASH = "client_hash"
+MONITORED_RESOURCE_LABELS = [
+    MONITORED_RES_LABEL_KEY_PROJECT,
+    MONITORED_RES_LABEL_KEY_INSTANCE,
+    MONITORED_RES_LABEL_KEY_INSTANCE_CONFIG,
+    MONITORED_RES_LABEL_KEY_LOCATION,
+    MONITORED_RES_LABEL_KEY_CLIENT_HASH,
+]
+
+# Metric labels
+# Data-point attributes with these keys are copied onto the metric itself;
+# attributes in neither list are discarded by the exporter.
+METRIC_LABEL_KEY_CLIENT_UID = "client_uid"
+METRIC_LABEL_KEY_CLIENT_NAME = "client_name"
+METRIC_LABEL_KEY_DATABASE = "database"
+METRIC_LABEL_KEY_METHOD = "method"
+METRIC_LABEL_KEY_STATUS = "status"
+METRIC_LABEL_KEY_DIRECT_PATH_ENABLED = "directpath_enabled"
+METRIC_LABEL_KEY_DIRECT_PATH_USED = "directpath_used"
+METRIC_LABELS = [
+    METRIC_LABEL_KEY_CLIENT_UID,
+    METRIC_LABEL_KEY_CLIENT_NAME,
+    METRIC_LABEL_KEY_DATABASE,
+    METRIC_LABEL_KEY_METHOD,
+    METRIC_LABEL_KEY_STATUS,
+    METRIC_LABEL_KEY_DIRECT_PATH_ENABLED,
+    METRIC_LABEL_KEY_DIRECT_PATH_USED,
+]
+
+# Metric names
+METRIC_NAME_OPERATION_LATENCIES = "operation_latencies"
+METRIC_NAME_ATTEMPT_LATENCIES = "attempt_latencies"
+METRIC_NAME_OPERATION_COUNT = "operation_count"
+METRIC_NAME_ATTEMPT_COUNT = "attempt_count"
+METRIC_NAME_GFE_LATENCY = "gfe_latency"
+METRIC_NAME_GFE_MISSING_HEADER_COUNT = "gfe_missing_header_count"
+# Allow-list consulted by the exporter: data points of metrics whose name is
+# not listed here are dropped.
+# NOTE(review): the two GFE names above are defined but not listed here, so the
+# exporter currently drops those metrics -- confirm that this is intended.
+METRIC_NAMES = [
+    METRIC_NAME_OPERATION_LATENCIES,
+    METRIC_NAME_ATTEMPT_LATENCIES,
+    METRIC_NAME_OPERATION_COUNT,
+    METRIC_NAME_ATTEMPT_COUNT,
+]
diff --git a/google/cloud/spanner_v1/metrics/metrics_exporter.py b/google/cloud/spanner_v1/metrics/metrics_exporter.py new file mode 100644 index 0000000000..f7d3aa18c8 --- /dev/null +++ b/google/cloud/spanner_v1/metrics/metrics_exporter.py @@ -0,0 +1,392 @@ +# 
Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .constants import ( + BUILT_IN_METRICS_METER_NAME, + NATIVE_METRICS_PREFIX, + SPANNER_RESOURCE_TYPE, + MONITORED_RESOURCE_LABELS, + METRIC_LABELS, + METRIC_NAMES, +) + +import logging +from typing import Optional, List, Union, NoReturn, Tuple + +import google.auth +from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module + Distribution, +) + +# pylint: disable=no-name-in-module +from google.api.metric_pb2 import ( # pylint: disable=no-name-in-module + Metric as GMetric, + MetricDescriptor, +) +from google.api.monitored_resource_pb2 import ( # pylint: disable=no-name-in-module + MonitoredResource, +) + +from google.cloud.monitoring_v3.services.metric_service.transports.grpc import ( + MetricServiceGrpcTransport, +) + +# pylint: disable=no-name-in-module +from google.protobuf.timestamp_pb2 import Timestamp +from google.cloud.spanner_v1.gapic_version import __version__ + +try: + from opentelemetry.sdk.metrics.export import ( + Gauge, + Histogram, + HistogramDataPoint, + Metric, + MetricExporter, + MetricExportResult, + MetricsData, + NumberDataPoint, + Sum, + ) + from opentelemetry.sdk.resources import Resource + + HAS_OPENTELEMETRY_INSTALLED = True +except ImportError: + HAS_OPENTELEMETRY_INSTALLED = False + +try: + from google.cloud.monitoring_v3 import ( + CreateTimeSeriesRequest, + MetricServiceClient, + Point, + TimeInterval, + TimeSeries, + TypedValue, + ) + + 
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
+except ImportError:
+    HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False
+
+# NOTE(review): the flags only record whether the optional imports succeeded; no
+# fallback names are bound on failure, so the class statement below (which
+# subclasses MetricExporter) presumably raises NameError at import time when
+# OpenTelemetry is missing -- confirm whether the guard in export() is reachable
+# outside of tests that patch this flag.
+HAS_DEPENDENCIES_INSTALLED = (
+    HAS_OPENTELEMETRY_INSTALLED and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
+)
+
+logger = logging.getLogger(__name__)
+# Maximum number of time series sent in one create request (see _batch_write).
+MAX_BATCH_WRITE = 200
+MILLIS_PER_SECOND = 1000
+
+_USER_AGENT = f"python-spanner; google-cloud-service-metric-exporter {__version__}"
+
+# Set user-agent metadata, see https://github.com/grpc/grpc/issues/23644 and default options
+# from
+# https://github.com/googleapis/python-monitoring/blob/v2.11.3/google/cloud/monitoring_v3/services/metric_service/transports/grpc.py#L175-L178
+_OPTIONS = [
+    ("grpc.max_send_message_length", -1),
+    ("grpc.max_receive_message_length", -1),
+    ("grpc.primary_user_agent", _USER_AGENT),
+]
+
+
+# pylint is unable to resolve members of protobuf objects
+# pylint: disable=no-member
+# pylint: disable=too-many-branches
+# pylint: disable=too-many-locals
+class CloudMonitoringMetricsExporter(MetricExporter):
+    """Implementation of Metrics Exporter to Google Cloud Monitoring.
+
+    You can manually pass in project_id and client, or else the
+    Exporter will take that information from Application Default
+    Credentials.
+
+    Args:
+        project_id: project id of your Google Cloud project.
+        client: Client to upload metrics to Google Cloud Monitoring.
+    """
+
+    # Based on the cloud_monitoring exporter found here: https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/main/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py
+
+    def __init__(
+        self,
+        project_id: Optional[str] = None,
+        client: Optional[MetricServiceClient] = None,
+    ):
+        """Initialize a custom exporter to send metrics for the Spanner Service Metrics."""
+        # Default preferred_temporality is all CUMULATIVE so no need to customize
+        super().__init__()
+
+        # Create a new GRPC Client for Google Cloud Monitoring if not provided
+        self.client = client or MetricServiceClient(
+            transport=MetricServiceGrpcTransport(
+                channel=MetricServiceGrpcTransport.create_channel(
+                    options=_OPTIONS,
+                )
+            )
+        )
+
+        # Set project information
+        self.project_id: str
+        if not project_id:
+            # NOTE(review): if google.auth.default() yields no project, str()
+            # stores the literal "None" -- confirm this is acceptable.
+            _, default_project_id = google.auth.default()
+            self.project_id = str(default_project_id)
+        else:
+            self.project_id = project_id
+        self.project_name = self.client.common_project_path(self.project_id)
+
+    def _batch_write(self, series: List[TimeSeries], timeout_millis: float) -> None:
+        """Cloud Monitoring allows writing up to 200 time series at once.
+
+        :param series: ProtoBuf TimeSeries
+        :param timeout_millis: timeout in milliseconds, applied to each request
+            after conversion to seconds
+        :return:
+        """
+        timeout = timeout_millis / MILLIS_PER_SECOND
+        # One request per slice of at most MAX_BATCH_WRITE series; an empty
+        # list sends nothing.
+        for start in range(0, len(series), MAX_BATCH_WRITE):
+            request = CreateTimeSeriesRequest(
+                name=self.project_name,
+                time_series=series[start : start + MAX_BATCH_WRITE],
+            )
+
+            self.client.create_service_time_series(
+                request=request,
+                timeout=timeout,
+            )
+
+    @staticmethod
+    def _resource_to_monitored_resource_pb(
+        resource: Resource, labels: dict
+    ) -> MonitoredResource:
+        """
+        Convert the resource to a Google Cloud Monitoring monitored resource.
+
+        :param resource: OpenTelemetry resource (currently not read; the
+            monitored resource is built from ``labels`` only)
+        :param labels: labels to add to the monitored resource
+        :return: Google Cloud Monitoring monitored resource
+        """
+        monitored_resource = MonitoredResource(
+            type=SPANNER_RESOURCE_TYPE,
+            labels=labels,
+        )
+        return monitored_resource
+
+    @staticmethod
+    def _to_metric_kind(metric: Metric) -> "Optional[MetricDescriptor.MetricKind.V]":
+        """
+        Convert the metric to a Google Cloud Monitoring metric kind.
+
+        :param metric: OpenTelemetry metric
+        :return: Google Cloud Monitoring metric kind, or None (after logging a
+            warning) when the metric's data type is not supported
+        """
+        data = metric.data
+        if isinstance(data, Sum):
+            if data.is_monotonic:
+                return MetricDescriptor.MetricKind.CUMULATIVE
+            else:
+                return MetricDescriptor.MetricKind.GAUGE
+        elif isinstance(data, Gauge):
+            return MetricDescriptor.MetricKind.GAUGE
+        elif isinstance(data, Histogram):
+            return MetricDescriptor.MetricKind.CUMULATIVE
+        else:
+            # Exhaustive check
+            _: NoReturn = data
+            logger.warning(
+                "Unsupported metric data type %s, ignoring it",
+                type(data).__name__,
+            )
+            return None
+
+    @staticmethod
+    def _extract_metric_labels(
+        data_point: Union[NumberDataPoint, HistogramDataPoint]
+    ) -> Tuple[dict, dict]:
+        """
+        Extract the metric labels from the data point.
+
+        Attributes are matched against the label lists by their raw key and
+        stored under the normalized key; values are converted with ``str``.
+        Attributes in neither list are dropped.
+
+        :param data_point: OpenTelemetry data point
+        :return: tuple of metric labels and monitored resource labels
+        """
+        metric_labels = {}
+        monitored_resource_labels = {}
+        for key, value in (data_point.attributes or {}).items():
+            normalized_key = _normalize_label_key(key)
+            val = str(value)
+            if key in METRIC_LABELS:
+                metric_labels[normalized_key] = val
+            if key in MONITORED_RESOURCE_LABELS:
+                monitored_resource_labels[normalized_key] = val
+        return metric_labels, monitored_resource_labels
+
+    # Unchanged from https://github.com/GoogleCloudPlatform/opentelemetry-operations-python/blob/main/opentelemetry-exporter-gcp-monitoring/src/opentelemetry/exporter/cloud_monitoring/__init__.py
+    @staticmethod
+    def _to_point(
+        kind: "MetricDescriptor.MetricKind.V",
+        data_point: Union[NumberDataPoint, HistogramDataPoint],
+    ) -> Point:
+        """
+        Convert the data point to a Google Cloud Monitoring point.
+
+        :param kind: metric kind; CUMULATIVE and DELTA get a start and end
+            time, every other kind gets an end time only
+        :param data_point: OpenTelemetry data point
+        :return: Google Cloud Monitoring point
+        """
+        # Create a Google Cloud Monitoring data point value based on the OpenTelemetry metric data point type
+        ## For histograms, we need to calculate the mean and bucket counts
+        if isinstance(data_point, HistogramDataPoint):
+            # A zero or missing count yields a mean of 0.0 instead of dividing by it
+            mean = data_point.sum / data_point.count if data_point.count else 0.0
+            point_value = TypedValue(
+                distribution_value=Distribution(
+                    count=data_point.count,
+                    mean=mean,
+                    bucket_counts=data_point.bucket_counts,
+                    bucket_options=Distribution.BucketOptions(
+                        explicit_buckets=Distribution.BucketOptions.Explicit(
+                            bounds=data_point.explicit_bounds,
+                        )
+                    ),
+                )
+            )
+        else:
+            # For other metric types, we can use the data point value directly
+            if isinstance(data_point.value, int):
+                point_value = TypedValue(int64_value=data_point.value)
+            else:
+                point_value = TypedValue(double_value=data_point.value)
+
+        # DELTA case should never happen but adding it to be future proof
+        if (
+            kind is MetricDescriptor.MetricKind.CUMULATIVE
+            or kind is MetricDescriptor.MetricKind.DELTA
+        ):
+            # Create a Google Cloud Monitoring time interval from the OpenTelemetry data point timestamps
+            interval = TimeInterval(
+                start_time=_timestamp_from_nanos(data_point.start_time_unix_nano),
+                end_time=_timestamp_from_nanos(data_point.time_unix_nano),
+            )
+        else:
+            # For non time ranged metrics, we only need the end time
+            interval = TimeInterval(
+                end_time=_timestamp_from_nanos(data_point.time_unix_nano),
+            )
+        return Point(interval=interval, value=point_value)
+
+    @staticmethod
+    def _data_point_to_timeseries_pb(
+        data_point,
+        metric,
+        monitored_resource,
+        labels,
+    ) -> TimeSeries:
+        """
+        Convert the data point to a Google Cloud Monitoring time series.
+
+        :param data_point: OpenTelemetry data point
+        :param metric: OpenTelemetry metric
+        :param monitored_resource: Google Cloud Monitoring monitored resource
+        :param labels: metric labels
+        :return: Google Cloud Monitoring time series, or None when the metric
+            name is not in METRIC_NAMES or its data type is unsupported
+        """
+        if metric.name not in METRIC_NAMES:
+            return None
+
+        kind = CloudMonitoringMetricsExporter._to_metric_kind(metric)
+        if kind is None:
+            # _to_metric_kind already logged that the metric is being ignored;
+            # honour that instead of building a series without a kind.
+            return None
+
+        point = CloudMonitoringMetricsExporter._to_point(kind, data_point)
+        metric_type = f"{NATIVE_METRICS_PREFIX}/{metric.name}"
+        series = TimeSeries(
+            resource=monitored_resource,
+            metric_kind=kind,
+            points=[point],
+            metric=GMetric(type=metric_type, labels=labels),
+            unit=metric.unit or "",
+        )
+        return series
+
+    @staticmethod
+    def _resource_metrics_to_timeseries_pb(
+        metrics_data: MetricsData,
+    ) -> List[TimeSeries]:
+        """
+        Convert the metrics data to a list of Google Cloud Monitoring time series.
+
+        Only scopes named BUILT_IN_METRICS_METER_NAME are converted; each data
+        point becomes at most one time series.
+
+        :param metrics_data: OpenTelemetry metrics data
+        :return: list of Google Cloud Monitoring time series
+        """
+        timeseries_list = []
+        for resource_metric in metrics_data.resource_metrics:
+            for scope_metric in resource_metric.scope_metrics:
+                # Filter for spanner builtin metrics
+                if scope_metric.scope.name != BUILT_IN_METRICS_METER_NAME:
+                    continue
+
+                for metric in scope_metric.metrics:
+                    for data_point in metric.data.data_points:
+                        (
+                            metric_labels,
+                            monitored_resource_labels,
+                        ) = CloudMonitoringMetricsExporter._extract_metric_labels(
+                            data_point
+                        )
+                        monitored_resource = CloudMonitoringMetricsExporter._resource_to_monitored_resource_pb(
+                            resource_metric.resource, monitored_resource_labels
+                        )
+                        timeseries = (
+                            CloudMonitoringMetricsExporter._data_point_to_timeseries_pb(
+                                data_point, metric, monitored_resource, metric_labels
+                            )
+                        )
+                        if timeseries is not None:
+                            timeseries_list.append(timeseries)
+
+        return timeseries_list
+
+    def export(
+        self,
+        metrics_data: MetricsData,
+        timeout_millis: float = 10_000,
+        **kwargs,
+    ) -> MetricExportResult:
+        """
+        Export the metrics data to Google Cloud Monitoring.
+
+        :param metrics_data: OpenTelemetry metrics data
+        :param timeout_millis: timeout in milliseconds
+        :return: False when the optional dependencies are flagged as missing,
+            True after the batches have been written
+        """
+        # NOTE(review): the method returns plain booleans although it is
+        # annotated with MetricExportResult -- confirm which one callers expect
+        # before changing either.
+        if not HAS_DEPENDENCIES_INSTALLED:
+            logger.warning("Metric exporter called without dependencies installed.")
+            return False
+
+        time_series_list = self._resource_metrics_to_timeseries_pb(metrics_data)
+        self._batch_write(time_series_list, timeout_millis)
+        return True
+
+    def force_flush(self, timeout_millis: float = 10_000) -> bool:
+        """Not implemented; does nothing and reports success."""
+        return True
+
+    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
+        """Not implemented; does nothing."""
+        pass
+
+
+def _timestamp_from_nanos(nanos: int) -> Timestamp:
+    """Build a protobuf Timestamp from a count of nanoseconds."""
+    ts = Timestamp()
+    ts.FromNanoseconds(nanos)
+    return ts
+
+
+def _normalize_label_key(key: str) -> str:
+    """Make the key into a valid Google Cloud Monitoring label key.
+
+    Every character that is neither alphabetic nor numeric becomes "_", and a
+    key starting with a digit is prefixed with "key_". An empty key is
+    returned unchanged.
+
+    See reference impl
+    https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/e955c204f4f2bfdc92ff0ad52786232b975efcc2/exporter/metric/metric.go#L595-L604
+    """
+    sanitized = "".join(c if c.isalpha() or c.isnumeric() else "_" for c in key)
+    # Slice rather than index so that an empty key does not raise IndexError.
+    if sanitized[:1].isdigit():
+        sanitized = "key_" + sanitized
+    return sanitized
diff --git a/setup.py b/setup.py index 544d117fd7..619607b794 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ "opentelemetry-api >= 1.22.0", "opentelemetry-sdk >= 1.22.0", "opentelemetry-semantic-conventions >= 0.43b0", + "google-cloud-monitoring >= 2.16.0", ], "libcst": "libcst >= 0.2.5", } diff --git a/testing/constraints-3.10.txt b/testing/constraints-3.10.txt index ad3f0fa58e..5369861daf 100644 --- a/testing/constraints-3.10.txt +++ b/testing/constraints-3.10.txt @@ -5,3 +5,4 @@ google-api-core proto-plus protobuf grpc-google-iam-v1 +google-cloud-monitoring diff --git a/testing/constraints-3.11.txt b/testing/constraints-3.11.txt index ad3f0fa58e..28bc2bd36c 100644 --- a/testing/constraints-3.11.txt +++ b/testing/constraints-3.11.txt @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # This constraints file is required for unit tests. # List all library dependencies and extras in this file. 
+google-cloud-monitoring google-api-core proto-plus protobuf diff --git a/testing/constraints-3.12.txt b/testing/constraints-3.12.txt index ad3f0fa58e..5369861daf 100644 --- a/testing/constraints-3.12.txt +++ b/testing/constraints-3.12.txt @@ -5,3 +5,4 @@ google-api-core proto-plus protobuf grpc-google-iam-v1 +google-cloud-monitoring diff --git a/testing/constraints-3.13.txt b/testing/constraints-3.13.txt index ad3f0fa58e..5369861daf 100644 --- a/testing/constraints-3.13.txt +++ b/testing/constraints-3.13.txt @@ -5,3 +5,4 @@ google-api-core proto-plus protobuf grpc-google-iam-v1 +google-cloud-monitoring diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index e468d57168..af33b0c8e8 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -16,3 +16,4 @@ opentelemetry-semantic-conventions==0.43b0 protobuf==3.20.2 deprecated==1.2.14 grpc-interceptor==0.15.4 +google-cloud-monitoring==2.16.0 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index ad3f0fa58e..5369861daf 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -5,3 +5,4 @@ google-api-core proto-plus protobuf grpc-google-iam-v1 +google-cloud-monitoring diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index ad3f0fa58e..5369861daf 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,3 +5,4 @@ google-api-core proto-plus protobuf grpc-google-iam-v1 +google-cloud-monitoring diff --git a/tests/unit/test_metric_exporter.py b/tests/unit/test_metric_exporter.py new file mode 100644 index 0000000000..08ae9ecf21 --- /dev/null +++ b/tests/unit/test_metric_exporter.py @@ -0,0 +1,488 @@ +# Copyright 2016 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import patch, MagicMock, Mock +from google.cloud.spanner_v1.metrics.metrics_exporter import ( + CloudMonitoringMetricsExporter, + _normalize_label_key, +) +from google.api.metric_pb2 import MetricDescriptor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, + Sum, + Gauge, + Histogram, + NumberDataPoint, + HistogramDataPoint, + AggregationTemporality, +) +from google.cloud.spanner_v1.metrics.constants import METRIC_NAME_OPERATION_COUNT + +from tests._helpers import ( + HAS_OPENTELEMETRY_INSTALLED, +) + + +# Test Constants +PROJECT_ID = "fake-project-id" +INSTANCE_ID = "fake-instance-id" +DATABASE_ID = "fake-database-id" +SCOPE_NAME = "gax-python" + +# Skip tests if opentelemetry is not installed +if HAS_OPENTELEMETRY_INSTALLED: + + class TestMetricsExporter(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.metric_attributes = { + "project_id": PROJECT_ID, + "instance_id": INSTANCE_ID, + "instance_config": "test_config", + "location": "test_location", + "client_hash": "test_hash", + "client_uid": "test_uid", + "client_name": "test_name", + "database": DATABASE_ID, + "method": "test_method", + "status": "test_status", + "directpath_enabled": "true", + "directpath_used": "false", + "other": "ignored", + } + + def setUp(self): + self.metric_reader = InMemoryMetricReader() + self.provider = MeterProvider(metric_readers=[self.metric_reader]) + self.meter = self.provider.get_meter(SCOPE_NAME) + self.operation_count = 
self.meter.create_counter( + name=METRIC_NAME_OPERATION_COUNT, + description="A test counter", + unit="counts", + ) + + def test_default_ctor(self): + exporter = CloudMonitoringMetricsExporter() + self.assertIsNotNone(exporter.project_id) + + def test_normalize_label_key(self): + """Test label key normalization""" + test_cases = [ + ("simple", "simple"), + ("with space", "with_space"), + ("with-dash", "with_dash"), + ("123_number_prefix", "key_123_number_prefix"), + ("special!characters@", "special_characters_"), + ] + + for input_key, expected_output in test_cases: + self.assertEqual(_normalize_label_key(input_key), expected_output) + + def test_to_metric_kind(self): + """Test conversion of different metric types to GCM metric kinds""" + # Test monotonic Sum returns CUMULATIVE + metric_sum = Mock( + data=Sum( + data_points=[], + aggregation_temporality=AggregationTemporality.UNSPECIFIED, + is_monotonic=True, + ) + ) + self.assertEqual( + CloudMonitoringMetricsExporter._to_metric_kind(metric_sum), + MetricDescriptor.MetricKind.CUMULATIVE, + ) + + # Test non-monotonic Sum returns GAUGE + metric_sum_non_monotonic = Mock( + data=Sum( + data_points=[], + aggregation_temporality=AggregationTemporality.UNSPECIFIED, + is_monotonic=False, + ) + ) + self.assertEqual( + CloudMonitoringMetricsExporter._to_metric_kind( + metric_sum_non_monotonic + ), + MetricDescriptor.MetricKind.GAUGE, + ) + + # Test Gauge returns GAUGE + metric_gauge = Mock(data=Gauge(data_points=[])) + self.assertEqual( + CloudMonitoringMetricsExporter._to_metric_kind(metric_gauge), + MetricDescriptor.MetricKind.GAUGE, + ) + + # Test Histogram returns CUMULATIVE + metric_histogram = Mock( + data=Histogram( + data_points=[], + aggregation_temporality=AggregationTemporality.UNSPECIFIED, + ) + ) + self.assertEqual( + CloudMonitoringMetricsExporter._to_metric_kind(metric_histogram), + MetricDescriptor.MetricKind.CUMULATIVE, + ) + + # Test Unknown data type warns + metric_unknown = Mock(data=Mock()) + with 
self.assertLogs( + "google.cloud.spanner_v1.metrics.metrics_exporter", level="WARNING" + ) as log: + self.assertIsNone( + CloudMonitoringMetricsExporter._to_metric_kind(metric_unknown) + ) + self.assertIn( + "WARNING:google.cloud.spanner_v1.metrics.metrics_exporter:Unsupported metric data type Mock, ignoring it", + log.output, + ) + + def test_extract_metric_labels(self): + """Test extraction of metric and resource labels""" + import time + + data_point = NumberDataPoint( + attributes={ + # Metric labels + "client_uid": "test-client-uid", + "client_name": "test-client-name", + "database": "test-db", + "method": "test-method", + "status": "test-status", + "directpath_enabled": "test-directpath-enabled", + "directpath_used": "test-directpath-used", + # Monitored Resource label + "project_id": "test-project-id", + "instance_id": "test-instance-id", + "instance_config": "test-instance-config", + "location": "test-location", + "client_hash": "test-client-hash", + # All other labels ignored + "unknown": "ignored", + "Client_UID": "ignored", + }, + start_time_unix_nano=time.time_ns(), + time_unix_nano=time.time_ns(), + value=0, + ) + + ( + metric_labels, + resource_labels, + ) = CloudMonitoringMetricsExporter._extract_metric_labels(data_point) + + # Verify that the attributes are properly distributed and reassigned + + ## Metric Labels + self.assertIn("client_uid", metric_labels) + self.assertEqual(metric_labels["client_uid"], "test-client-uid") + self.assertIn("client_name", metric_labels) + self.assertEqual(metric_labels["client_name"], "test-client-name") + self.assertIn("database", metric_labels) + self.assertEqual(metric_labels["database"], "test-db") + self.assertIn("method", metric_labels) + self.assertEqual(metric_labels["method"], "test-method") + self.assertIn("status", metric_labels) + self.assertEqual(metric_labels["status"], "test-status") + self.assertIn("directpath_enabled", metric_labels) + self.assertEqual( + metric_labels["directpath_enabled"], 
"test-directpath-enabled" + ) + self.assertIn("directpath_used", metric_labels) + self.assertEqual(metric_labels["directpath_used"], "test-directpath-used") + + ## Metric Resource Labels + self.assertIn("project_id", resource_labels) + self.assertEqual(resource_labels["project_id"], "test-project-id") + self.assertIn("instance_id", resource_labels) + self.assertEqual(resource_labels["instance_id"], "test-instance-id") + self.assertIn("instance_config", resource_labels) + self.assertEqual(resource_labels["instance_config"], "test-instance-config") + self.assertIn("location", resource_labels) + self.assertEqual(resource_labels["location"], "test-location") + self.assertIn("client_hash", resource_labels) + self.assertEqual(resource_labels["client_hash"], "test-client-hash") + + # Other attributes are ignored + self.assertNotIn("unknown", metric_labels) + self.assertNotIn("unknown", resource_labels) + ## including case sensitive keys + self.assertNotIn("Client_UID", metric_labels) + self.assertNotIn("Client_UID", resource_labels) + + def test_metric_timeseries_conversion(self): + """Test to verify conversion from OTEL Metrics to GCM Time Series.""" + # Add metrics + self.operation_count.add(1, attributes=self.metric_attributes) + self.operation_count.add(2, attributes=self.metric_attributes) + + # Export metrics + metrics = self.metric_reader.get_metrics_data() + self.assertTrue(metrics is not None) + + exporter = CloudMonitoringMetricsExporter(PROJECT_ID) + timeseries = exporter._resource_metrics_to_timeseries_pb(metrics) + + # Both counter values should be summed together + self.assertEqual(len(timeseries), 1) + self.assertEqual(timeseries[0].points.pop(0).value.int64_value, 3) + + def test_metric_timeseries_scope_filtering(self): + """Test to verify that metrics without the `gax-python` scope are filtered out.""" + # Create metric instruments + meter = self.provider.get_meter("WRONG_SCOPE") + counter = meter.create_counter( + name="operation_latencies", 
description="A test counter", unit="ms" + ) + + # Add metrics + counter.add(1, attributes=self.metric_attributes) + counter.add(2, attributes=self.metric_attributes) + + # Export metrics + metrics = self.metric_reader.get_metrics_data() + exporter = CloudMonitoringMetricsExporter(PROJECT_ID) + timeseries = exporter._resource_metrics_to_timeseries_pb(metrics) + + # Metris with incorrect sope should be filtered out + self.assertEqual(len(timeseries), 0) + + def test_batch_write(self): + """Verify that writes happen in batches of 200""" + from google.protobuf.timestamp_pb2 import Timestamp + from google.cloud.monitoring_v3 import MetricServiceClient + from google.api.monitored_resource_pb2 import MonitoredResource + from google.api.metric_pb2 import Metric as GMetric + import random + from google.cloud.monitoring_v3 import ( + TimeSeries, + Point, + TimeInterval, + TypedValue, + ) + + mockClient = MagicMock(spec=MetricServiceClient) + mockClient.create_service_time_series = Mock(return_value=None) + exporter = CloudMonitoringMetricsExporter(PROJECT_ID, mockClient) + + # Create timestamps for the time series + start_time = Timestamp() + start_time.FromSeconds(1234567890) + end_time = Timestamp() + end_time.FromSeconds(1234567900) + + # Create test time series + timeseries = [] + for i in range(400): + timeseries.append( + TimeSeries( + metric=GMetric( + type=f"custom.googleapis.com/spanner/test_metric_{i}", + labels={"client_uid": "test-client", "database": "test-db"}, + ), + resource=MonitoredResource( + type="spanner_instance", + labels={ + "project_id": PROJECT_ID, + "instance_id": INSTANCE_ID, + "location": "test-location", + }, + ), + metric_kind=MetricDescriptor.MetricKind.CUMULATIVE, + points=[ + Point( + interval=TimeInterval( + start_time=start_time, end_time=end_time + ), + value=TypedValue(int64_value=random.randint(1, 100)), + ) + ], + ), + ) + + # Define a side effect to extract time series data passed to mocked CreatetimeSeriesRquest + tsr_timeseries = [] 
+
+        def create_tsr_side_effect(name, time_series):
+            nonlocal tsr_timeseries
+            tsr_timeseries = time_series
+
+        patch_path = "google.cloud.spanner_v1.metrics.metrics_exporter.CreateTimeSeriesRequest"
+        with patch(patch_path, side_effect=create_tsr_side_effect):
+            exporter._batch_write(timeseries, 10000)
+            # Verify that the Create Time Series calls happen in batches of max 200 elements
+            self.assertTrue(len(tsr_timeseries) > 0 and len(tsr_timeseries) <= 200)
+
+        # Verify the mock was called with the correct arguments
+        self.assertEqual(len(mockClient.create_service_time_series.mock_calls), 2)
+
+    @patch(
+        "google.cloud.spanner_v1.metrics.metrics_exporter.HAS_DEPENDENCIES_INSTALLED",
+        False,
+    )
+    def test_export_early_exit_if_extras_not_installed(self):
+        """Verify that export() exits early when the optional extras are missing.
+
+        With HAS_DEPENDENCIES_INSTALLED patched to False, export() must return
+        a falsy value and log a warning that the dependencies are not installed.
+        """
+        # assertLogs both captures the expected warning (keeping it out of the
+        # test output) and fails the test if no WARNING record is emitted.
+        with self.assertLogs(
+            "google.cloud.spanner_v1.metrics.metrics_exporter", level="WARNING"
+        ) as log:
+            exporter = CloudMonitoringMetricsExporter(PROJECT_ID)
+            self.assertFalse(exporter.export([]))
+            self.assertIn(
+                "WARNING:google.cloud.spanner_v1.metrics.metrics_exporter:Metric exporter called without dependencies installed.",
+                log.output,
+            )
+
+    def test_export(self):
+        """Verify that the export call will convert and send the requests out."""
+        # Create metric instruments
+        meter = self.provider.get_meter("gax-python")
+        counter = meter.create_counter(
+            name="attempt_count", description="A test counter", unit="count"
+        )
+        latency = meter.create_counter(
+            name="attempt_latencies", description="test latencies", unit="ms"
+        )
+
+        # Add metrics: two recordings per instrument with identical attributes.
+        counter.add(10, attributes=self.metric_attributes)
+        counter.add(25, attributes=self.metric_attributes)
+        latency.add(30, attributes=self.metric_attributes)
+        latency.add(45, attributes=self.metric_attributes)
+
+        # Export metrics; _batch_write is patched so nothing is actually sent.
+        metrics = self.metric_reader.get_metrics_data()
+        mock_client = Mock()
+        exporter = CloudMonitoringMetricsExporter(PROJECT_ID, mock_client)
+        patch_path = "google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter._batch_write"
+        with patch(patch_path) as mock_batch_write:
+            exporter.export(metrics)
+
+            # Verify metrics passed to be sent to Google Cloud Monitoring:
+            # one batch write whose first positional argument holds exactly
+            # two time series.
+            mock_batch_write.assert_called_once()
+            batch_args, _ = mock_batch_write.call_args
+            timeseries = batch_args[0]
+            self.assertEqual(len(timeseries), 2)
+
+    def test_force_flush(self):
+        """Verify that the unimplemented force flush can be called."""
+        exporter = CloudMonitoringMetricsExporter(PROJECT_ID)
+        self.assertTrue(exporter.force_flush())
+
+    def test_shutdown(self):
+        """Verify that the unimplemented shutdown can be called."""
+        exporter = CloudMonitoringMetricsExporter()
+        try:
+            exporter.shutdown()
+        except Exception as e:
+            # Report an unexpected exception as a test failure, not an error.
+            self.fail(f"Shutdown() raised an exception: {e}")
+
+    def test_data_point_to_timeseries_early_exit(self):
+        """Early exit function if an unknown metric name is supplied."""
+        # Mock(name=...) only labels the mock's repr; it does NOT create a
+        # ``name`` attribute. Assign the attribute after construction so the
+        # code under test really sees the unknown metric name as a string.
+        metric = Mock()
+        metric.name = "TestMetricName"
+        self.assertIsNone(
+            CloudMonitoringMetricsExporter._data_point_to_timeseries_pb(
+                None, metric, None, None
+            )
+        )
+
+    @patch(
+        "google.cloud.spanner_v1.metrics.metrics_exporter.CloudMonitoringMetricsExporter._data_point_to_timeseries_pb"
+    )
+    def test_metrics_to_time_series_empty_input(
+        self, mocked_data_point_to_timeseries_pb
+    ):
+        """Verify that metric entries with no timeseries data do not return a time series entry."""
+        exporter = CloudMonitoringMetricsExporter()
+        data_point = Mock()
+        metric = Mock(data_points=[data_point])
+        # NOTE(review): Mock(name=...) sets the mock's repr label, so
+        # ``scope.name`` below is an auto-created child Mock rather than the
+        # string "operation_latencies" - confirm which scope name is intended.
+        scope_metric = Mock(
+            metrics=[metric], scope=Mock(name="operation_latencies")
+        )
+        resource_metric = Mock(scope_metrics=[scope_metric])
+        metrics_data = Mock(resource_metrics=[resource_metric])
+
+        # NOTE(review): nothing is asserted on the return value or on
+        # mocked_data_point_to_timeseries_pb; as written this only checks that
+        # the call does not raise - TODO add the intended assertion.
+        exporter._resource_metrics_to_timeseries_pb(metrics_data)
+
+    def test_to_point(self):
+        """Verify conversion of datapoints."""
+        exporter = CloudMonitoringMetricsExporter()
+
+        number_point = NumberDataPoint(
+            attributes=[], start_time_unix_nano=0, time_unix_nano=0, value=9
+        )
+
+        # Test that provided int number point values are set to the converted int data point
+        converted_num_point = exporter._to_point(
+            MetricDescriptor.MetricKind.CUMULATIVE, number_point
+        )
+
+        self.assertEqual(converted_num_point.value.int64_value, 9)
+
+        # Test that provided float number point values are set to converted double data point
+        float_number_point = NumberDataPoint(
+            attributes=[], start_time_unix_nano=0, time_unix_nano=0, value=12.20
+        )
+        converted_float_num_point = exporter._to_point(
+            MetricDescriptor.MetricKind.CUMULATIVE, float_number_point
+        )
+        self.assertEqual(converted_float_num_point.value.double_value, 12.20)
+
+        hist_point = HistogramDataPoint(
+            attributes=[],
+            start_time_unix_nano=123,
+            time_unix_nano=456,
+            count=1,
+            sum=2,
+            bucket_counts=[3],
+            explicit_bounds=[4],
+            min=5.0,
+            max=6.0,
+        )
+
+        # Test that provided histogram point values are set to the converted data point
+        # (count=1 and sum=2 are expected to yield a distribution mean of 2).
+        converted_hist_point = exporter._to_point(
+            MetricDescriptor.MetricKind.CUMULATIVE, hist_point
+        )
+        self.assertEqual(converted_hist_point.value.distribution_value.count, 1)
+        self.assertEqual(converted_hist_point.value.distribution_value.mean, 2)
+
+        hist_point_missing_count = HistogramDataPoint(
+            attributes=[],
+            start_time_unix_nano=123,
+            time_unix_nano=456,
+            count=None,
+            sum=2,
+            bucket_counts=[3],
+            explicit_bounds=[4],
+            min=5.0,
+            max=6.0,
+        )
+
+        # Test that histogram points missing a count value have the mean defaulted to 0
+        # and that non cumulative / delta kinds default to a single timepoint interval
+        # (end_time set, start_time left unset).
+        converted_hist_point_no_count = exporter._to_point(
+            MetricDescriptor.MetricKind.METRIC_KIND_UNSPECIFIED,
+            hist_point_missing_count,
+        )
+        self.assertEqual(
+            converted_hist_point_no_count.value.distribution_value.mean, 0
+        )
+        self.assertIsNone(converted_hist_point_no_count.interval.start_time)
+        self.assertIsNotNone(converted_hist_point_no_count.interval.end_time)