Retry RST_STREAM-style internal server errors for Bigtable reads and mutations.

row_data.py: add RETRYABLE_INTERNAL_ERROR_MESSAGES and
_retriable_internal_server_error(); _retry_read_rows_exception() now also
returns True for an InternalServerError whose lower-cased message contains
one of those substrings.

table.py: add InternalServerError to RETRYABLE_MUTATION_ERRORS. In
_do_mutate_retryable_rows() an InternalServerError raised by the initial
mutate_rows call is wrapped as _BigtableRetryableError (chained to the
original exception) only when its message matches; any other
InternalServerError is re-raised unchanged.

tests: cover the retriable and non-retriable InternalServerError cases for
both the read-rows predicate and the mutate-rows worker.

diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py
index ab0358285..62ef5a201 100644
--- a/google/cloud/bigtable/row_data.py
+++ b/google/cloud/bigtable/row_data.py
@@ -332,10 +332,32 @@ class InvalidRetryRequest(RuntimeError):
     """Exception raised when retry request is invalid."""
 
 
+RETRYABLE_INTERNAL_ERROR_MESSAGES = (
+    "rst_stream",
+    "rst stream",
+    "received unexpected eos on data frame from server",
+)
+"""Internal error messages that can be retried during read row and mutation."""
+
+
+def _retriable_internal_server_error(exc):
+    """
+    Return True if the internal server error is retriable.
+    """
+    return isinstance(exc, exceptions.InternalServerError) and any(
+        retryable_message in exc.message.lower()
+        for retryable_message in RETRYABLE_INTERNAL_ERROR_MESSAGES
+    )
+
+
 def _retry_read_rows_exception(exc):
+    """Return True if the exception is retriable for read row requests."""
     if isinstance(exc, grpc.RpcError):
         exc = exceptions.from_grpc_error(exc)
-    return isinstance(exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded))
+
+    return _retriable_internal_server_error(exc) or isinstance(
+        exc, (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)
+    )
 
 
 DEFAULT_RETRY_READ_ROWS = retry.Retry(
diff --git a/google/cloud/bigtable/table.py b/google/cloud/bigtable/table.py
index fddd04809..8605992ba 100644
--- a/google/cloud/bigtable/table.py
+++ b/google/cloud/bigtable/table.py
@@ -23,6 +23,7 @@
 from google.api_core.exceptions import NotFound
 from google.api_core.exceptions import RetryError
 from google.api_core.exceptions import ServiceUnavailable
+from google.api_core.exceptions import InternalServerError
 from google.api_core.gapic_v1.method import DEFAULT
 from google.api_core.retry import if_exception_type
 from google.api_core.retry import Retry
@@ -37,7 +38,10 @@
 from google.cloud.bigtable.row import AppendRow
 from google.cloud.bigtable.row import ConditionalRow
 from google.cloud.bigtable.row import DirectRow
-from google.cloud.bigtable.row_data import PartialRowsData
+from google.cloud.bigtable.row_data import (
+    PartialRowsData,
+    _retriable_internal_server_error,
+)
 from google.cloud.bigtable.row_data import DEFAULT_RETRY_READ_ROWS
 from google.cloud.bigtable.row_set import RowSet
 from google.cloud.bigtable.row_set import RowRange
@@ -55,9 +59,15 @@
 _MAX_BULK_MUTATIONS = 100000
 VIEW_NAME_ONLY = enums.Table.View.NAME_ONLY
 
-RETRYABLE_MUTATION_ERRORS = (Aborted, DeadlineExceeded, ServiceUnavailable)
+RETRYABLE_MUTATION_ERRORS = (
+    Aborted,
+    DeadlineExceeded,
+    ServiceUnavailable,
+    InternalServerError,
+)
 """Errors which can be retried during row mutation."""
 
+
 RETRYABLE_CODES: Set[int] = set()
 
 for retryable in RETRYABLE_MUTATION_ERRORS:
@@ -1130,11 +1140,18 @@ def _do_mutate_retryable_rows(self):
                 retry=None,
                 **kwargs
             )
-        except RETRYABLE_MUTATION_ERRORS:
+        except RETRYABLE_MUTATION_ERRORS as exc:
             # If an exception, considered retryable by `RETRYABLE_MUTATION_ERRORS`, is
             # returned from the initial call, consider
             # it to be retryable. Wrap as a Bigtable Retryable Error.
-            raise _BigtableRetryableError
+            # For InternalServerError, it is only retriable if the message is related to RST Stream messages
+            if _retriable_internal_server_error(exc) or not isinstance(
+                exc, InternalServerError
+            ):
+                raise _BigtableRetryableError from exc
+            else:
+                # re-raise the original exception
+                raise
 
         num_responses = 0
         num_retryable_responses = 0
