diff --git a/google/cloud/bigtable/__init__.py b/google/cloud/bigtable/__init__.py index de46f8a75..70c87ade1 100644 --- a/google/cloud/bigtable/__init__.py +++ b/google/cloud/bigtable/__init__.py @@ -28,7 +28,7 @@ from google.cloud.bigtable.mutations_batcher import MutationsBatcher from google.cloud.bigtable.mutations import Mutation -from google.cloud.bigtable.mutations import BulkMutationsEntry +from google.cloud.bigtable.mutations import RowMutationEntry from google.cloud.bigtable.mutations import SetCell from google.cloud.bigtable.mutations import DeleteRangeFromColumn from google.cloud.bigtable.mutations import DeleteAllFromFamily @@ -47,7 +47,7 @@ "RowRange", "MutationsBatcher", "Mutation", - "BulkMutationsEntry", + "RowMutationEntry", "SetCell", "DeleteRangeFromColumn", "DeleteAllFromFamily", diff --git a/google/cloud/bigtable/_helpers.py b/google/cloud/bigtable/_helpers.py new file mode 100644 index 000000000..dec4c2014 --- /dev/null +++ b/google/cloud/bigtable/_helpers.py @@ -0,0 +1,111 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import annotations + +from typing import Callable, Any +from inspect import iscoroutinefunction +import time + +from google.api_core import exceptions as core_exceptions +from google.cloud.bigtable.exceptions import RetryExceptionGroup + +""" +Helper functions used in various places in the library. 
+""" + + +def _make_metadata( + table_name: str, app_profile_id: str | None +) -> list[tuple[str, str]]: + """ + Create properly formatted gRPC metadata for requests. + """ + params = [] + params.append(f"table_name={table_name}") + if app_profile_id is not None: + params.append(f"app_profile_id={app_profile_id}") + params_str = ",".join(params) + return [("x-goog-request-params", params_str)] + + +def _attempt_timeout_generator( + per_request_timeout: float | None, operation_timeout: float +): + """ + Generator that yields the timeout value for each attempt of a retry loop. + + Will return per_request_timeout until the operation_timeout is approached, + at which point it will return the remaining time in the operation_timeout. + + Args: + - per_request_timeout: The timeout value to use for each request, in seconds. + If None, the operation_timeout will be used for each request. + - operation_timeout: The timeout value to use for the entire operation, in seconds. + Yields: + - The timeout value to use for the next request, in seconds + """ + per_request_timeout = ( + per_request_timeout if per_request_timeout is not None else operation_timeout + ) + deadline = operation_timeout + time.monotonic() + while True: + yield max(0, min(per_request_timeout, deadline - time.monotonic())) + + +def _convert_retry_deadline( + func: Callable[..., Any], + timeout_value: float | None = None, + retry_errors: list[Exception] | None = None, +): + """ + Decorator to convert RetryErrors raised by api_core.retry into + DeadlineExceeded exceptions, indicating that the underlying retries have + exhausted the timeout value. + Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__, + detailing the failed exceptions associated with each retry. + + Supports both sync and async function wrapping.
+ + Args: + - func: The function to decorate + - timeout_value: The timeout value to display in the DeadlineExceeded error message + - retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__ + """ + timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else "" + error_str = f"operation_timeout{timeout_str} exceeded" + + def handle_error(): + new_exc = core_exceptions.DeadlineExceeded( + error_str, + ) + source_exc = None + if retry_errors: + source_exc = RetryExceptionGroup(retry_errors) + new_exc.__cause__ = source_exc + raise new_exc from source_exc + + # separate wrappers for async and sync functions + async def wrapper_async(*args, **kwargs): + try: + return await func(*args, **kwargs) + except core_exceptions.RetryError: + handle_error() + + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except core_exceptions.RetryError: + handle_error() + + return wrapper_async if iscoroutinefunction(func) else wrapper diff --git a/google/cloud/bigtable/_mutate_rows.py b/google/cloud/bigtable/_mutate_rows.py new file mode 100644 index 000000000..a422c99b2 --- /dev/null +++ b/google/cloud/bigtable/_mutate_rows.py @@ -0,0 +1,210 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +from __future__ import annotations + +from typing import TYPE_CHECKING +import functools + +from google.api_core import exceptions as core_exceptions +from google.api_core import retry_async as retries +import google.cloud.bigtable.exceptions as bt_exceptions +from google.cloud.bigtable._helpers import _make_metadata +from google.cloud.bigtable._helpers import _convert_retry_deadline +from google.cloud.bigtable._helpers import _attempt_timeout_generator + +if TYPE_CHECKING: + from google.cloud.bigtable_v2.services.bigtable.async_client import ( + BigtableAsyncClient, + ) + from google.cloud.bigtable.client import Table + from google.cloud.bigtable.mutations import RowMutationEntry + + +class _MutateRowsIncomplete(RuntimeError): + """ + Exception raised when a mutate_rows call has unfinished work. + """ + + pass + + +class _MutateRowsOperation: + """ + MutateRowsOperation manages the logic of sending a set of row mutations, + and retrying on failed entries. It manages this using the _run_attempt + function, which attempts to mutate all outstanding entries, and raises + _MutateRowsIncomplete if any retryable errors are encountered. + + Errors are exposed as a MutationsExceptionGroup, which contains a list of + exceptions organized by the related failed mutation entries. + """ + + def __init__( + self, + gapic_client: "BigtableAsyncClient", + table: "Table", + mutation_entries: list["RowMutationEntry"], + operation_timeout: float, + per_request_timeout: float | None, + ): + """ + Args: + - gapic_client: the client to use for the mutate_rows call + - table: the table associated with the request + - mutation_entries: a list of RowMutationEntry objects to send to the server + - operation_timeout: the timeout to use for the entire operation, in seconds. + - per_request_timeout: the timeout to use for each mutate_rows attempt, in seconds. + If not specified, the request will run until operation_timeout is reached.
+ """ + # create partial function to pass to trigger rpc call + metadata = _make_metadata(table.table_name, table.app_profile_id) + self._gapic_fn = functools.partial( + gapic_client.mutate_rows, + table_name=table.table_name, + app_profile_id=table.app_profile_id, + metadata=metadata, + ) + # create predicate for determining which errors are retryable + self.is_retryable = retries.if_exception_type( + # RPC level errors + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + # Entry level errors + _MutateRowsIncomplete, + ) + # build retryable operation + retry = retries.AsyncRetry( + predicate=self.is_retryable, + timeout=operation_timeout, + initial=0.01, + multiplier=2, + maximum=60, + ) + retry_wrapped = retry(self._run_attempt) + self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout) + # initialize state + self.timeout_generator = _attempt_timeout_generator( + per_request_timeout, operation_timeout + ) + self.mutations = mutation_entries + self.remaining_indices = list(range(len(self.mutations))) + self.errors: dict[int, list[Exception]] = {} + + async def start(self): + """ + Start the operation, and run until completion + + Raises: + - MutationsExceptionGroup: if any mutations failed + """ + try: + # trigger mutate_rows + await self._operation() + except Exception as exc: + # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations + incomplete_indices = self.remaining_indices.copy() + for idx in incomplete_indices: + self._handle_entry_error(idx, exc) + finally: + # raise exception detailing incomplete mutations + all_errors = [] + for idx, exc_list in self.errors.items(): + if len(exc_list) == 0: + raise core_exceptions.ClientError( + f"Mutation {idx} failed with no associated errors" + ) + elif len(exc_list) == 1: + cause_exc = exc_list[0] + else: + cause_exc = bt_exceptions.RetryExceptionGroup(exc_list) + entry = self.mutations[idx] + all_errors.append( + 
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc) + ) + if all_errors: + raise bt_exceptions.MutationsExceptionGroup( + all_errors, len(self.mutations) + ) + + async def _run_attempt(self): + """ + Run a single attempt of the mutate_rows rpc. + + Raises: + - _MutateRowsIncomplete: if there are failed mutations eligible for + retry after the attempt is complete + - GoogleAPICallError: if the gapic rpc fails + """ + request_entries = [ + self.mutations[idx]._to_dict() for idx in self.remaining_indices + ] + # track mutations in this request that have not been finalized yet + active_request_indices = { + req_idx: orig_idx for req_idx, orig_idx in enumerate(self.remaining_indices) + } + self.remaining_indices = [] + if not request_entries: + # no more mutations. return early + return + # make gapic request + try: + result_generator = await self._gapic_fn( + timeout=next(self.timeout_generator), + entries=request_entries, + ) + async for result_list in result_generator: + for result in result_list.entries: + # convert sub-request index to global index + orig_idx = active_request_indices[result.index] + entry_error = core_exceptions.from_grpc_status( + result.status.code, + result.status.message, + details=result.status.details, + ) + if result.status.code != 0: + # mutation failed; update error list (and remaining_indices if retryable) + self._handle_entry_error(orig_idx, entry_error) + # remove processed entry from active list + del active_request_indices[result.index] + except Exception as exc: + # add this exception to list for each mutation that wasn't + # already handled, and update remaining_indices if mutation is retryable + for idx in active_request_indices.values(): + self._handle_entry_error(idx, exc) + # bubble up exception to be handled by retry wrapper + raise + # check if attempt succeeded, or needs to be retried + if self.remaining_indices: + # unfinished work; raise exception to trigger retry + raise _MutateRowsIncomplete + + def 
_handle_entry_error(self, idx: int, exc: Exception): + """ + Add an exception to the list of exceptions for a given mutation index, + and add the index to the list of remaining indices if the exception is + retryable. + + Args: + - idx: the index of the mutation that failed + - exc: the exception to add to the list + """ + entry = self.mutations[idx] + self.errors.setdefault(idx, []).append(exc) + if ( + entry.is_idempotent() + and self.is_retryable(exc) + and idx not in self.remaining_indices + ): + self.remaining_indices.append(idx) diff --git a/google/cloud/bigtable/_read_rows.py b/google/cloud/bigtable/_read_rows.py index 1c9e02d5a..ee094f1a7 100644 --- a/google/cloud/bigtable/_read_rows.py +++ b/google/cloud/bigtable/_read_rows.py @@ -20,13 +20,13 @@ AsyncIterable, AsyncIterator, AsyncGenerator, + Iterator, Callable, Awaitable, Type, ) import asyncio -import time from functools import partial from grpc.aio import RpcContext @@ -37,6 +37,8 @@ from google.cloud.bigtable.exceptions import _RowSetComplete from google.api_core import retry_async as retries from google.api_core import exceptions as core_exceptions +from google.cloud.bigtable._helpers import _make_metadata +from google.cloud.bigtable._helpers import _attempt_timeout_generator """ This module provides a set of classes for merging ReadRowsResponse chunks @@ -87,16 +89,16 @@ def __init__( self._emit_count = 0 self._request = request self.operation_timeout = operation_timeout - deadline = operation_timeout + time.monotonic() + # use generator to lower per-attempt timeout as we approach operation_timeout deadline + attempt_timeout_gen = _attempt_timeout_generator( + per_request_timeout, operation_timeout + ) row_limit = request.get("rows_limit", 0) - if per_request_timeout is None: - per_request_timeout = operation_timeout # lock in paramters for retryable wrapper self._partial_retryable = partial( self._read_rows_retryable_attempt, client.read_rows, - per_request_timeout, - deadline, + 
attempt_timeout_gen, row_limit, ) predicate = retries.if_exception_type( @@ -145,8 +147,7 @@ async def aclose(self): async def _read_rows_retryable_attempt( self, gapic_fn: Callable[..., Awaitable[AsyncIterable[ReadRowsResponse]]], - per_request_timeout: float, - operation_deadline: float, + timeout_generator: Iterator[float], total_row_limit: int, ) -> AsyncGenerator[Row, None]: """ @@ -183,16 +184,14 @@ async def _read_rows_retryable_attempt( raise RuntimeError("unexpected state: emit count exceeds row limit") else: self._request["rows_limit"] = new_limit - params_str = f'table_name={self._request.get("table_name", "")}' - app_profile_id = self._request.get("app_profile_id", None) - if app_profile_id: - params_str = f"{params_str},app_profile_id={app_profile_id}" - time_to_deadline = operation_deadline - time.monotonic() - gapic_timeout = max(0, min(time_to_deadline, per_request_timeout)) + metadata = _make_metadata( + self._request.get("table_name", None), + self._request.get("app_profile_id", None), + ) new_gapic_stream: RpcContext = await gapic_fn( self._request, - timeout=gapic_timeout, - metadata=[("x-goog-request-params", params_str)], + timeout=next(timeout_generator), + metadata=metadata, ) try: state_machine = _StateMachine() diff --git a/google/cloud/bigtable/client.py b/google/cloud/bigtable/client.py index e8c7b5e04..3921d6640 100644 --- a/google/cloud/bigtable/client.py +++ b/google/cloud/bigtable/client.py @@ -20,6 +20,8 @@ Any, Optional, Set, + Callable, + Coroutine, TYPE_CHECKING, ) @@ -38,6 +40,8 @@ ) from google.cloud.client import ClientWithProject from google.api_core.exceptions import GoogleAPICallError +from google.api_core import retry_async as retries +from google.api_core import exceptions as core_exceptions from google.cloud.bigtable._read_rows import _ReadRowsOperation import google.auth.credentials @@ -46,9 +50,12 @@ from google.cloud.bigtable.row import Row from google.cloud.bigtable.read_rows_query import ReadRowsQuery from 
google.cloud.bigtable.iterators import ReadRowsIterator +from google.cloud.bigtable.mutations import Mutation, RowMutationEntry +from google.cloud.bigtable._mutate_rows import _MutateRowsOperation +from google.cloud.bigtable._helpers import _make_metadata +from google.cloud.bigtable._helpers import _convert_retry_deadline if TYPE_CHECKING: - from google.cloud.bigtable.mutations import Mutation, BulkMutationsEntry from google.cloud.bigtable.mutations_batcher import MutationsBatcher from google.cloud.bigtable import RowKeySamples from google.cloud.bigtable.row_filters import RowFilter @@ -586,8 +593,8 @@ async def mutate_row( row_key: str | bytes, mutations: list[Mutation] | Mutation, *, - operation_timeout: int | float | None = 60, - per_request_timeout: int | float | None = None, + operation_timeout: float | None = 60, + per_request_timeout: float | None = None, ): """ Mutates a row atomically. @@ -617,19 +624,75 @@ async def mutate_row( - GoogleAPIError: raised on non-idempotent operations that cannot be safely retried. 
""" - raise NotImplementedError + operation_timeout = operation_timeout or self.default_operation_timeout + per_request_timeout = per_request_timeout or self.default_per_request_timeout + + if operation_timeout <= 0: + raise ValueError("operation_timeout must be greater than 0") + if per_request_timeout is not None and per_request_timeout <= 0: + raise ValueError("per_request_timeout must be greater than 0") + if per_request_timeout is not None and per_request_timeout > operation_timeout: + raise ValueError("per_request_timeout must be less than operation_timeout") + + if isinstance(row_key, str): + row_key = row_key.encode("utf-8") + request = {"table_name": self.table_name, "row_key": row_key} + if self.app_profile_id: + request["app_profile_id"] = self.app_profile_id + + if isinstance(mutations, Mutation): + mutations = [mutations] + request["mutations"] = [mutation._to_dict() for mutation in mutations] + + if all(mutation.is_idempotent() for mutation in mutations): + # mutations are all idempotent and safe to retry + predicate = retries.if_exception_type( + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ) + else: + # mutations should not be retried + predicate = retries.if_exception_type() + + transient_errors = [] + + def on_error_fn(exc): + if predicate(exc): + transient_errors.append(exc) + + retry = retries.AsyncRetry( + predicate=predicate, + on_error=on_error_fn, + timeout=operation_timeout, + initial=0.01, + multiplier=2, + maximum=60, + ) + # wrap rpc in retry logic + retry_wrapped = retry(self.client._gapic_client.mutate_row) + # convert RetryErrors from retry wrapper into DeadlineExceeded errors + deadline_wrapped = _convert_retry_deadline( + retry_wrapped, operation_timeout, transient_errors + ) + metadata = _make_metadata(self.table_name, self.app_profile_id) + # trigger rpc + await deadline_wrapped(request, timeout=per_request_timeout, metadata=metadata) async def bulk_mutate_rows( self, - mutation_entries: 
list[BulkMutationsEntry], + mutation_entries: list[RowMutationEntry], *, - operation_timeout: int | float | None = 60, - per_request_timeout: int | float | None = None, + operation_timeout: float | None = 60, + per_request_timeout: float | None = None, + on_success: Callable[ + [int, RowMutationEntry], None | Coroutine[None, None, None] + ] + | None = None, ): """ Applies mutations for multiple rows in a single batched request. - Each individual BulkMutationsEntry is applied atomically, but separate entries + Each individual RowMutationEntry is applied atomically, but separate entries may be applied in arbitrary order (even for entries targetting the same row) In total, the row_mutations can contain at most 100000 individual mutations across all entries @@ -650,12 +713,31 @@ async def bulk_mutate_rows( in seconds. If it takes longer than this time to complete, the request will be cancelled with a DeadlineExceeded exception, and a retry will be attempted if within operation_timeout budget - + - on_success: a callback function that will be called when each mutation + entry is confirmed to be applied successfully. Will be passed the + index and the entry itself. 
Raises: - MutationsExceptionGroup if one or more mutations fails Contains details about any failed entries in .exceptions """ - raise NotImplementedError + operation_timeout = operation_timeout or self.default_operation_timeout + per_request_timeout = per_request_timeout or self.default_per_request_timeout + + if operation_timeout <= 0: + raise ValueError("operation_timeout must be greater than 0") + if per_request_timeout is not None and per_request_timeout <= 0: + raise ValueError("per_request_timeout must be greater than 0") + if per_request_timeout is not None and per_request_timeout > operation_timeout: + raise ValueError("per_request_timeout must be less than operation_timeout") + + operation = _MutateRowsOperation( + self.client._gapic_client, + self, + mutation_entries, + operation_timeout, + per_request_timeout, + ) + await operation.start() async def check_and_mutate_row( self, diff --git a/google/cloud/bigtable/exceptions.py b/google/cloud/bigtable/exceptions.py index 975feb101..fe3bec7e9 100644 --- a/google/cloud/bigtable/exceptions.py +++ b/google/cloud/bigtable/exceptions.py @@ -15,63 +15,15 @@ from __future__ import annotations import sys -from inspect import iscoroutinefunction -from typing import Callable, Any +from typing import TYPE_CHECKING from google.api_core import exceptions as core_exceptions is_311_plus = sys.version_info >= (3, 11) - -def _convert_retry_deadline( - func: Callable[..., Any], - timeout_value: float | None = None, - retry_errors: list[Exception] | None = None, -): - """ - Decorator to convert RetryErrors raised by api_core.retry into - DeadlineExceeded exceptions, indicating that the underlying retries have - exhaused the timeout value. - Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__, - detailing the failed exceptions associated with each retry. - - Supports both sync and async function wrapping. 
- - Args: - - func: The function to decorate - - timeout_value: The timeout value to display in the DeadlineExceeded error message - - retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__ - """ - timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else "" - error_str = f"operation_timeout{timeout_str} exceeded" - - def handle_error(): - new_exc = core_exceptions.DeadlineExceeded( - error_str, - ) - source_exc = None - if retry_errors: - source_exc = RetryExceptionGroup( - f"{len(retry_errors)} failed attempts", retry_errors - ) - new_exc.__cause__ = source_exc - raise new_exc from source_exc - - # separate wrappers for async and sync functions - async def wrapper_async(*args, **kwargs): - try: - return await func(*args, **kwargs) - except core_exceptions.RetryError: - handle_error() - - def wrapper(*args, **kwargs): - try: - return func(*args, **kwargs) - except core_exceptions.RetryError: - handle_error() - - return wrapper_async if iscoroutinefunction(func) else wrapper +if TYPE_CHECKING: + from google.cloud.bigtable.mutations import RowMutationEntry class IdleTimeout(core_exceptions.DeadlineExceeded): @@ -109,9 +61,23 @@ def __init__(self, message, excs): if is_311_plus: super().__init__(message, excs) else: - self.exceptions = excs - revised_message = f"{message} ({len(excs)} sub-exceptions)" - super().__init__(revised_message) + if len(excs) == 0: + raise ValueError("exceptions must be a non-empty sequence") + self.exceptions = tuple(excs) + super().__init__(message) + + def __new__(cls, message, excs): + if is_311_plus: + return super().__new__(cls, message, excs) + else: + return super().__new__(cls) + + def __str__(self): + """ + String representation doesn't display sub-exceptions. 
Subexceptions are + described in message + """ + return self.args[0] class MutationsExceptionGroup(BigtableExceptionGroup): @@ -119,10 +85,55 @@ class MutationsExceptionGroup(BigtableExceptionGroup): Represents one or more exceptions that occur during a bulk mutation operation """ - pass + @staticmethod + def _format_message(excs: list[FailedMutationEntryError], total_entries: int): + entry_str = "entry" if total_entries == 1 else "entries" + plural_str = "" if len(excs) == 1 else "s" + return f"{len(excs)} sub-exception{plural_str} (from {total_entries} {entry_str} attempted)" + + def __init__(self, excs: list[FailedMutationEntryError], total_entries: int): + super().__init__(self._format_message(excs, total_entries), excs) + + def __new__(cls, excs: list[FailedMutationEntryError], total_entries: int): + return super().__new__(cls, cls._format_message(excs, total_entries), excs) + + +class FailedMutationEntryError(Exception): + """ + Represents a single failed RowMutationEntry in a bulk_mutate_rows request. + A collection of FailedMutationEntryErrors will be raised in a MutationsExceptionGroup + """ + + def __init__( + self, + failed_idx: int, + failed_mutation_entry: "RowMutationEntry", + cause: Exception, + ): + idempotent_msg = ( + "idempotent" if failed_mutation_entry.is_idempotent() else "non-idempotent" + ) + message = f"Failed {idempotent_msg} mutation entry at index {failed_idx} with cause: {cause!r}" + super().__init__(message) + self.index = failed_idx + self.entry = failed_mutation_entry + self.__cause__ = cause class RetryExceptionGroup(BigtableExceptionGroup): """Represents one or more exceptions that occur during a retryable operation""" - pass + @staticmethod + def _format_message(excs: list[Exception]): + if len(excs) == 0: + return "No exceptions" + if len(excs) == 1: + return f"1 failed attempt: {type(excs[0]).__name__}" + else: + return f"{len(excs)} failed attempts. 
Latest: {type(excs[-1]).__name__}" + + def __init__(self, excs: list[Exception]): + super().__init__(self._format_message(excs), excs) + + def __new__(cls, excs: list[Exception]): + return super().__new__(cls, cls._format_message(excs), excs) diff --git a/google/cloud/bigtable/iterators.py b/google/cloud/bigtable/iterators.py index 169bbc3f3..b20932fb2 100644 --- a/google/cloud/bigtable/iterators.py +++ b/google/cloud/bigtable/iterators.py @@ -22,7 +22,7 @@ from google.cloud.bigtable._read_rows import _ReadRowsOperation from google.cloud.bigtable.exceptions import IdleTimeout -from google.cloud.bigtable.exceptions import _convert_retry_deadline +from google.cloud.bigtable._helpers import _convert_retry_deadline from google.cloud.bigtable.row import Row diff --git a/google/cloud/bigtable/mutations.py b/google/cloud/bigtable/mutations.py index 3bb5b2ed6..c72f132c8 100644 --- a/google/cloud/bigtable/mutations.py +++ b/google/cloud/bigtable/mutations.py @@ -13,41 +13,200 @@ # limitations under the License. # from __future__ import annotations - +from typing import Any +import time from dataclasses import dataclass +from abc import ABC, abstractmethod +# special value for SetCell mutation timestamps. 
If set, server will assign a timestamp +SERVER_SIDE_TIMESTAMP = -1 -class Mutation: - pass + +class Mutation(ABC): + """Model class for mutations""" + + @abstractmethod + def _to_dict(self) -> dict[str, Any]: + raise NotImplementedError + + def is_idempotent(self) -> bool: + """ + Check if the mutation is idempotent + If false, the mutation will not be retried + """ + return True + + def __str__(self) -> str: + return str(self._to_dict()) + + @classmethod + def _from_dict(cls, input_dict: dict[str, Any]) -> Mutation: + instance: Mutation | None = None + try: + if "set_cell" in input_dict: + details = input_dict["set_cell"] + instance = SetCell( + details["family_name"], + details["column_qualifier"], + details["value"], + details["timestamp_micros"], + ) + elif "delete_from_column" in input_dict: + details = input_dict["delete_from_column"] + time_range = details.get("time_range", {}) + start = time_range.get("start_timestamp_micros", None) + end = time_range.get("end_timestamp_micros", None) + instance = DeleteRangeFromColumn( + details["family_name"], details["column_qualifier"], start, end + ) + elif "delete_from_family" in input_dict: + details = input_dict["delete_from_family"] + instance = DeleteAllFromFamily(details["family_name"]) + elif "delete_from_row" in input_dict: + instance = DeleteAllFromRow() + except KeyError as e: + raise ValueError("Invalid mutation dictionary") from e + if instance is None: + raise ValueError("No valid mutation found") + if not issubclass(instance.__class__, cls): + raise ValueError("Mutation type mismatch") + return instance -@dataclass class SetCell(Mutation): - family: str - column_qualifier: bytes - new_value: bytes | str | int - timestamp_ms: int | None = None + def __init__( + self, + family: str, + qualifier: bytes | str, + new_value: bytes | str | int, + timestamp_micros: int | None = None, + ): + """ + Mutation to set the value of a cell + + Args: + - family: The name of the column family to which the new cell belongs. 
+ - qualifier: The column qualifier of the new cell. + - new_value: The value of the new cell. str or int input will be converted to bytes + - timestamp_micros: The timestamp of the new cell. If None, the current timestamp will be used. + Timestamps will be sent with millisecond precision. Extra precision will be truncated. + If -1, the server will assign a timestamp. Note that SetCell mutations with server-side + timestamps are non-idempotent operations and will not be retried. + """ + qualifier = qualifier.encode() if isinstance(qualifier, str) else qualifier + if not isinstance(qualifier, bytes): + raise TypeError("qualifier must be bytes or str") + if isinstance(new_value, str): + new_value = new_value.encode() + elif isinstance(new_value, int): + new_value = new_value.to_bytes(8, "big", signed=True) + if not isinstance(new_value, bytes): + raise TypeError("new_value must be bytes, str, or int") + if timestamp_micros is None: + # use current timestamp, with millisecond precision + timestamp_micros = time.time_ns() // 1000 + timestamp_micros = timestamp_micros - (timestamp_micros % 1000) + if timestamp_micros < SERVER_SIDE_TIMESTAMP: + raise ValueError( + "timestamp_micros must be positive (or -1 for server-side timestamp)" + ) + self.family = family + self.qualifier = qualifier + self.new_value = new_value + self.timestamp_micros = timestamp_micros + + def _to_dict(self) -> dict[str, Any]: + """Convert the mutation to a dictionary representation""" + return { + "set_cell": { + "family_name": self.family, + "column_qualifier": self.qualifier, + "timestamp_micros": self.timestamp_micros, + "value": self.new_value, + } + } + + def is_idempotent(self) -> bool: + """Check if the mutation is idempotent""" + return self.timestamp_micros != SERVER_SIDE_TIMESTAMP @dataclass class DeleteRangeFromColumn(Mutation): family: str - column_qualifier: bytes - start_timestamp_ms: int - end_timestamp_ms: int + qualifier: bytes + # None represents 0 + start_timestamp_micros: int | 
None = None + # None represents infinity + end_timestamp_micros: int | None = None + + def __post_init__(self): + if ( + self.start_timestamp_micros is not None + and self.end_timestamp_micros is not None + and self.start_timestamp_micros > self.end_timestamp_micros + ): + raise ValueError("start_timestamp_micros must be <= end_timestamp_micros") + + def _to_dict(self) -> dict[str, Any]: + timestamp_range = {} + if self.start_timestamp_micros is not None: + timestamp_range["start_timestamp_micros"] = self.start_timestamp_micros + if self.end_timestamp_micros is not None: + timestamp_range["end_timestamp_micros"] = self.end_timestamp_micros + return { + "delete_from_column": { + "family_name": self.family, + "column_qualifier": self.qualifier, + "time_range": timestamp_range, + } + } @dataclass class DeleteAllFromFamily(Mutation): family_to_delete: str + def _to_dict(self) -> dict[str, Any]: + return { + "delete_from_family": { + "family_name": self.family_to_delete, + } + } + @dataclass class DeleteAllFromRow(Mutation): - pass + def _to_dict(self) -> dict[str, Any]: + return { + "delete_from_row": {}, + } -@dataclass -class BulkMutationsEntry: - row: bytes - mutations: list[Mutation] | Mutation +class RowMutationEntry: + def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]): + if isinstance(row_key, str): + row_key = row_key.encode("utf-8") + if isinstance(mutations, Mutation): + mutations = [mutations] + self.row_key = row_key + self.mutations = tuple(mutations) + + def _to_dict(self) -> dict[str, Any]: + return { + "row_key": self.row_key, + "mutations": [mutation._to_dict() for mutation in self.mutations], + } + + def is_idempotent(self) -> bool: + """Check if the mutation is idempotent""" + return all(mutation.is_idempotent() for mutation in self.mutations) + + @classmethod + def _from_dict(cls, input_dict: dict[str, Any]) -> RowMutationEntry: + return RowMutationEntry( + row_key=input_dict["row_key"], + mutations=[ + 
Mutation._from_dict(mutation) for mutation in input_dict["mutations"] + ], + ) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 48caceccd..7d015224c 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -187,6 +187,19 @@ async def delete_rows(self): await self.table.client._gapic_client.mutate_rows(request) +async def _retrieve_cell_value(table, row_key): + """ + Helper to read an individual row + """ + from google.cloud.bigtable import ReadRowsQuery + + row_list = await table.read_rows(ReadRowsQuery(row_keys=row_key)) + assert len(row_list) == 1 + row = row_list[0] + cell = row.cells[0] + return cell.value + + @pytest_asyncio.fixture(scope="function") async def temp_rows(table): builder = TempRowBuilder(table) @@ -205,6 +218,66 @@ async def test_ping_and_warm_gapic(client, table): await client._gapic_client.ping_and_warm(request) +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_mutation_set_cell(table, temp_rows): + """ + Ensure cells can be set properly + """ + from google.cloud.bigtable.mutations import SetCell + + row_key = b"mutate" + family = TEST_FAMILY + qualifier = b"test-qualifier" + start_value = b"start" + await temp_rows.add_row( + row_key, family=family, qualifier=qualifier, value=start_value + ) + + # ensure cell is initialized + assert (await _retrieve_cell_value(table, row_key)) == start_value + + expected_value = b"new-value" + mutation = SetCell( + family=TEST_FAMILY, qualifier=b"test-qualifier", new_value=expected_value + ) + + await table.mutate_row(row_key, mutation) + + # ensure cell is updated + assert (await _retrieve_cell_value(table, row_key)) == expected_value + + +@retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) +@pytest.mark.asyncio +async def test_bulk_mutations_set_cell(client, table, temp_rows): + """ + Ensure cells can be set properly + """ + from 
google.cloud.bigtable.mutations import SetCell, RowMutationEntry + + row_key = b"bulk_mutate" + family = TEST_FAMILY + qualifier = b"test-qualifier" + start_value = b"start" + await temp_rows.add_row( + row_key, family=family, qualifier=qualifier, value=start_value + ) + + # ensure cell is initialized + assert (await _retrieve_cell_value(table, row_key)) == start_value + + expected_value = b"new-value" + mutation = SetCell( + family=TEST_FAMILY, qualifier=b"test-qualifier", new_value=expected_value + ) + bulk_mutation = RowMutationEntry(row_key, [mutation]) + await table.bulk_mutate_rows([bulk_mutation]) + + # ensure cell is updated + assert (await _retrieve_cell_value(table, row_key)) == expected_value + + @retry.Retry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) @pytest.mark.asyncio async def test_read_rows_stream(table, temp_rows): diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py new file mode 100644 index 000000000..2765afe24 --- /dev/null +++ b/tests/unit/test__helpers.py @@ -0,0 +1,145 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest +import google.cloud.bigtable._helpers as _helpers +import google.cloud.bigtable.exceptions as bigtable_exceptions + +import mock + + +class TestMakeMetadata: + @pytest.mark.parametrize( + "table,profile,expected", + [ + ("table", "profile", "table_name=table,app_profile_id=profile"), + ("table", None, "table_name=table"), + ], + ) + def test__make_metadata(self, table, profile, expected): + metadata = _helpers._make_metadata(table, profile) + assert metadata == [("x-goog-request-params", expected)] + + +class TestAttemptTimeoutGenerator: + @pytest.mark.parametrize( + "request_t,operation_t,expected_list", + [ + (1, 3.5, [1, 1, 1, 0.5, 0, 0]), + (None, 3.5, [3.5, 2.5, 1.5, 0.5, 0, 0]), + (10, 5, [5, 4, 3, 2, 1, 0, 0]), + (3, 3, [3, 2, 1, 0, 0, 0, 0]), + (0, 3, [0, 0, 0]), + (3, 0, [0, 0, 0]), + (-1, 3, [0, 0, 0]), + (3, -1, [0, 0, 0]), + ], + ) + def test_attempt_timeout_generator(self, request_t, operation_t, expected_list): + """ + test different values for timeouts. 
Clock is incremented by 1 second for each item in expected_list + """ + timestamp_start = 123 + with mock.patch("time.monotonic") as mock_monotonic: + mock_monotonic.return_value = timestamp_start + generator = _helpers._attempt_timeout_generator(request_t, operation_t) + for val in expected_list: + mock_monotonic.return_value += 1 + assert next(generator) == val + + @pytest.mark.parametrize( + "request_t,operation_t,expected", + [ + (1, 3.5, 1), + (None, 3.5, 3.5), + (10, 5, 5), + (5, 10, 5), + (3, 3, 3), + (0, 3, 0), + (3, 0, 0), + (-1, 3, 0), + (3, -1, 0), + ], + ) + def test_attempt_timeout_frozen_time(self, request_t, operation_t, expected): + """test with time.monotonic frozen""" + timestamp_start = 123 + with mock.patch("time.monotonic") as mock_monotonic: + mock_monotonic.return_value = timestamp_start + generator = _helpers._attempt_timeout_generator(request_t, operation_t) + assert next(generator) == expected + # value should not change without time.monotonic changing + assert next(generator) == expected + + def test_attempt_timeout_w_sleeps(self): + """use real sleep values to make sure it matches expectations""" + from time import sleep + + operation_timeout = 1 + generator = _helpers._attempt_timeout_generator(None, operation_timeout) + expected_value = operation_timeout + sleep_time = 0.1 + for i in range(3): + found_value = next(generator) + assert abs(found_value - expected_value) < 0.001 + sleep(sleep_time) + expected_value -= sleep_time + + +class TestConvertRetryDeadline: + """ + Test _convert_retry_deadline wrapper + """ + + @pytest.mark.asyncio + async def test_no_error(self): + async def test_func(): + return 1 + + wrapped = _helpers._convert_retry_deadline(test_func, 0.1) + assert await wrapped() == 1 + + @pytest.mark.asyncio + @pytest.mark.parametrize("timeout", [0.1, 2.0, 30.0]) + async def test_retry_error(self, timeout): + from google.api_core.exceptions import RetryError, DeadlineExceeded + + async def test_func(): + raise 
RetryError("retry error", None) + + wrapped = _helpers._convert_retry_deadline(test_func, timeout) + with pytest.raises(DeadlineExceeded) as e: + await wrapped() + assert e.value.__cause__ is None + assert f"operation_timeout of {timeout}s exceeded" in str(e.value) + + @pytest.mark.asyncio + async def test_with_retry_errors(self): + from google.api_core.exceptions import RetryError, DeadlineExceeded + + timeout = 10.0 + + async def test_func(): + raise RetryError("retry error", None) + + associated_errors = [RuntimeError("error1"), ZeroDivisionError("other")] + wrapped = _helpers._convert_retry_deadline( + test_func, timeout, associated_errors + ) + with pytest.raises(DeadlineExceeded) as e: + await wrapped() + cause = e.value.__cause__ + assert isinstance(cause, bigtable_exceptions.RetryExceptionGroup) + assert cause.exceptions == tuple(associated_errors) + assert f"operation_timeout of {timeout}s exceeded" in str(e.value) diff --git a/tests/unit/test__mutate_rows.py b/tests/unit/test__mutate_rows.py new file mode 100644 index 000000000..4fba16f23 --- /dev/null +++ b/tests/unit/test__mutate_rows.py @@ -0,0 +1,290 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from google.cloud.bigtable_v2.types import MutateRowsResponse +from google.rpc import status_pb2 +import google.api_core.exceptions as core_exceptions + +# try/except added for compatibility with python < 3.8 +try: + from unittest import mock + from unittest.mock import AsyncMock # type: ignore +except ImportError: # pragma: NO COVER + import mock # type: ignore + from mock import AsyncMock # type: ignore + + +class TestMutateRowsOperation: + def _target_class(self): + from google.cloud.bigtable._mutate_rows import _MutateRowsOperation + + return _MutateRowsOperation + + def _make_one(self, *args, **kwargs): + if not args: + kwargs["gapic_client"] = kwargs.pop("gapic_client", mock.Mock()) + kwargs["table"] = kwargs.pop("table", AsyncMock()) + kwargs["mutation_entries"] = kwargs.pop("mutation_entries", []) + kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5) + kwargs["per_request_timeout"] = kwargs.pop("per_request_timeout", 0.1) + return self._target_class()(*args, **kwargs) + + async def _mock_stream(self, mutation_list, error_dict): + for idx, entry in enumerate(mutation_list): + code = error_dict.get(idx, 0) + yield MutateRowsResponse( + entries=[ + MutateRowsResponse.Entry( + index=idx, status=status_pb2.Status(code=code) + ) + ] + ) + + def _make_mock_gapic(self, mutation_list, error_dict=None): + mock_fn = AsyncMock() + if error_dict is None: + error_dict = {} + mock_fn.side_effect = lambda *args, **kwargs: self._mock_stream( + mutation_list, error_dict + ) + return mock_fn + + def test_ctor(self): + """ + test that constructor sets all the attributes correctly + """ + from google.cloud.bigtable._mutate_rows import _MutateRowsIncomplete + from google.api_core.exceptions import DeadlineExceeded + from google.api_core.exceptions import ServiceUnavailable + + client = mock.Mock() + table = mock.Mock() + entries = [mock.Mock(), mock.Mock()] + operation_timeout = 0.05 + attempt_timeout = 0.01 + instance = self._make_one( + client, 
table, entries, operation_timeout, attempt_timeout + ) + # running gapic_fn should trigger a client call + assert client.mutate_rows.call_count == 0 + instance._gapic_fn() + assert client.mutate_rows.call_count == 1 + # gapic_fn should call with table details + inner_kwargs = client.mutate_rows.call_args[1] + assert len(inner_kwargs) == 3 + assert inner_kwargs["table_name"] == table.table_name + assert inner_kwargs["app_profile_id"] == table.app_profile_id + metadata = inner_kwargs["metadata"] + assert len(metadata) == 1 + assert metadata[0][0] == "x-goog-request-params" + assert str(table.table_name) in metadata[0][1] + assert str(table.app_profile_id) in metadata[0][1] + # entries should be passed down + assert instance.mutations == entries + # timeout_gen should generate per-attempt timeout + assert next(instance.timeout_generator) == attempt_timeout + # ensure predicate is set + assert instance.is_retryable is not None + assert instance.is_retryable(DeadlineExceeded("")) is True + assert instance.is_retryable(ServiceUnavailable("")) is True + assert instance.is_retryable(_MutateRowsIncomplete("")) is True + assert instance.is_retryable(RuntimeError("")) is False + assert instance.remaining_indices == list(range(len(entries))) + assert instance.errors == {} + + @pytest.mark.asyncio + async def test_mutate_rows_operation(self): + """ + Test successful case of mutate_rows_operation + """ + client = mock.Mock() + table = mock.Mock() + entries = [mock.Mock(), mock.Mock()] + operation_timeout = 0.05 + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) + with mock.patch.object(instance, "_operation", AsyncMock()) as attempt_mock: + attempt_mock.return_value = None + await instance.start() + assert attempt_mock.call_count == 1 + + @pytest.mark.parametrize( + "exc_type", [RuntimeError, ZeroDivisionError, core_exceptions.Forbidden] + ) + @pytest.mark.asyncio + async def test_mutate_rows_exception(self, exc_type): + """ + 
exceptions raised from retryable should be raised in MutationsExceptionGroup + """ + from google.cloud.bigtable.exceptions import MutationsExceptionGroup + from google.cloud.bigtable.exceptions import FailedMutationEntryError + + client = mock.Mock() + table = mock.Mock() + entries = [mock.Mock()] + operation_timeout = 0.05 + expected_cause = exc_type("abort") + with mock.patch.object( + self._target_class(), + "_run_attempt", + AsyncMock(), + ) as attempt_mock: + attempt_mock.side_effect = expected_cause + found_exc = None + try: + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) + await instance.start() + except MutationsExceptionGroup as e: + found_exc = e + assert attempt_mock.call_count == 1 + assert len(found_exc.exceptions) == 1 + assert isinstance(found_exc.exceptions[0], FailedMutationEntryError) + assert found_exc.exceptions[0].__cause__ == expected_cause + + @pytest.mark.parametrize( + "exc_type", + [core_exceptions.DeadlineExceeded, core_exceptions.ServiceUnavailable], + ) + @pytest.mark.asyncio + async def test_mutate_rows_exception_retryable_eventually_pass(self, exc_type): + """ + If an exception fails but eventually passes, it should not raise an exception + """ + from google.cloud.bigtable._mutate_rows import _MutateRowsOperation + + client = mock.Mock() + table = mock.Mock() + entries = [mock.Mock()] + operation_timeout = 1 + expected_cause = exc_type("retry") + num_retries = 2 + with mock.patch.object( + _MutateRowsOperation, + "_run_attempt", + AsyncMock(), + ) as attempt_mock: + attempt_mock.side_effect = [expected_cause] * num_retries + [None] + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) + await instance.start() + assert attempt_mock.call_count == num_retries + 1 + + @pytest.mark.asyncio + async def test_mutate_rows_incomplete_ignored(self): + """ + MutateRowsIncomplete exceptions should not be added to error list + """ + from 
google.cloud.bigtable._mutate_rows import _MutateRowsIncomplete + from google.cloud.bigtable.exceptions import MutationsExceptionGroup + from google.api_core.exceptions import DeadlineExceeded + + client = mock.Mock() + table = mock.Mock() + entries = [mock.Mock()] + operation_timeout = 0.05 + with mock.patch.object( + self._target_class(), + "_run_attempt", + AsyncMock(), + ) as attempt_mock: + attempt_mock.side_effect = _MutateRowsIncomplete("ignored") + found_exc = None + try: + instance = self._make_one( + client, table, entries, operation_timeout, operation_timeout + ) + await instance.start() + except MutationsExceptionGroup as e: + found_exc = e + assert attempt_mock.call_count > 0 + assert len(found_exc.exceptions) == 1 + assert isinstance(found_exc.exceptions[0].__cause__, DeadlineExceeded) + + @pytest.mark.asyncio + async def test_run_attempt_single_entry_success(self): + """Test mutating a single entry""" + mutation = mock.Mock() + mutations = {0: mutation} + expected_timeout = 1.3 + mock_gapic_fn = self._make_mock_gapic(mutations) + instance = self._make_one( + mutation_entries=mutations, + per_request_timeout=expected_timeout, + ) + with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn): + await instance._run_attempt() + assert len(instance.remaining_indices) == 0 + assert mock_gapic_fn.call_count == 1 + _, kwargs = mock_gapic_fn.call_args + assert kwargs["timeout"] == expected_timeout + assert kwargs["entries"] == [mutation._to_dict()] + + @pytest.mark.asyncio + async def test_run_attempt_empty_request(self): + """Calling with no mutations should result in no API calls""" + mock_gapic_fn = self._make_mock_gapic([]) + instance = self._make_one( + mutation_entries=[], + ) + await instance._run_attempt() + assert mock_gapic_fn.call_count == 0 + + @pytest.mark.asyncio + async def test_run_attempt_partial_success_retryable(self): + """Some entries succeed, but one fails. 
Should report the proper index, and raise incomplete exception""" + from google.cloud.bigtable._mutate_rows import _MutateRowsIncomplete + + success_mutation = mock.Mock() + success_mutation_2 = mock.Mock() + failure_mutation = mock.Mock() + mutations = [success_mutation, failure_mutation, success_mutation_2] + mock_gapic_fn = self._make_mock_gapic(mutations, error_dict={1: 300}) + instance = self._make_one( + mutation_entries=mutations, + ) + instance.is_retryable = lambda x: True + with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn): + with pytest.raises(_MutateRowsIncomplete): + await instance._run_attempt() + assert instance.remaining_indices == [1] + assert 0 not in instance.errors + assert len(instance.errors[1]) == 1 + assert instance.errors[1][0].grpc_status_code == 300 + assert 2 not in instance.errors + + @pytest.mark.asyncio + async def test_run_attempt_partial_success_non_retryable(self): + """Some entries succeed, but one fails. Exception marked as non-retryable. Do not raise incomplete error""" + success_mutation = mock.Mock() + success_mutation_2 = mock.Mock() + failure_mutation = mock.Mock() + mutations = [success_mutation, failure_mutation, success_mutation_2] + mock_gapic_fn = self._make_mock_gapic(mutations, error_dict={1: 300}) + instance = self._make_one( + mutation_entries=mutations, + ) + instance.is_retryable = lambda x: False + with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn): + await instance._run_attempt() + assert instance.remaining_indices == [] + assert 0 not in instance.errors + assert len(instance.errors[1]) == 1 + assert instance.errors[1][0].grpc_status_code == 300 + assert 2 not in instance.errors diff --git a/tests/unit/test__read_rows.py b/tests/unit/test__read_rows.py index e57b5d992..c893c56cd 100644 --- a/tests/unit/test__read_rows.py +++ b/tests/unit/test__read_rows.py @@ -41,10 +41,14 @@ def test_ctor_defaults(self): client = mock.Mock() client.read_rows = mock.Mock() client.read_rows.return_value = 
None - start_time = 123 default_operation_timeout = 600 - with mock.patch("time.monotonic", return_value=start_time): + time_gen_mock = mock.Mock() + with mock.patch( + "google.cloud.bigtable._read_rows._attempt_timeout_generator", time_gen_mock + ): instance = self._make_one(request, client) + assert time_gen_mock.call_count == 1 + time_gen_mock.assert_called_once_with(None, default_operation_timeout) assert instance.transient_errors == [] assert instance._last_emitted_row_key is None assert instance._emit_count == 0 @@ -52,9 +56,8 @@ def test_ctor_defaults(self): retryable_fn = instance._partial_retryable assert retryable_fn.func == instance._read_rows_retryable_attempt assert retryable_fn.args[0] == client.read_rows - assert retryable_fn.args[1] == default_operation_timeout - assert retryable_fn.args[2] == default_operation_timeout + start_time - assert retryable_fn.args[3] == 0 + assert retryable_fn.args[1] == time_gen_mock.return_value + assert retryable_fn.args[2] == 0 assert client.read_rows.call_count == 0 def test_ctor(self): @@ -65,14 +68,20 @@ def test_ctor(self): client.read_rows.return_value = None expected_operation_timeout = 42 expected_request_timeout = 44 - start_time = 123 - with mock.patch("time.monotonic", return_value=start_time): + time_gen_mock = mock.Mock() + with mock.patch( + "google.cloud.bigtable._read_rows._attempt_timeout_generator", time_gen_mock + ): instance = self._make_one( request, client, operation_timeout=expected_operation_timeout, per_request_timeout=expected_request_timeout, ) + assert time_gen_mock.call_count == 1 + time_gen_mock.assert_called_once_with( + expected_request_timeout, expected_operation_timeout + ) assert instance.transient_errors == [] assert instance._last_emitted_row_key is None assert instance._emit_count == 0 @@ -80,9 +89,8 @@ def test_ctor(self): retryable_fn = instance._partial_retryable assert retryable_fn.func == instance._read_rows_retryable_attempt assert retryable_fn.args[0] == client.read_rows - 
assert retryable_fn.args[1] == expected_request_timeout - assert retryable_fn.args[2] == start_time + expected_operation_timeout - assert retryable_fn.args[3] == row_limit + assert retryable_fn.args[1] == time_gen_mock.return_value + assert retryable_fn.args[2] == row_limit assert client.read_rows.call_count == 0 def test___aiter__(self): @@ -217,14 +225,18 @@ async def test_revise_limit(self, start_limit, emit_num, expected_limit): - if the number emitted exceeds the new limit, an exception should should be raised (tested in test_revise_limit_over_limit) """ + import itertools + request = {"rows_limit": start_limit} instance = self._make_one(request, mock.Mock()) instance._emit_count = emit_num instance._last_emitted_row_key = "a" gapic_mock = mock.Mock() gapic_mock.side_effect = [GeneratorExit("stop_fn")] + mock_timeout_gen = itertools.repeat(5) + attempt = instance._read_rows_retryable_attempt( - gapic_mock, 100, 100, start_limit + gapic_mock, mock_timeout_gen, start_limit ) if start_limit != 0 and expected_limit == 0: # if we emitted the expected number of rows, we should receive a StopAsyncIteration @@ -242,12 +254,15 @@ async def test_revise_limit_over_limit(self, start_limit, emit_num): Should raise runtime error if we get in state where emit_num > start_num (unless start_num == 0, which represents unlimited) """ + import itertools + request = {"rows_limit": start_limit} instance = self._make_one(request, mock.Mock()) instance._emit_count = emit_num instance._last_emitted_row_key = "a" + mock_timeout_gen = itertools.repeat(5) attempt = instance._read_rows_retryable_attempt( - mock.Mock(), 100, 100, start_limit + mock.Mock(), mock_timeout_gen, start_limit ) with pytest.raises(RuntimeError) as e: await attempt.__anext__() @@ -273,6 +288,7 @@ async def test_retryable_attempt_hit_limit(self, limit): Stream should end after hitting the limit """ from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse + import itertools instance = self._make_one({}, 
mock.Mock()) @@ -290,7 +306,8 @@ async def gen(): return gen() - gen = instance._read_rows_retryable_attempt(mock_gapic, 100, 100, limit) + mock_timeout_gen = itertools.repeat(5) + gen = instance._read_rows_retryable_attempt(mock_gapic, mock_timeout_gen, limit) # should yield values up to the limit for i in range(limit): await gen.__anext__() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 8a3402a65..be3703a23 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -20,6 +20,7 @@ import pytest +from google.cloud.bigtable import mutations from google.auth.credentials import AnonymousCredentials from google.cloud.bigtable_v2.types import ReadRowsResponse from google.cloud.bigtable.read_rows_query import ReadRowsQuery @@ -1294,3 +1295,579 @@ async def test_read_rows_default_timeout_override(self): kwargs = mock_op.call_args_list[0].kwargs assert kwargs["operation_timeout"] == operation_timeout assert kwargs["per_request_timeout"] == per_request_timeout + + @pytest.mark.parametrize("include_app_profile", [True, False]) + @pytest.mark.asyncio + async def test_read_rows_metadata(self, include_app_profile): + """request should attach metadata headers""" + profile = "profile" if include_app_profile else None + async with self._make_client() as client: + async with client.get_table("i", "t", app_profile_id=profile) as table: + with mock.patch.object( + client._gapic_client, "read_rows", AsyncMock() + ) as read_rows: + await table.read_rows(ReadRowsQuery()) + kwargs = read_rows.call_args_list[0].kwargs + metadata = kwargs["metadata"] + goog_metadata = None + for key, value in metadata: + if key == "x-goog-request-params": + goog_metadata = value + assert goog_metadata is not None, "x-goog-request-params not found" + assert "table_name=" + table.table_name in goog_metadata + if include_app_profile: + assert "app_profile_id=profile" in goog_metadata + else: + assert "app_profile_id=" not in goog_metadata + + +class TestMutateRow: + 
def _make_client(self, *args, **kwargs): + from google.cloud.bigtable.client import BigtableDataClient + + return BigtableDataClient(*args, **kwargs) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mutation_arg", + [ + mutations.SetCell("family", b"qualifier", b"value"), + mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=1234567890 + ), + mutations.DeleteRangeFromColumn("family", b"qualifier"), + mutations.DeleteAllFromFamily("family"), + mutations.DeleteAllFromRow(), + [mutations.SetCell("family", b"qualifier", b"value")], + [ + mutations.DeleteRangeFromColumn("family", b"qualifier"), + mutations.DeleteAllFromRow(), + ], + ], + ) + async def test_mutate_row(self, mutation_arg): + """Test mutations with no errors""" + expected_per_request_timeout = 19 + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_row" + ) as mock_gapic: + mock_gapic.return_value = None + await table.mutate_row( + "row_key", + mutation_arg, + per_request_timeout=expected_per_request_timeout, + ) + assert mock_gapic.call_count == 1 + request = mock_gapic.call_args[0][0] + assert ( + request["table_name"] + == "projects/project/instances/instance/tables/table" + ) + assert request["row_key"] == b"row_key" + formatted_mutations = ( + [mutation._to_dict() for mutation in mutation_arg] + if isinstance(mutation_arg, list) + else [mutation_arg._to_dict()] + ) + assert request["mutations"] == formatted_mutations + found_per_request_timeout = mock_gapic.call_args[1]["timeout"] + assert found_per_request_timeout == expected_per_request_timeout + + @pytest.mark.parametrize( + "retryable_exception", + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ], + ) + @pytest.mark.asyncio + async def test_mutate_row_retryable_errors(self, retryable_exception): + from google.api_core.exceptions import DeadlineExceeded + from 
google.cloud.bigtable.exceptions import RetryExceptionGroup + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_row" + ) as mock_gapic: + mock_gapic.side_effect = retryable_exception("mock") + with pytest.raises(DeadlineExceeded) as e: + mutation = mutations.DeleteAllFromRow() + assert mutation.is_idempotent() is True + await table.mutate_row( + "row_key", mutation, operation_timeout=0.05 + ) + cause = e.value.__cause__ + assert isinstance(cause, RetryExceptionGroup) + assert isinstance(cause.exceptions[0], retryable_exception) + + @pytest.mark.parametrize( + "retryable_exception", + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ], + ) + @pytest.mark.asyncio + async def test_mutate_row_non_idempotent_retryable_errors( + self, retryable_exception + ): + """ + Non-idempotent mutations should not be retried + """ + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_row" + ) as mock_gapic: + mock_gapic.side_effect = retryable_exception("mock") + with pytest.raises(retryable_exception): + mutation = mutations.SetCell( + "family", b"qualifier", b"value", -1 + ) + assert mutation.is_idempotent() is False + await table.mutate_row( + "row_key", mutation, operation_timeout=0.2 + ) + + @pytest.mark.parametrize( + "non_retryable_exception", + [ + core_exceptions.OutOfRange, + core_exceptions.NotFound, + core_exceptions.FailedPrecondition, + RuntimeError, + ValueError, + core_exceptions.Aborted, + ], + ) + @pytest.mark.asyncio + async def test_mutate_row_non_retryable_errors(self, non_retryable_exception): + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_row" + ) as 
mock_gapic: + mock_gapic.side_effect = non_retryable_exception("mock") + with pytest.raises(non_retryable_exception): + mutation = mutations.SetCell( + "family", + b"qualifier", + b"value", + timestamp_micros=1234567890, + ) + assert mutation.is_idempotent() is True + await table.mutate_row( + "row_key", mutation, operation_timeout=0.2 + ) + + @pytest.mark.parametrize("include_app_profile", [True, False]) + @pytest.mark.asyncio + async def test_mutate_row_metadata(self, include_app_profile): + """request should attach metadata headers""" + profile = "profile" if include_app_profile else None + async with self._make_client() as client: + async with client.get_table("i", "t", app_profile_id=profile) as table: + with mock.patch.object( + client._gapic_client, "mutate_row", AsyncMock() + ) as read_rows: + await table.mutate_row("rk", {}) + kwargs = read_rows.call_args_list[0].kwargs + metadata = kwargs["metadata"] + goog_metadata = None + for key, value in metadata: + if key == "x-goog-request-params": + goog_metadata = value + assert goog_metadata is not None, "x-goog-request-params not found" + assert "table_name=" + table.table_name in goog_metadata + if include_app_profile: + assert "app_profile_id=profile" in goog_metadata + else: + assert "app_profile_id=" not in goog_metadata + + +class TestBulkMutateRows: + def _make_client(self, *args, **kwargs): + from google.cloud.bigtable.client import BigtableDataClient + + return BigtableDataClient(*args, **kwargs) + + async def _mock_response(self, response_list): + from google.cloud.bigtable_v2.types import MutateRowsResponse + from google.rpc import status_pb2 + + statuses = [] + for response in response_list: + if isinstance(response, core_exceptions.GoogleAPICallError): + statuses.append( + status_pb2.Status( + message=str(response), code=response.grpc_status_code.value[0] + ) + ) + else: + statuses.append(status_pb2.Status(code=0)) + entries = [ + MutateRowsResponse.Entry(index=i, status=statuses[i]) + for i in 
range(len(response_list)) + ] + + async def generator(): + yield MutateRowsResponse(entries=entries) + + return generator() + + @pytest.mark.asyncio + @pytest.mark.asyncio + @pytest.mark.parametrize( + "mutation_arg", + [ + [mutations.SetCell("family", b"qualifier", b"value")], + [ + mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=1234567890 + ) + ], + [mutations.DeleteRangeFromColumn("family", b"qualifier")], + [mutations.DeleteAllFromFamily("family")], + [mutations.DeleteAllFromRow()], + [mutations.SetCell("family", b"qualifier", b"value")], + [ + mutations.DeleteRangeFromColumn("family", b"qualifier"), + mutations.DeleteAllFromRow(), + ], + ], + ) + async def test_bulk_mutate_rows(self, mutation_arg): + """Test mutations with no errors""" + expected_per_request_timeout = 19 + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.return_value = self._mock_response([None]) + bulk_mutation = mutations.RowMutationEntry(b"row_key", mutation_arg) + await table.bulk_mutate_rows( + [bulk_mutation], + per_request_timeout=expected_per_request_timeout, + ) + assert mock_gapic.call_count == 1 + kwargs = mock_gapic.call_args[1] + assert ( + kwargs["table_name"] + == "projects/project/instances/instance/tables/table" + ) + assert kwargs["entries"] == [bulk_mutation._to_dict()] + assert kwargs["timeout"] == expected_per_request_timeout + + @pytest.mark.asyncio + async def test_bulk_mutate_rows_multiple_entries(self): + """Test mutations with no errors""" + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.return_value = self._mock_response([None, None]) + mutation_list = [mutations.DeleteAllFromRow()] + entry_1 = 
mutations.RowMutationEntry(b"row_key_1", mutation_list) + entry_2 = mutations.RowMutationEntry(b"row_key_2", mutation_list) + await table.bulk_mutate_rows( + [entry_1, entry_2], + ) + assert mock_gapic.call_count == 1 + kwargs = mock_gapic.call_args[1] + assert ( + kwargs["table_name"] + == "projects/project/instances/instance/tables/table" + ) + assert kwargs["entries"][0] == entry_1._to_dict() + assert kwargs["entries"][1] == entry_2._to_dict() + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "exception", + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ], + ) + async def test_bulk_mutate_rows_idempotent_mutation_error_retryable( + self, exception + ): + """ + Individual idempotent mutations should be retried if they fail with a retryable error + """ + from google.cloud.bigtable.exceptions import ( + RetryExceptionGroup, + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.side_effect = lambda *a, **k: self._mock_response( + [exception("mock")] + ) + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.DeleteAllFromRow() + entry = mutations.RowMutationEntry(b"row_key", [mutation]) + assert mutation.is_idempotent() is True + await table.bulk_mutate_rows([entry], operation_timeout=0.05) + assert len(e.value.exceptions) == 1 + failed_exception = e.value.exceptions[0] + assert "non-idempotent" not in str(failed_exception) + assert isinstance(failed_exception, FailedMutationEntryError) + cause = failed_exception.__cause__ + assert isinstance(cause, RetryExceptionGroup) + assert isinstance(cause.exceptions[0], exception) + # last exception should be due to retry timeout + assert isinstance( + cause.exceptions[-1], core_exceptions.DeadlineExceeded + ) + + @pytest.mark.asyncio + 
@pytest.mark.parametrize( + "exception", + [ + core_exceptions.OutOfRange, + core_exceptions.NotFound, + core_exceptions.FailedPrecondition, + core_exceptions.Aborted, + ], + ) + async def test_bulk_mutate_rows_idempotent_mutation_error_non_retryable( + self, exception + ): + """ + Individual idempotent mutations should not be retried if they fail with a non-retryable error + """ + from google.cloud.bigtable.exceptions import ( + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.side_effect = lambda *a, **k: self._mock_response( + [exception("mock")] + ) + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.DeleteAllFromRow() + entry = mutations.RowMutationEntry(b"row_key", [mutation]) + assert mutation.is_idempotent() is True + await table.bulk_mutate_rows([entry], operation_timeout=0.05) + assert len(e.value.exceptions) == 1 + failed_exception = e.value.exceptions[0] + assert "non-idempotent" not in str(failed_exception) + assert isinstance(failed_exception, FailedMutationEntryError) + cause = failed_exception.__cause__ + assert isinstance(cause, exception) + + @pytest.mark.parametrize( + "retryable_exception", + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ], + ) + @pytest.mark.asyncio + async def test_bulk_mutate_idempotent_retryable_request_errors( + self, retryable_exception + ): + """ + Individual idempotent mutations should be retried if the request fails with a retryable error + """ + from google.cloud.bigtable.exceptions import ( + RetryExceptionGroup, + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + 
client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.side_effect = retryable_exception("mock") + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=123 + ) + entry = mutations.RowMutationEntry(b"row_key", [mutation]) + assert mutation.is_idempotent() is True + await table.bulk_mutate_rows([entry], operation_timeout=0.05) + assert len(e.value.exceptions) == 1 + failed_exception = e.value.exceptions[0] + assert isinstance(failed_exception, FailedMutationEntryError) + assert "non-idempotent" not in str(failed_exception) + cause = failed_exception.__cause__ + assert isinstance(cause, RetryExceptionGroup) + assert isinstance(cause.exceptions[0], retryable_exception) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "retryable_exception", + [ + core_exceptions.DeadlineExceeded, + core_exceptions.ServiceUnavailable, + ], + ) + async def test_bulk_mutate_rows_non_idempotent_retryable_errors( + self, retryable_exception + ): + """Non-Idempotent mutations should never be retried""" + from google.cloud.bigtable.exceptions import ( + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.side_effect = lambda *a, **k: self._mock_response( + [retryable_exception("mock")] + ) + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.SetCell( + "family", b"qualifier", b"value", -1 + ) + entry = mutations.RowMutationEntry(b"row_key", [mutation]) + assert mutation.is_idempotent() is False + await table.bulk_mutate_rows([entry], operation_timeout=0.2) + assert len(e.value.exceptions) == 1 + failed_exception = e.value.exceptions[0] + assert isinstance(failed_exception, FailedMutationEntryError) + assert "non-idempotent" in str(failed_exception) + 
cause = failed_exception.__cause__ + assert isinstance(cause, retryable_exception) + + @pytest.mark.parametrize( + "non_retryable_exception", + [ + core_exceptions.OutOfRange, + core_exceptions.NotFound, + core_exceptions.FailedPrecondition, + RuntimeError, + ValueError, + ], + ) + @pytest.mark.asyncio + async def test_bulk_mutate_rows_non_retryable_errors(self, non_retryable_exception): + """ + If the request fails with a non-retryable error, mutations should not be retried + """ + from google.cloud.bigtable.exceptions import ( + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + mock_gapic.side_effect = non_retryable_exception("mock") + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=123 + ) + entry = mutations.RowMutationEntry(b"row_key", [mutation]) + assert mutation.is_idempotent() is True + await table.bulk_mutate_rows([entry], operation_timeout=0.2) + assert len(e.value.exceptions) == 1 + failed_exception = e.value.exceptions[0] + assert isinstance(failed_exception, FailedMutationEntryError) + assert "non-idempotent" not in str(failed_exception) + cause = failed_exception.__cause__ + assert isinstance(cause, non_retryable_exception) + + @pytest.mark.asyncio + async def test_bulk_mutate_error_index(self): + """ + Test partial failure, partial success. 
Errors should be associated with the correct index + """ + from google.api_core.exceptions import ( + DeadlineExceeded, + ServiceUnavailable, + FailedPrecondition, + ) + from google.cloud.bigtable.exceptions import ( + RetryExceptionGroup, + FailedMutationEntryError, + MutationsExceptionGroup, + ) + + async with self._make_client(project="project") as client: + async with client.get_table("instance", "table") as table: + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + # fail with retryable errors, then a non-retryable one + mock_gapic.side_effect = [ + self._mock_response([None, ServiceUnavailable("mock"), None]), + self._mock_response([DeadlineExceeded("mock")]), + self._mock_response([FailedPrecondition("final")]), + ] + with pytest.raises(MutationsExceptionGroup) as e: + mutation = mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=123 + ) + entries = [ + mutations.RowMutationEntry( + (f"row_key_{i}").encode(), [mutation] + ) + for i in range(3) + ] + assert mutation.is_idempotent() is True + await table.bulk_mutate_rows(entries, operation_timeout=1000) + assert len(e.value.exceptions) == 1 + failed = e.value.exceptions[0] + assert isinstance(failed, FailedMutationEntryError) + assert failed.index == 1 + assert failed.entry == entries[1] + cause = failed.__cause__ + assert isinstance(cause, RetryExceptionGroup) + assert len(cause.exceptions) == 3 + assert isinstance(cause.exceptions[0], ServiceUnavailable) + assert isinstance(cause.exceptions[1], DeadlineExceeded) + assert isinstance(cause.exceptions[2], FailedPrecondition) + + @pytest.mark.parametrize("include_app_profile", [True, False]) + @pytest.mark.asyncio + async def test_bulk_mutate_row_metadata(self, include_app_profile): + """request should attach metadata headers""" + profile = "profile" if include_app_profile else None + async with self._make_client() as client: + async with client.get_table("i", "t", app_profile_id=profile) as table: + with 
mock.patch.object( + client._gapic_client, "mutate_rows", AsyncMock() + ) as read_rows: + read_rows.side_effect = core_exceptions.Aborted("mock") + try: + await table.bulk_mutate_rows([mock.Mock()]) + except Exception: + # exception used to end early + pass + kwargs = read_rows.call_args_list[0].kwargs + metadata = kwargs["metadata"] + goog_metadata = None + for key, value in metadata: + if key == "x-goog-request-params": + goog_metadata = value + assert goog_metadata is not None, "x-goog-request-params not found" + assert "table_name=" + table.table_name in goog_metadata + if include_app_profile: + assert "app_profile_id=profile" in goog_metadata + else: + assert "app_profile_id=" not in goog_metadata diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py new file mode 100644 index 000000000..49b90a8a9 --- /dev/null +++ b/tests/unit/test_exceptions.py @@ -0,0 +1,239 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import pytest +import sys + +import google.cloud.bigtable.exceptions as bigtable_exceptions + + +class TestBigtableExceptionGroup: + """ + Subclass for MutationsExceptionGroup and RetryExceptionGroup + """ + + def _get_class(self): + from google.cloud.bigtable.exceptions import BigtableExceptionGroup + + return BigtableExceptionGroup + + def _make_one(self, message="test_message", excs=None): + if excs is None: + excs = [RuntimeError("mock")] + + return self._get_class()(message, excs=excs) + + def test_raise(self): + """ + Create exception in raise statement, which calls __new__ and __init__ + """ + test_msg = "test message" + test_excs = [Exception(test_msg)] + with pytest.raises(self._get_class()) as e: + raise self._get_class()(test_msg, test_excs) + assert str(e.value) == test_msg + assert list(e.value.exceptions) == test_excs + + def test_raise_empty_list(self): + """ + Empty exception lists are not supported + """ + with pytest.raises(ValueError) as e: + raise self._make_one(excs=[]) + assert "non-empty sequence" in str(e.value) + + @pytest.mark.skipif( + sys.version_info < (3, 11), reason="requires python3.11 or higher" + ) + def test_311_traceback(self): + """ + Exception customizations should not break rich exception group traceback in python 3.11 + """ + import traceback + + sub_exc1 = RuntimeError("first sub exception") + sub_exc2 = ZeroDivisionError("second sub exception") + exc_group = self._make_one(excs=[sub_exc1, sub_exc2]) + + expected_traceback = ( + f" | google.cloud.bigtable.exceptions.{type(exc_group).__name__}: {str(exc_group)}", + " +-+---------------- 1 ----------------", + " | RuntimeError: first sub exception", + " +---------------- 2 ----------------", + " | ZeroDivisionError: second sub exception", + " +------------------------------------", + ) + exception_caught = False + try: + raise exc_group + except self._get_class(): + exception_caught = True + tb = traceback.format_exc() + tb_relevant_lines = 
tuple(tb.splitlines()[3:]) + assert expected_traceback == tb_relevant_lines + assert exception_caught + + @pytest.mark.skipif( + sys.version_info < (3, 11), reason="requires python3.11 or higher" + ) + def test_311_exception_group(self): + """ + Python 3.11+ should handle exceptions as native exception groups + """ + exceptions = [RuntimeError("mock"), ValueError("mock")] + instance = self._make_one(excs=exceptions) + # ensure split works as expected + runtime_error, others = instance.split(lambda e: isinstance(e, RuntimeError)) + assert runtime_error.exceptions[0] == exceptions[0] + assert others.exceptions[0] == exceptions[1] + + def test_exception_handling(self): + """ + All versions should inherit from exception + and support traditional exception handling + """ + instance = self._make_one() + assert isinstance(instance, Exception) + try: + raise instance + except Exception as e: + assert isinstance(e, Exception) + assert e == instance + was_raised = True + assert was_raised + + +class TestMutationsExceptionGroup(TestBigtableExceptionGroup): + def _get_class(self): + from google.cloud.bigtable.exceptions import MutationsExceptionGroup + + return MutationsExceptionGroup + + def _make_one(self, excs=None, num_entries=3): + if excs is None: + excs = [RuntimeError("mock")] + + return self._get_class()(excs, num_entries) + + @pytest.mark.parametrize( + "exception_list,total_entries,expected_message", + [ + ([Exception()], 1, "1 sub-exception (from 1 entry attempted)"), + ([Exception()], 2, "1 sub-exception (from 2 entries attempted)"), + ( + [Exception(), RuntimeError()], + 2, + "2 sub-exceptions (from 2 entries attempted)", + ), + ], + ) + def test_raise(self, exception_list, total_entries, expected_message): + """ + Create exception in raise statement, which calls __new__ and __init__ + """ + with pytest.raises(self._get_class()) as e: + raise self._get_class()(exception_list, total_entries) + assert str(e.value) == expected_message + assert 
list(e.value.exceptions) == exception_list + + +class TestRetryExceptionGroup(TestBigtableExceptionGroup): + def _get_class(self): + from google.cloud.bigtable.exceptions import RetryExceptionGroup + + return RetryExceptionGroup + + def _make_one(self, excs=None): + if excs is None: + excs = [RuntimeError("mock")] + + return self._get_class()(excs=excs) + + @pytest.mark.parametrize( + "exception_list,expected_message", + [ + ([Exception()], "1 failed attempt: Exception"), + ([Exception(), RuntimeError()], "2 failed attempts. Latest: RuntimeError"), + ( + [Exception(), ValueError("test")], + "2 failed attempts. Latest: ValueError", + ), + ( + [ + bigtable_exceptions.RetryExceptionGroup( + [Exception(), ValueError("test")] + ) + ], + "1 failed attempt: RetryExceptionGroup", + ), + ], + ) + def test_raise(self, exception_list, expected_message): + """ + Create exception in raise statement, which calls __new__ and __init__ + """ + with pytest.raises(self._get_class()) as e: + raise self._get_class()(exception_list) + assert str(e.value) == expected_message + assert list(e.value.exceptions) == exception_list + + +class TestFailedMutationEntryError: + def _get_class(self): + from google.cloud.bigtable.exceptions import FailedMutationEntryError + + return FailedMutationEntryError + + def _make_one(self, idx=9, entry=None, cause=None): + """ + Build an instance of the class under test. The default entry and cause + are created on every call, so no Mock or exception object is shared + between calls + """ + if entry is None: + entry = unittest.mock.Mock() + if cause is None: + cause = RuntimeError("mock") + return self._get_class()(idx, entry, cause) + + def test_raise(self): + """ + Create exception in raise statement, which calls __new__ and __init__ + """ + test_idx = 2 + test_entry = unittest.mock.Mock() + test_exc = ValueError("test") + with pytest.raises(self._get_class()) as e: + raise self._get_class()(test_idx, test_entry, test_exc) + assert ( + str(e.value) + == "Failed idempotent mutation entry at index 2 with cause: ValueError('test')" + ) + assert e.value.index == test_idx + assert e.value.entry == test_entry + assert e.value.__cause__ == test_exc + assert isinstance(e.value, Exception) + assert 
test_entry.is_idempotent.call_count == 1 + + def test_raise_idempotent(self): + """ + Test raise with non idempotent entry + """ + test_idx = 2 + test_entry = unittest.mock.Mock() + test_entry.is_idempotent.return_value = False + test_exc = ValueError("test") + with pytest.raises(self._get_class()) as e: + raise self._get_class()(test_idx, test_entry, test_exc) + assert ( + str(e.value) + == "Failed non-idempotent mutation entry at index 2 with cause: ValueError('test')" + ) + assert e.value.index == test_idx + assert e.value.entry == test_entry + assert e.value.__cause__ == test_exc + assert test_entry.is_idempotent.call_count == 1 diff --git a/tests/unit/test_mutations.py b/tests/unit/test_mutations.py new file mode 100644 index 000000000..2a376609e --- /dev/null +++ b/tests/unit/test_mutations.py @@ -0,0 +1,569 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +import google.cloud.bigtable.mutations as mutations + +# try/except added for compatibility with python < 3.8 +try: + from unittest import mock +except ImportError: # pragma: NO COVER + import mock # type: ignore + + +class TestBaseMutation: + def _target_class(self): + from google.cloud.bigtable.mutations import Mutation + + return Mutation + + def test__to_dict(self): + """Should be unimplemented in the base class""" + with pytest.raises(NotImplementedError): + self._target_class()._to_dict(mock.Mock()) + + def test_is_idempotent(self): + """is_idempotent should assume True""" + assert self._target_class().is_idempotent(mock.Mock()) + + def test___str__(self): + """Str representation of mutations should be to_dict""" + self_mock = mock.Mock() + str_value = self._target_class().__str__(self_mock) + assert self_mock._to_dict.called + assert str_value == str(self_mock._to_dict.return_value) + + @pytest.mark.parametrize( + "expected_class,input_dict", + [ + ( + mutations.SetCell, + { + "set_cell": { + "family_name": "foo", + "column_qualifier": b"bar", + "value": b"test", + "timestamp_micros": 12345, + } + }, + ), + ( + mutations.DeleteRangeFromColumn, + { + "delete_from_column": { + "family_name": "foo", + "column_qualifier": b"bar", + "time_range": {}, + } + }, + ), + ( + mutations.DeleteRangeFromColumn, + { + "delete_from_column": { + "family_name": "foo", + "column_qualifier": b"bar", + "time_range": {"start_timestamp_micros": 123456789}, + } + }, + ), + ( + mutations.DeleteRangeFromColumn, + { + "delete_from_column": { + "family_name": "foo", + "column_qualifier": b"bar", + "time_range": {"end_timestamp_micros": 123456789}, + } + }, + ), + ( + mutations.DeleteRangeFromColumn, + { + "delete_from_column": { + "family_name": "foo", + "column_qualifier": b"bar", + "time_range": { + "start_timestamp_micros": 123, + "end_timestamp_micros": 123456789, + }, + } + }, + ), + ( + mutations.DeleteAllFromFamily, + {"delete_from_family": {"family_name": 
"foo"}}, + ), + (mutations.DeleteAllFromRow, {"delete_from_row": {}}), + ], + ) + def test__from_dict(self, expected_class, input_dict): + """Should be able to create instance from dict""" + instance = self._target_class()._from_dict(input_dict) + assert isinstance(instance, expected_class) + found_dict = instance._to_dict() + assert found_dict == input_dict + + @pytest.mark.parametrize( + "input_dict", + [ + {"set_cell": {}}, + { + "set_cell": { + "column_qualifier": b"bar", + "value": b"test", + "timestamp_micros": 12345, + } + }, + { + "set_cell": { + "family_name": "f", + "column_qualifier": b"bar", + "value": b"test", + } + }, + {"delete_from_family": {}}, + {"delete_from_column": {}}, + {"fake-type"}, + {}, + ], + ) + def test__from_dict_missing_fields(self, input_dict): + """If dict is malformed or fields are missing, should raise ValueError""" + with pytest.raises(ValueError): + self._target_class()._from_dict(input_dict) + + def test__from_dict_wrong_subclass(self): + """You shouldn't be able to instantiate one mutation type using the dict of another""" + subclasses = [ + mutations.SetCell("foo", b"bar", b"test"), + mutations.DeleteRangeFromColumn("foo", b"bar"), + mutations.DeleteAllFromFamily("foo"), + mutations.DeleteAllFromRow(), + ] + for instance in subclasses: + others = [other for other in subclasses if other != instance] + for other in others: + with pytest.raises(ValueError) as e: + type(other)._from_dict(instance._to_dict()) + assert "Mutation type mismatch" in str(e.value) + + +class TestSetCell: + def _target_class(self): + from google.cloud.bigtable.mutations import SetCell + + return SetCell + + def _make_one(self, *args, **kwargs): + return self._target_class()(*args, **kwargs) + + def test_ctor(self): + """Ensure constructor sets expected values""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_value = b"test-value" + expected_timestamp = 1234567890 + instance = self._make_one( + expected_family, 
expected_qualifier, expected_value, expected_timestamp + ) + assert instance.family == expected_family + assert instance.qualifier == expected_qualifier + assert instance.new_value == expected_value + assert instance.timestamp_micros == expected_timestamp + + def test_ctor_str_inputs(self): + """Test with string qualifier and value""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_value = b"test-value" + instance = self._make_one(expected_family, "test-qualifier", "test-value") + assert instance.family == expected_family + assert instance.qualifier == expected_qualifier + assert instance.new_value == expected_value + + @pytest.mark.parametrize( + "int_value,expected_bytes", + [ + (-42, b"\xff\xff\xff\xff\xff\xff\xff\xd6"), + (-2, b"\xff\xff\xff\xff\xff\xff\xff\xfe"), + (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"), + (0, b"\x00\x00\x00\x00\x00\x00\x00\x00"), + (1, b"\x00\x00\x00\x00\x00\x00\x00\x01"), + (2, b"\x00\x00\x00\x00\x00\x00\x00\x02"), + (100, b"\x00\x00\x00\x00\x00\x00\x00d"), + ], + ) + def test_ctor_int_value(self, int_value, expected_bytes): + """Test with int value""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + instance = self._make_one(expected_family, expected_qualifier, int_value) + assert instance.family == expected_family + assert instance.qualifier == expected_qualifier + assert instance.new_value == expected_bytes + + def test_ctor_negative_timestamp(self): + """Only positive or -1 timestamps are valid""" + with pytest.raises(ValueError) as e: + self._make_one("test-family", b"test-qualifier", b"test-value", -2) + assert ( + "timestamp_micros must be positive (or -1 for server-side timestamp)" + in str(e.value) + ) + + @pytest.mark.parametrize( + "timestamp_ns,expected_timestamp_micros", + [ + (0, 0), + (1, 0), + (123, 0), + (999, 0), + (999_999, 0), + (1_000_000, 1000), + (1_234_567, 1000), + (1_999_999, 1000), + (2_000_000, 2000), + (1_234_567_890_123, 1_234_567_000), + ], + 
) + def test_ctor_no_timestamp(self, timestamp_ns, expected_timestamp_micros): + """If no timestamp is given, should use current time with millisecond precision""" + with mock.patch("time.time_ns", return_value=timestamp_ns): + instance = self._make_one("test-family", b"test-qualifier", b"test-value") + assert instance.timestamp_micros == expected_timestamp_micros + + def test__to_dict(self): + """ensure dict representation is as expected""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_value = b"test-value" + expected_timestamp = 123456789 + instance = self._make_one( + expected_family, expected_qualifier, expected_value, expected_timestamp + ) + got_dict = instance._to_dict() + assert list(got_dict.keys()) == ["set_cell"] + got_inner_dict = got_dict["set_cell"] + assert got_inner_dict["family_name"] == expected_family + assert got_inner_dict["column_qualifier"] == expected_qualifier + assert got_inner_dict["timestamp_micros"] == expected_timestamp + assert got_inner_dict["value"] == expected_value + assert len(got_inner_dict.keys()) == 4 + + def test__to_dict_server_timestamp(self): + """test with server side timestamp -1 value""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_value = b"test-value" + expected_timestamp = -1 + instance = self._make_one( + expected_family, expected_qualifier, expected_value, expected_timestamp + ) + got_dict = instance._to_dict() + assert list(got_dict.keys()) == ["set_cell"] + got_inner_dict = got_dict["set_cell"] + assert got_inner_dict["family_name"] == expected_family + assert got_inner_dict["column_qualifier"] == expected_qualifier + assert got_inner_dict["timestamp_micros"] == expected_timestamp + assert got_inner_dict["value"] == expected_value + assert len(got_inner_dict.keys()) == 4 + + @pytest.mark.parametrize( + "timestamp,expected_value", + [ + (1234567890, True), + (1, True), + (0, True), + (-1, False), + (None, True), + ], + ) + def 
test_is_idempotent(self, timestamp, expected_value): + """is_idempotent is based on whether an explicit timestamp is set""" + instance = self._make_one( + "test-family", b"test-qualifier", b"test-value", timestamp + ) + assert instance.is_idempotent() is expected_value + + def test___str__(self): + """Str representation of mutations should be to_dict""" + instance = self._make_one( + "test-family", b"test-qualifier", b"test-value", 1234567890 + ) + str_value = instance.__str__() + dict_value = instance._to_dict() + assert str_value == str(dict_value) + + +class TestDeleteRangeFromColumn: + def _target_class(self): + from google.cloud.bigtable.mutations import DeleteRangeFromColumn + + return DeleteRangeFromColumn + + def _make_one(self, *args, **kwargs): + return self._target_class()(*args, **kwargs) + + def test_ctor(self): + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_start = 1234567890 + expected_end = 1234567891 + instance = self._make_one( + expected_family, expected_qualifier, expected_start, expected_end + ) + assert instance.family == expected_family + assert instance.qualifier == expected_qualifier + assert instance.start_timestamp_micros == expected_start + assert instance.end_timestamp_micros == expected_end + + def test_ctor_no_timestamps(self): + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + instance = self._make_one(expected_family, expected_qualifier) + assert instance.family == expected_family + assert instance.qualifier == expected_qualifier + assert instance.start_timestamp_micros is None + assert instance.end_timestamp_micros is None + + def test_ctor_timestamps_out_of_order(self): + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + expected_start = 10 + expected_end = 1 + with pytest.raises(ValueError) as excinfo: + self._make_one( + expected_family, expected_qualifier, expected_start, expected_end + ) + assert "start_timestamp_micros must be <= 
end_timestamp_micros" in str( + excinfo.value + ) + + @pytest.mark.parametrize( + "start,end", + [ + (0, 1), + (None, 1), + (0, None), + ], + ) + def test__to_dict(self, start, end): + """ensure dict representation is as expected""" + expected_family = "test-family" + expected_qualifier = b"test-qualifier" + + instance = self._make_one(expected_family, expected_qualifier, start, end) + got_dict = instance._to_dict() + assert list(got_dict.keys()) == ["delete_from_column"] + got_inner_dict = got_dict["delete_from_column"] + assert len(got_inner_dict.keys()) == 3 + assert got_inner_dict["family_name"] == expected_family + assert got_inner_dict["column_qualifier"] == expected_qualifier + time_range_dict = got_inner_dict["time_range"] + expected_len = int(isinstance(start, int)) + int(isinstance(end, int)) + assert len(time_range_dict.keys()) == expected_len + if start is not None: + assert time_range_dict["start_timestamp_micros"] == start + if end is not None: + assert time_range_dict["end_timestamp_micros"] == end + + def test_is_idempotent(self): + """is_idempotent is always true""" + instance = self._make_one( + "test-family", b"test-qualifier", 1234567890, 1234567891 + ) + assert instance.is_idempotent() is True + + def test___str__(self): + """Str representation of mutations should be to_dict""" + instance = self._make_one("test-family", b"test-qualifier") + str_value = instance.__str__() + dict_value = instance._to_dict() + assert str_value == str(dict_value) + + +class TestDeleteAllFromFamily: + def _target_class(self): + from google.cloud.bigtable.mutations import DeleteAllFromFamily + + return DeleteAllFromFamily + + def _make_one(self, *args, **kwargs): + return self._target_class()(*args, **kwargs) + + def test_ctor(self): + expected_family = "test-family" + instance = self._make_one(expected_family) + assert instance.family_to_delete == expected_family + + def test__to_dict(self): + """ensure dict representation is as expected""" + expected_family = 
"test-family" + instance = self._make_one(expected_family) + got_dict = instance._to_dict() + assert list(got_dict.keys()) == ["delete_from_family"] + got_inner_dict = got_dict["delete_from_family"] + assert len(got_inner_dict.keys()) == 1 + assert got_inner_dict["family_name"] == expected_family + + def test_is_idempotent(self): + """is_idempotent is always true""" + instance = self._make_one("test-family") + assert instance.is_idempotent() is True + + def test___str__(self): + """Str representation of mutations should be to_dict""" + instance = self._make_one("test-family") + str_value = instance.__str__() + dict_value = instance._to_dict() + assert str_value == str(dict_value) + + +class TestDeleteFromRow: + def _target_class(self): + from google.cloud.bigtable.mutations import DeleteAllFromRow + + return DeleteAllFromRow + + def _make_one(self, *args, **kwargs): + return self._target_class()(*args, **kwargs) + + def test_ctor(self): + self._make_one() + + def test__to_dict(self): + """ensure dict representation is as expected""" + instance = self._make_one() + got_dict = instance._to_dict() + assert list(got_dict.keys()) == ["delete_from_row"] + assert len(got_dict["delete_from_row"].keys()) == 0 + + def test_is_idempotent(self): + """is_idempotent is always true""" + instance = self._make_one() + assert instance.is_idempotent() is True + + def test___str__(self): + """Str representation of mutations should be to_dict""" + instance = self._make_one() + assert instance.__str__() == "{'delete_from_row': {}}" + + +class TestRowMutationEntry: + def _target_class(self): + from google.cloud.bigtable.mutations import RowMutationEntry + + return RowMutationEntry + + def _make_one(self, row_key, mutations): + return self._target_class()(row_key, mutations) + + def test_ctor(self): + expected_key = b"row_key" + expected_mutations = [mock.Mock()] + instance = self._make_one(expected_key, expected_mutations) + assert instance.row_key == expected_key + assert 
list(instance.mutations) == expected_mutations + + def test_ctor_str_key(self): + expected_key = "row_key" + expected_mutations = [mock.Mock(), mock.Mock()] + instance = self._make_one(expected_key, expected_mutations) + assert instance.row_key == b"row_key" + assert list(instance.mutations) == expected_mutations + + def test_ctor_single_mutation(self): + from google.cloud.bigtable.mutations import DeleteAllFromRow + + expected_key = b"row_key" + expected_mutations = DeleteAllFromRow() + instance = self._make_one(expected_key, expected_mutations) + assert instance.row_key == expected_key + assert instance.mutations == (expected_mutations,) + + def test__to_dict(self): + expected_key = "row_key" + mutation_mock = mock.Mock() + n_mutations = 3 + expected_mutations = [mutation_mock for i in range(n_mutations)] + for mock_mutations in expected_mutations: + mock_mutations._to_dict.return_value = {"test": "data"} + instance = self._make_one(expected_key, expected_mutations) + expected_result = { + "row_key": b"row_key", + "mutations": [{"test": "data"}] * n_mutations, + } + assert instance._to_dict() == expected_result + assert mutation_mock._to_dict.call_count == n_mutations + + @pytest.mark.parametrize( + "mutations,result", + [ + ([], True), + ([mock.Mock(is_idempotent=lambda: True)], True), + ([mock.Mock(is_idempotent=lambda: False)], False), + ( + [ + mock.Mock(is_idempotent=lambda: True), + mock.Mock(is_idempotent=lambda: False), + ], + False, + ), + ( + [ + mock.Mock(is_idempotent=lambda: True), + mock.Mock(is_idempotent=lambda: True), + ], + True, + ), + ], + ) + def test_is_idempotent(self, mutations, result): + instance = self._make_one("row_key", mutations) + assert instance.is_idempotent() == result + + def test__from_dict_mock(self): + """ + test creating instance from entry dict, with mocked mutation._from_dict + """ + expected_key = b"row_key" + expected_mutations = [mock.Mock(), mock.Mock()] + input_dict = { + "row_key": expected_key, + "mutations": 
[{"test": "data"}, {"another": "data"}], + } + with mock.patch.object(mutations.Mutation, "_from_dict") as inner_from_dict: + inner_from_dict.side_effect = expected_mutations + instance = self._target_class()._from_dict(input_dict) + assert instance.row_key == b"row_key" + assert inner_from_dict.call_count == 2 + assert len(instance.mutations) == 2 + assert instance.mutations[0] == expected_mutations[0] + assert instance.mutations[1] == expected_mutations[1] + + def test__from_dict(self): + """ + test creating end-to-end with a real mutation instance + """ + input_dict = { + "row_key": b"row_key", + "mutations": [{"delete_from_family": {"family_name": "test_family"}}], + } + instance = self._target_class()._from_dict(input_dict) + assert instance.row_key == b"row_key" + assert len(instance.mutations) == 1 + assert isinstance(instance.mutations[0], mutations.DeleteAllFromFamily) + assert instance.mutations[0].family_to_delete == "test_family"