Merge branch 'experimental_v3' into remove_dicts
daniel-sanche authored Oct 28, 2023
2 parents a0c8834 + 94bfe66 commit 9300ec5
Showing 13 changed files with 249 additions and 134 deletions.
1 change: 1 addition & 0 deletions .github/workflows/conformance.yaml
@@ -26,6 +26,7 @@ jobs:
matrix:
py-version: [ 3.8 ]
client-type: [ "Async v3", "Legacy" ]
fail-fast: false
name: "${{ matrix.client-type }} Client / Python ${{ matrix.py-version }}"
steps:
- uses: actions/checkout@v3
12 changes: 5 additions & 7 deletions google/cloud/bigtable/data/__init__.py
@@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import List, Tuple

from google.cloud.bigtable import gapic_version as package_version

from google.cloud.bigtable.data._async.client import BigtableDataClientAsync
@@ -44,10 +41,10 @@
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup

# Type alias for the output of sample_keys
RowKeySamples = List[Tuple[bytes, int]]
# type alias for the output of query.shard()
ShardedQuery = List[ReadRowsQuery]
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import RowKeySamples
from google.cloud.bigtable.data._helpers import ShardedQuery


__version__: str = package_version.__version__

@@ -74,4 +71,5 @@
"MutationsExceptionGroup",
"ShardedReadRowsExceptionGroup",
"ShardedQuery",
"TABLE_DEFAULT",
)
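
The aliases moved out of `__init__.py` are still re-exported from the package root, so existing imports keep working. A minimal sketch of the import surface after this commit (hedged: it assumes `RowKeySamples` remains in `__all__`, which this hunk does not show):

```python
# Sketch: top-level re-exports after the move to _helpers
from google.cloud.bigtable.data import RowKeySamples, ShardedQuery, TABLE_DEFAULT

def total_size(samples: RowKeySamples) -> int:
    # RowKeySamples is List[Tuple[bytes, int]]: (row_key, offset_bytes) pairs
    # returned by sample_row_keys; the last offset approximates the table size
    return samples[-1][1] if samples else 0
```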
1 change: 1 addition & 0 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -91,6 +91,7 @@ def __init__(
table_name=table.table_name,
app_profile_id=table.app_profile_id,
metadata=metadata,
retry=None,
)
# create predicate for determining which errors are retryable
self.is_retryable = retries.if_exception_type(
1 change: 1 addition & 0 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -140,6 +140,7 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
self.request,
timeout=next(self.attempt_timeout_gen),
metadata=self._metadata,
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)
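
Both operations above now pass `retry=None` to the underlying GAPIC method, and the `OptionalRetry` aliases later in this diff are widened to allow it. The likely intent (a hedged reading, consistent with the retry predicate built in `_MutateRowsOperationAsync`): the data client runs its own retry loop, so the generated client's default retry must be disabled to avoid nesting two retry policies. A minimal sketch of that pattern, with illustrative names rather than the library's actual code:

```python
from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries

# A predicate in the spirit of the one built in _MutateRowsOperationAsync
is_retryable = retries.if_exception_type(
    core_exceptions.DeadlineExceeded,
    core_exceptions.ServiceUnavailable,
)

async def mutate_with_wrapper_retries(gapic_client, request, attempt_timeout):
    """Illustrative only: retry in the wrapper, never inside the GAPIC layer."""
    retry_policy = retries.AsyncRetry(predicate=is_retryable, timeout=60.0)

    async def attempt():
        # retry=None keeps the generated client from retrying internally
        return await gapic_client.mutate_rows(
            request, timeout=attempt_timeout, retry=None
        )

    return await retry_policy(attempt)()
```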
178 changes: 75 additions & 103 deletions google/cloud/bigtable/data/_async/client.py

Large diffs are not rendered by default.

27 changes: 11 additions & 16 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -23,7 +23,8 @@
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_timeouts
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT

from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._async._mutate_rows import (
@@ -189,8 +190,8 @@ def __init__(
flush_limit_bytes: int = 20 * _MB_SIZE,
flow_control_max_mutation_count: int = 100_000,
flow_control_max_bytes: int = 100 * _MB_SIZE,
batch_operation_timeout: float | None = None,
batch_attempt_timeout: float | None = None,
batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
):
"""
Args:
@@ -202,21 +203,15 @@ def __init__(
- flush_limit_bytes: Flush immediately after flush_limit_bytes bytes are added.
- flow_control_max_mutation_count: Maximum number of inflight mutations.
- flow_control_max_bytes: Maximum number of inflight bytes.
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None,
table default_mutate_rows_operation_timeout will be used
- batch_attempt_timeout: timeout for each individual request, in seconds. If None,
table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout
if that is also None.
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds.
If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_operation_timeout.
- batch_attempt_timeout: timeout for each individual request, in seconds.
If TABLE_DEFAULT, defaults to the Table's default_mutate_rows_attempt_timeout.
If None, defaults to batch_operation_timeout.
"""
self._operation_timeout: float = (
batch_operation_timeout or table.default_mutate_rows_operation_timeout
self._operation_timeout, self._attempt_timeout = _get_timeouts(
batch_operation_timeout, batch_attempt_timeout, table
)
self._attempt_timeout: float = (
batch_attempt_timeout
or table.default_mutate_rows_attempt_timeout
or self._operation_timeout
)
_validate_timeouts(self._operation_timeout, self._attempt_timeout)
self.closed: bool = False
self._table = table
self._staged_entries: list[RowMutationEntry] = []
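
With `_get_timeouts` in place, the batcher resolves its timeouts the same way as the rest of the client: the `TABLE_DEFAULT.MUTATE_ROWS` sentinel pulls the table-level defaults, while `None` for the attempt timeout now means "reuse the operation timeout". A hedged sketch of the caller-visible behavior (it assumes the class in this file is named `MutationsBatcherAsync` and that `table` is an existing table instance):

```python
from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync

# Default: falls back to table.default_mutate_rows_*_timeout via _get_timeouts
batcher = MutationsBatcherAsync(table)

# Explicit values are validated; the attempt timeout is capped at the
# operation timeout inside _get_timeouts
batcher = MutationsBatcherAsync(
    table,
    batch_operation_timeout=60.0,  # total budget per mutate_rows operation
    batch_attempt_timeout=10.0,    # per-RPC budget
)
```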
76 changes: 75 additions & 1 deletion google/cloud/bigtable/data/_helpers.py
@@ -13,8 +13,11 @@
#
from __future__ import annotations

from typing import Callable, Any
from typing import Callable, List, Tuple, Any
import time
import enum
from collections import namedtuple
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
@@ -23,6 +26,30 @@
Helper functions used in various places in the library.
"""

# Type alias for the output of sample_row_keys
RowKeySamples = List[Tuple[bytes, int]]

# Type alias for the output of query.shard()
ShardedQuery = List[ReadRowsQuery]

# used by read_rows_sharded to limit how many requests are attempted in parallel
_CONCURRENCY_LIMIT = 10

# used to register instance data with the client for channel warming
_WarmedInstanceKey = namedtuple(
"_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"]
)


# enum used on method calls when table defaults should be used
class TABLE_DEFAULT(enum.Enum):
# default for mutate_row, sample_row_keys, check_and_mutate_row, and read_modify_write_row
DEFAULT = "DEFAULT"
# default for read_rows, read_rows_stream, read_rows_sharded, row_exists, and read_row
READ_ROWS = "READ_ROWS_DEFAULT"
# default for bulk_mutate_rows and mutations_batcher
MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"


def _make_metadata(
table_name: str, app_profile_id: str | None
@@ -114,6 +141,51 @@ def wrapper(*args, **kwargs):
return wrapper_async if is_async else wrapper


def _get_timeouts(
operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table
) -> tuple[float, float]:
"""
Convert passed-in timeout values to floats, using table defaults if necessary.
attempt falls back to the operation value if None, and is capped at the operation value if larger.
Will call _validate_timeouts on the outputs, and raise ValueError if the
resulting timeouts are invalid.
Args:
- operation: The timeout value to use for the entire operation, in seconds.
- attempt: The timeout value to use for each attempt, in seconds.
- table: The table to use for default values.
Returns:
- A tuple of (operation_timeout, attempt_timeout)
"""
# load table defaults if necessary
if operation == TABLE_DEFAULT.DEFAULT:
final_operation = table.default_operation_timeout
elif operation == TABLE_DEFAULT.READ_ROWS:
final_operation = table.default_read_rows_operation_timeout
elif operation == TABLE_DEFAULT.MUTATE_ROWS:
final_operation = table.default_mutate_rows_operation_timeout
else:
final_operation = operation
if attempt == TABLE_DEFAULT.DEFAULT:
attempt = table.default_attempt_timeout
elif attempt == TABLE_DEFAULT.READ_ROWS:
attempt = table.default_read_rows_attempt_timeout
elif attempt == TABLE_DEFAULT.MUTATE_ROWS:
attempt = table.default_mutate_rows_attempt_timeout

if attempt is None:
# no timeout specified, use operation timeout for both
final_attempt = final_operation
else:
# cap attempt timeout at operation timeout
final_attempt = min(attempt, final_operation) if final_operation else attempt

_validate_timeouts(final_operation, final_attempt, allow_none=False)
return final_operation, final_attempt


def _validate_timeouts(
operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False
):
@@ -128,6 +200,8 @@ def _validate_timeouts(
Raises:
- ValueError if operation_timeout or attempt_timeout are invalid.
"""
if operation_timeout is None:
raise ValueError("operation_timeout cannot be None")
if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
if not allow_none and attempt_timeout is None:
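
The resolution rules are easy to exercise in isolation; a small sketch with a mock table, mirroring the parametrized tests added later in this diff (values are illustrative):

```python
from unittest import mock

from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _get_timeouts

table = mock.Mock()
table.default_mutate_rows_operation_timeout = 60.0
table.default_mutate_rows_attempt_timeout = 20.0

# Sentinels resolve to the table-level defaults
assert _get_timeouts(TABLE_DEFAULT.MUTATE_ROWS, TABLE_DEFAULT.MUTATE_ROWS, table) == (60.0, 20.0)

# attempt=None falls back to the operation timeout
assert _get_timeouts(30.0, None, table) == (30.0, 30.0)

# an attempt timeout larger than the operation timeout is capped
assert _get_timeouts(10.0, 100.0, table) == (10.0, 10.0)
```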
4 changes: 2 additions & 2 deletions google/cloud/bigtable_v2/services/bigtable/async_client.py
@@ -40,9 +40,9 @@
from google.oauth2 import service_account # type: ignore

try:
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError: # pragma: NO COVER
OptionalRetry = Union[retries.Retry, object] # type: ignore
OptionalRetry = Union[retries.Retry, object, None] # type: ignore

from google.cloud.bigtable_v2.types import bigtable
from google.cloud.bigtable_v2.types import data
4 changes: 2 additions & 2 deletions google/cloud/bigtable_v2/services/bigtable/client.py
@@ -43,9 +43,9 @@
from google.oauth2 import service_account # type: ignore

try:
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault]
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError: # pragma: NO COVER
OptionalRetry = Union[retries.Retry, object] # type: ignore
OptionalRetry = Union[retries.Retry, object, None] # type: ignore

from google.cloud.bigtable_v2.types import bigtable
from google.cloud.bigtable_v2.types import data
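
Both generated clients get the same one-line widening: `None` joins the `OptionalRetry` union so the handwritten layer can pass `retry=None` without tripping type checks. A sketch of the alias as it reads after this change (assuming `retries` is `google.api_core.retry` as imported in the generated modules):

```python
from typing import Union

from google.api_core import gapic_v1
from google.api_core import retry as retries

# None is now an accepted value, meaning "no GAPIC-level retry policy"
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
```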
2 changes: 1 addition & 1 deletion noxfile.py
@@ -460,7 +460,7 @@ def prerelease_deps(session):
# Exclude version 1.52.0rc1 which has a known issue. See https://github.com/grpc/grpc/issues/32163
"grpcio!=1.52.0rc1",
"grpcio-status",
"google-api-core",
"google-api-core==2.12.0.dev1", # TODO: remove this once streaming retries is merged
"proto-plus",
"google-cloud-testutils",
# dependencies of google-cloud-testutils
3 changes: 2 additions & 1 deletion tests/unit/data/_async/test__mutate_rows.py
@@ -94,9 +94,10 @@ def test_ctor(self):
assert client.mutate_rows.call_count == 1
# gapic_fn should call with table details
inner_kwargs = client.mutate_rows.call_args[1]
assert len(inner_kwargs) == 3
assert len(inner_kwargs) == 4
assert inner_kwargs["table_name"] == table.table_name
assert inner_kwargs["app_profile_id"] == table.app_profile_id
assert inner_kwargs["retry"] is None
metadata = inner_kwargs["metadata"]
assert len(metadata) == 1
assert metadata[0][0] == "x-goog-request-params"
9 changes: 8 additions & 1 deletion tests/unit/data/_async/test_client.py
@@ -1329,6 +1329,7 @@ async def test_read_rows_attempt_timeout(
# check timeouts
for _, call_kwargs in read_rows.call_args_list[:-1]:
assert call_kwargs["timeout"] == per_request_t
assert call_kwargs["retry"] is None
# last timeout should be adjusted to account for the time spent
assert (
abs(
@@ -1884,6 +1885,7 @@ async def test_sample_row_keys_default_timeout(self):
_, kwargs = sample_row_keys.call_args
assert abs(kwargs["timeout"] - expected_timeout) < 0.1
assert result == []
assert kwargs["retry"] is None

@pytest.mark.asyncio
async def test_sample_row_keys_gapic_params(self):
@@ -1905,11 +1907,12 @@ async def test_sample_row_keys_gapic_params(self):
await table.sample_row_keys(attempt_timeout=expected_timeout)
args, kwargs = sample_row_keys.call_args
assert len(args) == 0
assert len(kwargs) == 4
assert len(kwargs) == 5
assert kwargs["timeout"] == expected_timeout
assert kwargs["app_profile_id"] == expected_profile
assert kwargs["table_name"] == table.table_name
assert kwargs["metadata"] is not None
assert kwargs["retry"] is None

@pytest.mark.parametrize("include_app_profile", [True, False])
@pytest.mark.asyncio
@@ -2239,6 +2242,7 @@ async def test_bulk_mutate_rows(self, mutation_arg):
)
assert kwargs["entries"] == [bulk_mutation._to_pb()]
assert kwargs["timeout"] == expected_attempt_timeout
assert kwargs["retry"] is None

@pytest.mark.asyncio
async def test_bulk_mutate_rows_multiple_entries(self):
@@ -2602,6 +2606,7 @@ async def test_check_and_mutate(self, gapic_result):
]
assert kwargs["app_profile_id"] == app_profile
assert kwargs["timeout"] == operation_timeout
assert kwargs["retry"] is None

@pytest.mark.asyncio
async def test_check_and_mutate_bad_timeout(self):
@@ -2684,6 +2689,7 @@ async def test_check_and_mutate_predicate_object(self):
kwargs = mock_gapic.call_args[1]
assert kwargs["predicate_filter"] == predicate_pb
assert mock_predicate._to_pb.call_count == 1
assert kwargs["retry"] is None

@pytest.mark.asyncio
async def test_check_and_mutate_mutations_parsing(self):
@@ -2787,6 +2793,7 @@ async def test_read_modify_write_call_rule_args(self, call_rules, expected_rules
assert mock_gapic.call_count == 1
found_kwargs = mock_gapic.call_args_list[0][1]
assert found_kwargs["rules"] == expected_rules
assert found_kwargs["retry"] is None

@pytest.mark.parametrize("rules", [[], None])
@pytest.mark.asyncio
65 changes: 65 additions & 0 deletions tests/unit/data/test__helpers.py
@@ -14,6 +14,7 @@

import pytest
import google.cloud.bigtable.data._helpers as _helpers
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
import google.cloud.bigtable.data.exceptions as bigtable_exceptions

import mock
@@ -199,3 +200,67 @@ def test_validate_with_inputs(self, args, expected):
except ValueError:
pass
assert success == expected


class TestGetTimeouts:
@pytest.mark.parametrize(
"input_times,input_table,expected",
[
((2, 1), {}, (2, 1)),
((2, 4), {}, (2, 2)),
((2, None), {}, (2, 2)),
(
(TABLE_DEFAULT.DEFAULT, TABLE_DEFAULT.DEFAULT),
{"operation": 3, "attempt": 2},
(3, 2),
),
(
(TABLE_DEFAULT.READ_ROWS, TABLE_DEFAULT.READ_ROWS),
{"read_rows_operation": 3, "read_rows_attempt": 2},
(3, 2),
),
(
(TABLE_DEFAULT.MUTATE_ROWS, TABLE_DEFAULT.MUTATE_ROWS),
{"mutate_rows_operation": 3, "mutate_rows_attempt": 2},
(3, 2),
),
((10, TABLE_DEFAULT.DEFAULT), {"attempt": None}, (10, 10)),
((10, TABLE_DEFAULT.DEFAULT), {"attempt": 5}, (10, 5)),
((10, TABLE_DEFAULT.DEFAULT), {"attempt": 100}, (10, 10)),
((TABLE_DEFAULT.DEFAULT, 10), {"operation": 12}, (12, 10)),
((TABLE_DEFAULT.DEFAULT, 10), {"operation": 3}, (3, 3)),
],
)
def test_get_timeouts(self, input_times, input_table, expected):
"""
test input/output mappings for a variety of valid inputs
"""
fake_table = mock.Mock()
for key in input_table.keys():
# set the default fields in our fake table mock
setattr(fake_table, f"default_{key}_timeout", input_table[key])
t1, t2 = _helpers._get_timeouts(input_times[0], input_times[1], fake_table)
assert t1 == expected[0]
assert t2 == expected[1]

@pytest.mark.parametrize(
"input_times,input_table",
[
([0, 1], {}),
([1, 0], {}),
([None, 1], {}),
([TABLE_DEFAULT.DEFAULT, 1], {"operation": None}),
([TABLE_DEFAULT.DEFAULT, 1], {"operation": 0}),
([1, TABLE_DEFAULT.DEFAULT], {"attempt": 0}),
],
)
def test_get_timeouts_invalid(self, input_times, input_table):
"""
test with inputs that should raise error during validation step
"""
fake_table = mock.Mock()
for key in input_table.keys():
# set the default fields in our fake table mock
setattr(fake_table, f"default_{key}_timeout", input_table[key])
with pytest.raises(ValueError):
_helpers._get_timeouts(input_times[0], input_times[1], fake_table)