diff --git a/tests/unit/test_row_data.py b/tests/unit/test_row_data.py
index f087ff450..9175bf479 100644
--- a/tests/unit/test_row_data.py
+++ b/tests/unit/test_row_data.py
@@ -310,6 +310,31 @@ def test__retry_read_rows_exception_deadline_exceeded():
     assert _retry_read_rows_exception(exception)
 
 
+def test__retry_read_rows_exception_internal_server_not_retriable():
+    from google.api_core.exceptions import InternalServerError
+    from google.cloud.bigtable.row_data import (
+        _retry_read_rows_exception,
+        RETRYABLE_INTERNAL_ERROR_MESSAGES,
+    )
+
+    err_message = "500 Error"
+    exception = InternalServerError(err_message)
+    assert not any(m in err_message.lower() for m in RETRYABLE_INTERNAL_ERROR_MESSAGES)
+    assert not _retry_read_rows_exception(exception)
+
+
+def test__retry_read_rows_exception_internal_server_retriable():
+    from google.api_core.exceptions import InternalServerError
+    from google.cloud.bigtable.row_data import (
+        _retry_read_rows_exception,
+        RETRYABLE_INTERNAL_ERROR_MESSAGES,
+    )
+
+    for err_message in RETRYABLE_INTERNAL_ERROR_MESSAGES:
+        exception = InternalServerError(err_message)
+        assert _retry_read_rows_exception(exception)
+
+
 def test__retry_read_rows_exception_miss_wrapped_in_grpc():
     from google.api_core.exceptions import Conflict
     from google.cloud.bigtable.row_data import _retry_read_rows_exception
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index a89e02e8c..e66a8f0f6 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -47,6 +47,7 @@
 RETRYABLE_3 = StatusCode.UNAVAILABLE.value[0]
 RETRYABLES = (RETRYABLE_1, RETRYABLE_2, RETRYABLE_3)
 NON_RETRYABLE = StatusCode.CANCELLED.value[0]
+STATUS_INTERNAL = StatusCode.INTERNAL.value[0]
 
 
 @mock.patch("google.cloud.bigtable.table._MAX_BULK_MUTATIONS", new=3)
@@ -1636,6 +1637,7 @@ def _do_mutate_retryable_rows_helper(
     raising_retry=False,
     retryable_error=False,
     timeout=None,
+    mutate_rows_side_effect=None,
 ):
     from google.api_core.exceptions import ServiceUnavailable
     from google.cloud.bigtable.row import DirectRow
@@ -1664,8 +1666,13 @@ def _do_mutate_retryable_rows_helper(
 
     data_api = client._table_data_client = _make_data_api()
     if retryable_error:
-        data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
+        if mutate_rows_side_effect is not None:
+            data_api.mutate_rows.side_effect = mutate_rows_side_effect
+        else:
+            data_api.mutate_rows.side_effect = ServiceUnavailable("testing")
     else:
+        if mutate_rows_side_effect is not None:
+            data_api.mutate_rows.side_effect = mutate_rows_side_effect
         data_api.mutate_rows.return_value = [response]
 
     worker = _make_worker(client, table.name, rows=rows)
@@ -1785,6 +1792,52 @@ def test_rmrw_do_mutate_retryable_rows_w_retryable_error():
     )
 
 
+def test_rmrw_do_mutate_retryable_rows_w_retryable_error_internal_rst_stream_error():
+    # Mutate two rows
+    # Raise internal server error with RST STREAM error messages
+    # There should be no error raised and that the request is retried
+    from google.api_core.exceptions import InternalServerError
+    from google.cloud.bigtable.row_data import RETRYABLE_INTERNAL_ERROR_MESSAGES
+
+    row_cells = [
+        (b"row_key_1", ("cf", b"col", b"value1")),
+        (b"row_key_2", ("cf", b"col", b"value2")),
+    ]
+    responses = ()
+
+    for retryable_internal_error_message in RETRYABLE_INTERNAL_ERROR_MESSAGES:
+        for message in [
+            retryable_internal_error_message,
+            retryable_internal_error_message.upper(),
+        ]:
+            _do_mutate_retryable_rows_helper(
+                row_cells,
+                responses,
+                retryable_error=True,
+                mutate_rows_side_effect=InternalServerError(message),
+            )
+
+
+def test_rmrw_do_mutate_rows_w_retryable_error_internal_not_retryable():
+    # Mutate two rows
+    # Raise internal server error but not RST STREAM error messages
+    # mutate_rows should raise Internal Server Error
+    from google.api_core.exceptions import InternalServerError
+
+    row_cells = [
+        (b"row_key_1", ("cf", b"col", b"value1")),
+        (b"row_key_2", ("cf", b"col", b"value2")),
+    ]
+    responses = ()
+
+    with pytest.raises(InternalServerError):
+        _do_mutate_retryable_rows_helper(
+            row_cells,
+            responses,
+            mutate_rows_side_effect=InternalServerError("Error not retryable."),
+        )
+
+
 def test_rmrw_do_mutate_retryable_rows_retry():
     #
     # Setup: