From 162707ad2d422678fad50c946b8cf387237dd14c Mon Sep 17 00:00:00 2001 From: KevinGG Date: Thu, 3 Mar 2022 14:47:56 -0800 Subject: [PATCH] [BEAM-14016] Fixed flaky postcommit test Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the metric exporter usage in spannerio. --- .../io/gcp/experimental/spannerio.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py index e36fbad09214..39a3a27aaf89 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio.py @@ -156,7 +156,7 @@ transactions sorted by table and primary key. WriteToSpanner transforms starts with the grouping into batches. The first step -in this process is to make the make the mutation groups of the WriteMutation +in this process is to make the mutation groups of the WriteMutation objects and then filtering them into batchable and unbatchable mutation groups. There are three batching parameters (max_number_cells, max_number_rows & max_batch_size_bytes). We calculated th mutation byte size from the method @@ -1202,9 +1202,12 @@ def __init__(self, spanner_configuration): monitoring_infos.SPANNER_PROJECT_ID: spanner_configuration.project, monitoring_infos.SPANNER_DATABASE_ID: spanner_configuration.database, } - self.service_metric = None + # table_id to metrics + self.service_metrics = {} - def _table_metric(self, table_id): + def _register_table_metric(self, table_id): + if table_id in self.service_metrics: + return database_id = self._spanner_configuration.database project_id = self._spanner_configuration.project resource = resource_identifiers.SpannerTable( @@ -1217,7 +1220,7 @@ def _table_metric(self, table_id): service_call_metric = ServiceCallMetric( request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN, base_labels=labels) - return service_call_metric + self.service_metrics[table_id] = service_call_metric def setup(self): spanner_client = Client(self._spanner_configuration.project) @@ -1226,13 +1229,16 @@ def setup(self): self._spanner_configuration.database, pool=self._spanner_configuration.pool) + def start_bundle(self): + self.service_metrics = {} + def process(self, element): self.batches.inc() try: with self._db_instance.batch() as b: for m in element: table_id = m.kwargs['table'] - self.service_metric = self._table_metric(table_id) + self._register_table_metric(table_id) if m.operation == WriteMutation._OPERATION_DELETE: batch_func = b.delete @@ -1247,14 +1253,17 @@ def process(self, element): else: raise ValueError("Unknown operation action: %s" % m.operation) batch_func(**m.kwargs) - - self.service_metric.call('ok') except (ClientError, GoogleAPICallError) as e: - self.service_metric.call(str(e.code.value)) + for service_metric in self.service_metrics.values(): + service_metric.call(str(e.code.value)) raise except HttpError as e: - self.service_metric.call(str(e)) + for service_metric in self.service_metrics.values(): + service_metric.call(str(e)) raise + else: + for service_metric in self.service_metrics.values(): + service_metric.call('ok') @with_input_types(typing.Union[MutationGroup, _Mutator])