Skip to content

Commit

Permalink
Merge branch 'main' into owl-bot-update-lock-230f7fe8a0d2ed81a519cfc15c6bb11c5b46b9fb449b8b1219b3771bcb520ad2
Browse files: browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored Dec 12, 2023
2 parents ab1fd18 + e4e63c7 commit 97b5cfe
Show file tree
Hide file tree
Showing 29 changed files with 143 additions and 87 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/system_emulated.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
jobs:

run-systests:
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04

steps:

Expand All @@ -20,7 +20,7 @@ jobs:
python-version: '3.8'

- name: Setup GCloud SDK
uses: google-github-actions/setup-gcloud@v1.1.0
uses: google-github-actions/setup-gcloud@v1.1.1

- name: Install / run Nox
run: |
Expand Down
129 changes: 74 additions & 55 deletions google/cloud/bigtable/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,19 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT
self.flush_count = flush_count

def get(self):
"""Retrieve an item from the queue. Recalculate queue size."""
row = self._queue.get()
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
return row
"""
Retrieve an item from the queue. Recalculate queue size.
If the queue is empty, return None.
"""
try:
row = self._queue.get_nowait()
mutation_size = row.get_mutations_size()
self.total_mutation_count -= len(row._get_mutations())
self.total_size -= mutation_size
return row
except queue.Empty:
return None

def put(self, item):
"""Insert an item to the queue. Recalculate queue size."""
Expand All @@ -79,9 +86,6 @@ def full(self):
return True
return False

def empty(self):
return self._queue.empty()


@dataclass
class _BatchInfo:
Expand Down Expand Up @@ -110,6 +114,7 @@ def __init__(
self.inflight_size = 0
self.event = threading.Event()
self.event.set()
self._lock = threading.Lock()

def is_blocked(self):
"""Returns True if:
Expand All @@ -128,8 +133,9 @@ def control_flow(self, batch_info):
Calculate the resources used by this batch
"""

self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
with self._lock:
self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.set_flow_control_status()

def wait(self):
Expand All @@ -154,8 +160,9 @@ def release(self, batch_info):
Release the resources.
Decrement the row size to allow enqueued mutations to be run.
"""
self.inflight_mutations -= batch_info.mutations_count
self.inflight_size -= batch_info.mutations_size
with self._lock:
self.inflight_mutations -= batch_info.mutations_count
self.inflight_size -= batch_info.mutations_size
self.set_flow_control_status()


Expand Down Expand Up @@ -292,8 +299,10 @@ def flush(self):
* :exc:`.batcher.MutationsBatchError` if there is any error in the mutations.
"""
rows_to_flush = []
while not self._rows.empty():
rows_to_flush.append(self._rows.get())
row = self._rows.get()
while row is not None:
rows_to_flush.append(row)
row = self._rows.get()
response = self._flush_rows(rows_to_flush)
return response

Expand All @@ -303,58 +312,68 @@ def _flush_async(self):
:raises:
* :exc:`.batcher.MutationsBatchError` if there is any error in the mutations.
"""

rows_to_flush = []
mutations_count = 0
mutations_size = 0
rows_count = 0
batch_info = _BatchInfo()

while not self._rows.empty():
row = self._rows.get()
mutations_count += len(row._get_mutations())
mutations_size += row.get_mutations_size()
rows_count += 1
rows_to_flush.append(row)
batch_info.mutations_count = mutations_count
batch_info.rows_count = rows_count
batch_info.mutations_size = mutations_size

if (
rows_count >= self.flush_count
or mutations_size >= self.max_row_bytes
or mutations_count >= self.flow_control.max_mutations
or mutations_size >= self.flow_control.max_mutation_bytes
or self._rows.empty() # submit when it reached the end of the queue
next_row = self._rows.get()
while next_row is not None:
# start a new batch
rows_to_flush = [next_row]
batch_info = _BatchInfo(
mutations_count=len(next_row._get_mutations()),
rows_count=1,
mutations_size=next_row.get_mutations_size(),
)
# fill up batch with rows
next_row = self._rows.get()
while next_row is not None and self._row_fits_in_batch(
next_row, batch_info
):
# wait for resources to become available, before submitting any new batch
self.flow_control.wait()
# once unblocked, submit a batch
# event flag will be set by control_flow to block subsequent thread, but not blocking this one
self.flow_control.control_flow(batch_info)
future = self._executor.submit(self._flush_rows, rows_to_flush)
self.futures_mapping[future] = batch_info
future.add_done_callback(self._batch_completed_callback)

# reset and start a new batch
rows_to_flush = []
mutations_size = 0
rows_count = 0
mutations_count = 0
batch_info = _BatchInfo()
rows_to_flush.append(next_row)
batch_info.mutations_count += len(next_row._get_mutations())
batch_info.rows_count += 1
batch_info.mutations_size += next_row.get_mutations_size()
next_row = self._rows.get()
# send batch over network
# wait for resources to become available
self.flow_control.wait()
# once unblocked, submit the batch
# the event flag will be set by control_flow to block subsequent threads, but it does not block this one
self.flow_control.control_flow(batch_info)
future = self._executor.submit(self._flush_rows, rows_to_flush)
# schedule release of resources from flow control
self.futures_mapping[future] = batch_info
future.add_done_callback(self._batch_completed_callback)

def _batch_completed_callback(self, future):
"""Callback for when the mutation has finished to clean up the current batch
and release items from the flow controller.
Raise exceptions if there are any.
Release the resources locked by the flow control and allow enqueued tasks to be run.
"""

processed_rows = self.futures_mapping[future]
self.flow_control.release(processed_rows)
del self.futures_mapping[future]

def _row_fits_in_batch(self, row, batch_info):
"""Checks if a row can fit in the current batch.
:type row: class
:param row: :class:`~google.cloud.bigtable.row.DirectRow`.
:type batch_info: :class:`_BatchInfo`
:param batch_info: Information about the current batch.
:rtype: bool
:returns: True if the row can fit in the current batch.
"""
new_rows_count = batch_info.rows_count + 1
new_mutations_count = batch_info.mutations_count + len(row._get_mutations())
new_mutations_size = batch_info.mutations_size + row.get_mutations_size()
return (
new_rows_count <= self.flush_count
and new_mutations_size <= self.max_row_bytes
and new_mutations_count <= self.flow_control.max_mutations
and new_mutations_size <= self.flow_control.max_mutation_bytes
)

def _flush_rows(self, rows_to_flush):
"""Mutate the specified rows.
Expand Down
2 changes: 1 addition & 1 deletion samples/beam/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/beam/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
apache-beam==2.46.0
google-cloud-bigtable==2.17.0
google-cloud-core==2.3.2
google-cloud-core==2.3.3
2 changes: 1 addition & 1 deletion samples/hello/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
4 changes: 2 additions & 2 deletions samples/hello/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-core==2.3.2
google-cloud-bigtable==2.20.0
google-cloud-core==2.3.3
2 changes: 1 addition & 1 deletion samples/hello_happybase/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/instanceadmin/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/instanceadmin/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
backoff==2.2.1
4 changes: 2 additions & 2 deletions samples/metricscaler/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pytest==7.3.1
mock==5.0.2
pytest==7.4.0
mock==5.1.0
google-cloud-testutils
4 changes: 2 additions & 2 deletions samples/metricscaler/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-monitoring==2.14.2
google-cloud-bigtable==2.20.0
google-cloud-monitoring==2.15.1
2 changes: 1 addition & 1 deletion samples/quickstart/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/quickstart/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
2 changes: 1 addition & 1 deletion samples/quickstart_happybase/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/snippets/deletes/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/snippets/deletes/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
snapshottest==0.6.0
2 changes: 1 addition & 1 deletion samples/snippets/filters/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/snippets/filters/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
snapshottest==0.6.0
2 changes: 1 addition & 1 deletion samples/snippets/reads/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/snippets/reads/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
snapshottest==0.6.0
2 changes: 1 addition & 1 deletion samples/snippets/writes/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
backoff==2.2.1
pytest==7.3.1
pytest==7.4.0
2 changes: 1 addition & 1 deletion samples/snippets/writes/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
2 changes: 1 addition & 1 deletion samples/tableadmin/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pytest==7.3.1
pytest==7.4.0
google-cloud-testutils==1.3.3
2 changes: 1 addition & 1 deletion samples/tableadmin/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
google-cloud-bigtable==2.17.0
google-cloud-bigtable==2.20.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Operating System :: OS Independent",
"Topic :: Internet",
],
Expand Down
Empty file added testing/constraints-3.12.txt
Empty file.
2 changes: 1 addition & 1 deletion tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def location_id():

