chore: reduce public api surface (#820)
daniel-sanche authored Aug 1, 2023
1 parent 0d92a84 commit a8cdf7c
Showing 16 changed files with 408 additions and 230 deletions.
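The renames follow a single pattern: module-level constants and helper methods that were never intended for external use gain a leading underscore, and a few public methods stop accepting raw dicts in favor of the typed wrappers (ReadRowsQuery, RowFilter), as the diffs below show. A rough sketch of what a caller that had imported one of the old names sees after upgrading (the underscored name is internal and may change without notice):

# before this commit the constant looked public
from google.cloud.bigtable.data.mutations import MUTATE_ROWS_REQUEST_MUTATION_LIMIT

# after this commit only the underscore-prefixed, internal name exists
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT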
google/cloud/bigtable/data/_async/_mutate_rows.py (6 changes: 3 additions & 3 deletions)
@@ -25,7 +25,7 @@
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator

# mutate_rows requests are limited to this number of mutations
-from google.cloud.bigtable.data.mutations import MUTATE_ROWS_REQUEST_MUTATION_LIMIT
+from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT

if TYPE_CHECKING:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
@@ -65,10 +65,10 @@ def __init__(
"""
# check that mutations are within limits
total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
-if total_mutations > MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
+if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
raise ValueError(
"mutate_rows requests can contain at most "
-f"{MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
+f"{_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
f"all entries. Found {total_mutations}."
)
# create partial function to pass to trigger rpc call
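For context, the renamed limit caps the total number of mutations a single mutate_rows request may carry across all of its entries. A minimal standalone sketch of the check performed in _MutateRowsOperationAsync.__init__, assuming entries expose a .mutations list (the free function validate_mutation_count is illustrative, not part of the library):

# value defined in google/cloud/bigtable/data/mutations.py (see diff below)
_MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000

def validate_mutation_count(mutation_entries) -> None:
    """Reject batches whose total mutation count exceeds the per-request limit."""
    total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
    if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
        raise ValueError(
            "mutate_rows requests can contain at most "
            f"{_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
            f"all entries. Found {total_mutations}."
        )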
google/cloud/bigtable/data/_async/client.py (28 changes: 14 additions & 14 deletions)
@@ -74,7 +74,7 @@
from google.cloud.bigtable.data import ShardedQuery

# used by read_rows_sharded to limit how many requests are attempted in parallel
-CONCURRENCY_LIMIT = 10
+_CONCURRENCY_LIMIT = 10

# used to register instance data with the client for channel warming
_WarmedInstanceKey = namedtuple(
@@ -154,7 +154,7 @@ def __init__(
self._channel_init_time = time.monotonic()
self._channel_refresh_tasks: list[asyncio.Task[None]] = []
try:
-self.start_background_channel_refresh()
+self._start_background_channel_refresh()
except RuntimeError:
warnings.warn(
f"{self.__class__.__name__} should be started in an "
@@ -163,7 +163,7 @@
stacklevel=2,
)

-def start_background_channel_refresh(self) -> None:
+def _start_background_channel_refresh(self) -> None:
"""
Starts a background task to ping and warm each channel in the pool
Raises:
@@ -309,7 +309,7 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
await self._ping_and_warm_instances(channel, instance_key)
else:
# refresh tasks aren't active. start them as background tasks
-self.start_background_channel_refresh()
+self._start_background_channel_refresh()

async def _remove_instance_registration(
self, instance_id: str, owner: TableAsync
@@ -388,7 +388,7 @@ def get_table(
)

async def __aenter__(self):
-self.start_background_channel_refresh()
+self._start_background_channel_refresh()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
@@ -493,7 +493,7 @@ def __init__(

async def read_rows_stream(
self,
-query: ReadRowsQuery | dict[str, Any],
+query: ReadRowsQuery,
*,
operation_timeout: float | None = None,
attempt_timeout: float | None = None,
@@ -557,7 +557,7 @@ async def read_rows_stream(

async def read_rows(
self,
-query: ReadRowsQuery | dict[str, Any],
+query: ReadRowsQuery,
*,
operation_timeout: float | None = None,
attempt_timeout: float | None = None,
@@ -687,10 +687,10 @@ async def read_rows_sharded(
timeout_generator = _attempt_timeout_generator(
operation_timeout, operation_timeout
)
-# submit shards in batches if the number of shards goes over CONCURRENCY_LIMIT
+# submit shards in batches if the number of shards goes over _CONCURRENCY_LIMIT
batched_queries = [
-sharded_query[i : i + CONCURRENCY_LIMIT]
-for i in range(0, len(sharded_query), CONCURRENCY_LIMIT)
+sharded_query[i : i + _CONCURRENCY_LIMIT]
+for i in range(0, len(sharded_query), _CONCURRENCY_LIMIT)
]
# run batches and collect results
results_list = []
@@ -1038,7 +1038,7 @@ async def bulk_mutate_rows(
async def check_and_mutate_row(
self,
row_key: str | bytes,
-predicate: RowFilter | dict[str, Any] | None,
+predicate: RowFilter | None,
*,
true_case_mutations: Mutation | list[Mutation] | None = None,
false_case_mutations: Mutation | list[Mutation] | None = None,
@@ -1091,12 +1091,12 @@ async def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_dict = [m._to_dict() for m in false_case_mutations or []]
-if predicate is not None and not isinstance(predicate, dict):
-predicate = predicate.to_dict()
metadata = _make_metadata(self.table_name, self.app_profile_id)
result = await self.client._gapic_client.check_and_mutate_row(
request={
-"predicate_filter": predicate,
+"predicate_filter": predicate._to_dict()
+if predicate is not None
+else None,
"true_mutations": true_case_dict,
"false_mutations": false_case_dict,
"table_name": self.table_name,
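The renamed _CONCURRENCY_LIMIT only affects how read_rows_sharded groups its sub-queries before running them concurrently. A small self-contained sketch of that batching step, using the same chunking as the list comprehension above (chunk_shards is a hypothetical helper, not a library function):

_CONCURRENCY_LIMIT = 10  # module-level constant in google/cloud/bigtable/data/_async/client.py

def chunk_shards(sharded_query: list) -> list[list]:
    """Split a sharded query into batches of at most _CONCURRENCY_LIMIT sub-queries."""
    return [
        sharded_query[i : i + _CONCURRENCY_LIMIT]
        for i in range(0, len(sharded_query), _CONCURRENCY_LIMIT)
    ]

# e.g. 25 shard queries are attempted as three batches of 10, 10, and 5
batches = chunk_shards(list(range(25)))
assert [len(b) for b in batches] == [10, 10, 5]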
google/cloud/bigtable/data/_async/mutations_batcher.py (4 changes: 2 additions & 2 deletions)
@@ -27,7 +27,7 @@

from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._async._mutate_rows import (
-MUTATE_ROWS_REQUEST_MUTATION_LIMIT,
+_MUTATE_ROWS_REQUEST_MUTATION_LIMIT,
)
from google.cloud.bigtable.data.mutations import Mutation

@@ -144,7 +144,7 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]
self._has_capacity(next_count, next_size)
# make sure not to exceed per-request mutation count limits
and (batch_mutation_count + next_count)
-<= MUTATE_ROWS_REQUEST_MUTATION_LIMIT
+<= _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
):
# room for new mutation; add to batch
end_idx += 1
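add_to_flow admits the next entry into the current request only while two conditions hold: the flow controller still has capacity, and the running mutation count stays at or below _MUTATE_ROWS_REQUEST_MUTATION_LIMIT. A simplified sketch of just the second condition, ignoring flow-control capacity entirely (split_into_requests is illustrative and not the batcher's real API; entries are assumed to expose a .mutations list):

_MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000

def split_into_requests(entries) -> list[list]:
    """Greedily pack entries so no batch exceeds the per-request mutation count."""
    batches, current, current_count = [], [], 0
    for entry in entries:
        next_count = len(entry.mutations)
        # close the current batch before it would go over the limit
        if current and current_count + next_count > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
            batches.append(current)
            current, current_count = [], 0
        current.append(entry)
        current_count += next_count
    if current:
        batches.append(current)
    return batches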
google/cloud/bigtable/data/mutations.py (20 changes: 10 additions & 10 deletions)
@@ -19,14 +19,14 @@
from abc import ABC, abstractmethod
from sys import getsizeof

-from google.cloud.bigtable.data.read_modify_write_rules import MAX_INCREMENT_VALUE
-
-# special value for SetCell mutation timestamps. If set, server will assign a timestamp
-SERVER_SIDE_TIMESTAMP = -1
+from google.cloud.bigtable.data.read_modify_write_rules import _MAX_INCREMENT_VALUE
+
+# special value for SetCell mutation timestamps. If set, server will assign a timestamp
+_SERVER_SIDE_TIMESTAMP = -1

# mutation entries above this should be rejected
-MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000
+_MUTATE_ROWS_REQUEST_MUTATION_LIMIT = 100_000


class Mutation(ABC):
@@ -112,7 +112,7 @@ def __init__(
if isinstance(new_value, str):
new_value = new_value.encode()
elif isinstance(new_value, int):
-if abs(new_value) > MAX_INCREMENT_VALUE:
+if abs(new_value) > _MAX_INCREMENT_VALUE:
raise ValueError(
"int values must be between -2**63 and 2**63 (64-bit signed int)"
)
@@ -123,9 +123,9 @@
# use current timestamp, with milisecond precision
timestamp_micros = time.time_ns() // 1000
timestamp_micros = timestamp_micros - (timestamp_micros % 1000)
-if timestamp_micros < SERVER_SIDE_TIMESTAMP:
+if timestamp_micros < _SERVER_SIDE_TIMESTAMP:
raise ValueError(
-"timestamp_micros must be positive (or -1 for server-side timestamp)"
+f"timestamp_micros must be positive (or {_SERVER_SIDE_TIMESTAMP} for server-side timestamp)"
)
self.family = family
self.qualifier = qualifier
Expand All @@ -145,7 +145,7 @@ def _to_dict(self) -> dict[str, Any]:

def is_idempotent(self) -> bool:
"""Check if the mutation is idempotent"""
-return self.timestamp_micros != SERVER_SIDE_TIMESTAMP
+return self.timestamp_micros != _SERVER_SIDE_TIMESTAMP


@dataclass
@@ -208,9 +208,9 @@ def __init__(self, row_key: bytes | str, mutations: Mutation | list[Mutation]):
mutations = [mutations]
if len(mutations) == 0:
raise ValueError("mutations must not be empty")
-elif len(mutations) > MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
+elif len(mutations) > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
raise ValueError(
-f"entries must have <= {MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations"
+f"entries must have <= {_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations"
)
self.row_key = row_key
self.mutations = tuple(mutations)
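The SetCell changes are pure renames, but the surrounding behavior is worth spelling out: when no timestamp is supplied, the mutation defaults to the current time truncated to millisecond precision, and a mutation that asks the server to assign the timestamp (_SERVER_SIDE_TIMESTAMP, i.e. -1) is treated as non-idempotent, so it is not considered safe to retry. A condensed sketch of that logic, pulled out of SetCell into free functions purely for illustration:

import time

_SERVER_SIDE_TIMESTAMP = -1  # sentinel: let the server assign the timestamp

def default_timestamp_micros() -> int:
    """Current time in microseconds, truncated to millisecond precision."""
    timestamp_micros = time.time_ns() // 1000
    return timestamp_micros - (timestamp_micros % 1000)

def is_idempotent(timestamp_micros: int) -> bool:
    """A SetCell is only safe to retry when it does not rely on a server-assigned timestamp."""
    return timestamp_micros != _SERVER_SIDE_TIMESTAMP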
google/cloud/bigtable/data/read_modify_write_rules.py (4 changes: 2 additions & 2 deletions)
@@ -17,7 +17,7 @@
import abc

# value must fit in 64-bit signed integer
-MAX_INCREMENT_VALUE = (1 << 63) - 1
+_MAX_INCREMENT_VALUE = (1 << 63) - 1


class ReadModifyWriteRule(abc.ABC):
@@ -37,7 +37,7 @@ class IncrementRule(ReadModifyWriteRule):
def __init__(self, family: str, qualifier: bytes | str, increment_amount: int = 1):
if not isinstance(increment_amount, int):
raise TypeError("increment_amount must be an integer")
-if abs(increment_amount) > MAX_INCREMENT_VALUE:
+if abs(increment_amount) > _MAX_INCREMENT_VALUE:
raise ValueError(
"increment_amount must be between -2**63 and 2**63 (64-bit signed int)"
)
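The increment bound itself is unchanged, only the name is: increment amounts must be integers whose magnitude fits in a signed 64-bit value. A minimal sketch of the validation IncrementRule performs (check_increment_amount is illustrative only):

_MAX_INCREMENT_VALUE = (1 << 63) - 1  # largest magnitude allowed for an increment

def check_increment_amount(increment_amount: int) -> None:
    """Mirror of the IncrementRule check: reject values outside a 64-bit signed int."""
    if not isinstance(increment_amount, int):
        raise TypeError("increment_amount must be an integer")
    if abs(increment_amount) > _MAX_INCREMENT_VALUE:
        raise ValueError(
            "increment_amount must be between -2**63 and 2**63 (64-bit signed int)"
        )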
(Diffs for the remaining 11 changed files are not shown here.)