@pytest.fixture(scope="session")
def serve_nodes():
return 3
return 1


@pytest.fixture(scope="session")
Expand Down
36 changes: 36 additions & 0 deletions tests/system/test_data_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,39 @@ def test_access_with_non_admin_client(data_client, data_instance_id, data_table_
instance = data_client.instance(data_instance_id)
table = instance.table(data_table_id)
assert table.read_row("nonesuch") is None # no raise


def test_mutations_batcher_threading(data_table, rows_to_delete):
"""
Test the mutations batcher by sending a bunch of mutations using different
flush methods
"""
import mock
import time
from google.cloud.bigtable.batcher import MutationsBatcher

num_sent = 20
all_results = []

def callback(results):
all_results.extend(results)

# override flow control max elements
with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", 2):
with MutationsBatcher(
data_table,
flush_count=5,
flush_interval=0.07,
batch_completed_callback=callback,
) as batcher:
# send mutations in a way that timed flushes and count flushes interleave
for i in range(num_sent):
row = data_table.direct_row("row{}".format(i))
row.set_cell(
COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")
)
rows_to_delete.append(row)
batcher.mutate(row)
time.sleep(0.01)
# ensure all mutations were sent
assert len(all_results) == num_sent
8 changes: 4 additions & 4 deletions tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,8 @@ def test_table_yield_retry_rows():
for row in table.yield_rows(start_key=ROW_KEY_1, end_key=ROW_KEY_2):
rows.append(row)

assert len(warned) == 1
assert warned[0].category is DeprecationWarning
assert len(warned) >= 1
assert DeprecationWarning in [w.category for w in warned]

result = rows[1]
assert result.row_key == ROW_KEY_2
Expand Down Expand Up @@ -1140,8 +1140,8 @@ def test_table_yield_rows_with_row_set():
for row in table.yield_rows(row_set=row_set):
rows.append(row)

assert len(warned) == 1
assert warned[0].category is DeprecationWarning
assert len(warned) >= 1
assert DeprecationWarning in [w.category for w in warned]

assert rows[0].row_key == ROW_KEY_1
assert rows[1].row_key == ROW_KEY_2
Expand Down

0 comments on commit 97b5cfe

Please sign in to comment.