From c71ec70e55f6e236e46127870a9ed4717eee5da5 Mon Sep 17 00:00:00 2001
From: Igor Bernstein
Date: Wed, 17 Aug 2022 16:04:52 -0400
Subject: [PATCH] perf: optimize row merging (#628)

This PR rewrites the row merging logic to be more correct and to improve
performance:

- extract the row merging logic into its own class to reduce the complexity
  of ReadRows handling
- use OrderedDict instead of dict() for `{family: {qualifier: []}}` data;
  this should preserve server-side ordering (families in creation order,
  qualifiers in lexicographical order)
- define an explicit state machine with states implemented as methods
- add various optimizations, such as:
  - __slots__ on hot objects to avoid dict lookups
  - avoiding dict lookups for contiguous family and qualifier keys

Overall this improves performance by 20% and, in my opinion, makes the code
a lot more readable.
---
 google/cloud/bigtable/row_data.py   | 206 +++++------
 google/cloud/bigtable/row_merger.py | 250 ++++++++++++++++++++++++++++
 tests/unit/test_row_data.py         | 141 +++-------
 tests/unit/test_row_merger.py       | 152 +++++++++++
 4 files changed, 470 insertions(+), 279 deletions(-)
 create mode 100644 google/cloud/bigtable/row_merger.py

diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py
index e7d3d5bd4..a50fab1ee 100644
--- a/google/cloud/bigtable/row_data.py
+++ b/google/cloud/bigtable/row_data.py
@@ -18,45 +18,25 @@
 import copy
 
 import grpc  # type: ignore
-
+import warnings
 from google.api_core import exceptions
 from google.api_core import retry
 from google.cloud._helpers import _to_bytes  # type: ignore
+
+from google.cloud.bigtable.row_merger import _RowMerger, _State
 from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
 from google.cloud.bigtable_v2.types import data as data_v2_pb2
 from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData
 
+# Some classes need to be re-exported here to keep backwards
+# compatibility. Those classes were moved to row_merger, but we don't want to
+# break end users' imports. This hack ensures they don't get marked as unused.
+_ = (Cell, InvalidChunk, PartialRowData)
+
 
-class PartialCellData(object):
-    """Representation of partial cell in a Google Cloud Bigtable Table.
-
-    These are expected to be updated directly from a
-    :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
-
-    :type row_key: bytes
-    :param row_key: The key for the row holding the (partial) cell.
-
-    :type family_name: str
-    :param family_name: The family name of the (partial) cell.
-
-    :type qualifier: bytes
-    :param qualifier: The column qualifier of the (partial) cell.
-
-    :type timestamp_micros: int
-    :param timestamp_micros: The timestamp (in microsecods) of the
-                             (partial) cell.
-
-    :type labels: list of str
-    :param labels: labels assigned to the (partial) cell
-
-    :type value: bytes
-    :param value: The (accumulated) value of the (partial) cell.
-    """
+class PartialCellData(object):  # pragma: NO COVER
+    """This class is no longer used and will be removed in the future"""
 
     def __init__(
         self, row_key, family_name, qualifier, timestamp_micros, labels=(), value=b""
@@ -69,11 +49,6 @@ def __init__(
         self.value = value
 
     def append_value(self, value):
-        """Append bytes from a new chunk to value.
-
-        :type value: bytes
-        :param value: bytes to append
-        """
         self.value += value
 
 
@@ -168,14 +143,7 @@ class PartialRowsData(object):
 
     def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
         # Counter for rows returned to the user
        self._counter = 0
-        # In-progress row, unset until first response, after commit/reset
-        self._row = None
-        # Last complete row, unset until first commit
-        self._previous_row = None
-        # In-progress cell, unset until first response, after completion
-        self._cell = None
-        # Last complete cell, unset until first completion, after new row
-        self._previous_cell = None
+        self._row_merger = _RowMerger()
 
         # May be cached from previous response
         self.last_scanned_row_key = None
@@ -192,20 +160,35 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
         self.response_iterator = read_method(request, timeout=self.retry._deadline + 1)
 
         self.rows = {}
-        self._state = self.STATE_NEW_ROW
 
         # Flag to stop iteration, for any reason not related to self.retry()
         self._cancelled = False
 
     @property
-    def state(self):
-        """State machine state.
-
-        :rtype: str
-        :returns: name of state corresponding to current row / chunk
-                  processing.
+    def state(self):  # pragma: NO COVER
+        """
+        DEPRECATED: this property is deprecated and will be removed in the
+        future.
         """
-        return self.read_states[self._state]
+        warnings.warn(
+            "`PartialRowsData#state()` is deprecated and will be removed in the future",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+
+        # Best effort: try to map internal RowMerger states to old strings for
+        # backwards compatibility
+        internal_state = self._row_merger.state
+        if internal_state == _State.ROW_START:
+            return self.NEW_ROW
+        # note: _State.CELL_START, _State.CELL_COMPLETE are transient states
+        # and will not be visible in between chunks
+        elif internal_state == _State.CELL_IN_PROGRESS:
+            return self.CELL_IN_PROGRESS
+        elif internal_state == _State.ROW_COMPLETE:
+            return self.NEW_ROW
+        else:
+            raise RuntimeError(f"unexpected internal state: {internal_state}")
 
     def cancel(self):
         """Cancels the iterator, closing the stream."""
@@ -241,6 +224,7 @@ def _on_error(self, exc):
         if self.last_scanned_row_key:
             retry_request = self._create_retry_request()
 
+        self._row_merger = _RowMerger(self._row_merger.last_seen_row_key)
         self.response_iterator = self.read_method(retry_request)
 
     def _read_next(self):
@@ -266,125 +250,23 @@ def __iter__(self):
             try:
                 response = self._read_next_response()
             except StopIteration:
-                if self.state != self.NEW_ROW:
-                    raise ValueError("The row remains partial / is not committed.")
+                self._row_merger.finalize()
                 break
             except InvalidRetryRequest:
                 self._cancelled = True
                 break
 
-            for chunk in response.chunks:
+            for row in self._row_merger.process_chunks(response):
+                self.last_scanned_row_key = self._row_merger.last_seen_row_key
+                self._counter += 1
+
+                yield row
+
                 if self._cancelled:
                     break
-                self._process_chunk(chunk)
-                if chunk.commit_row:
-                    self.last_scanned_row_key = self._previous_row.row_key
-                    self._counter += 1
-                    yield self._previous_row
-
-            resp_last_key = response.last_scanned_row_key
-            if resp_last_key and resp_last_key > self.last_scanned_row_key:
-                self.last_scanned_row_key = resp_last_key
-
-    def _process_chunk(self, chunk):
-        if chunk.reset_row:
-            self._validate_chunk_reset_row(chunk)
-            self._row = None
-            self._cell = self._previous_cell = None
-            self._state = self.STATE_NEW_ROW
-            return
-
-        self._update_cell(chunk)
-
-        if self._row is None:
-            if (
-                self._previous_row is not None
-                and self._cell.row_key <= self._previous_row.row_key
-            ):
-                raise InvalidChunk()
-            self._row = PartialRowData(self._cell.row_key)
-
-        if chunk.value_size == 0:
-            self._state = self.STATE_ROW_IN_PROGRESS
-            self._save_current_cell()
-        else:
-            self._state = self.STATE_CELL_IN_PROGRESS
-
-        if chunk.commit_row:
-            if chunk.value_size > 0:
-                raise InvalidChunk()
-
-            self._previous_row = self._row
-            self._row = None
-            self._previous_cell = None
-            self._state = self.STATE_NEW_ROW
-
-    def _update_cell(self, chunk):
-        if self._cell is None:
-            qualifier = None
-            if chunk.HasField("qualifier"):
-                qualifier = chunk.qualifier.value
-
-            family = None
-            if chunk.HasField("family_name"):
-                family = chunk.family_name.value
-
-            self._cell = PartialCellData(
-                chunk.row_key,
-                family,
-                qualifier,
-                chunk.timestamp_micros,
-                chunk.labels,
-                chunk.value,
-            )
-            self._copy_from_previous(self._cell)
-            self._validate_cell_data_new_cell()
-        else:
-            self._cell.append_value(chunk.value)
-
-    def _validate_cell_data_new_cell(self):
-        cell = self._cell
-        if not cell.row_key or not cell.family_name or cell.qualifier is None:
-            raise InvalidChunk()
-
-        prev = self._previous_cell
-        if prev and prev.row_key != cell.row_key:
-            raise InvalidChunk()
-
-    def _validate_chunk_reset_row(self, chunk):
-        # No reset for new row
-        _raise_if(self._state == self.STATE_NEW_ROW)
-
-        # No reset with other keys
-        _raise_if(chunk.row_key)
-        _raise_if(chunk.HasField("family_name"))
-        _raise_if(chunk.HasField("qualifier"))
-        _raise_if(chunk.timestamp_micros)
-        _raise_if(chunk.labels)
-        _raise_if(chunk.value_size)
-        _raise_if(chunk.value)
-        _raise_if(chunk.commit_row)
-
-    def _save_current_cell(self):
-        """Helper for :meth:`consume_next`."""
-        row, cell = self._row, self._cell
-        family = row._cells.setdefault(cell.family_name, {})
-        qualified = family.setdefault(cell.qualifier, [])
-        complete = Cell.from_pb(cell)
-        qualified.append(complete)
-        self._cell, self._previous_cell = None, cell
-
-    def _copy_from_previous(self, cell):
-        """Helper for :meth:`consume_next`."""
-        previous = self._previous_cell
-        if previous is not None:
-            if not cell.row_key:
-                cell.row_key = previous.row_key
-            if not cell.family_name:
-                cell.family_name = previous.family_name
-            # NOTE: ``cell.qualifier`` **can** be empty string.
-            if cell.qualifier is None:
-                cell.qualifier = previous.qualifier
+            # The last response might not have generated any rows, but it
+            # could've updated last_scanned_row_key
+            self.last_scanned_row_key = self._row_merger.last_seen_row_key
 
 
 class _ReadRowsRequestManager(object):
@@ -494,9 +376,3 @@ def _start_key_set(row_range):
 def _end_key_set(row_range):
     """Helper for :meth:`_filter_row_ranges`"""
     return row_range.end_key_open or row_range.end_key_closed
-
-
-def _raise_if(predicate, *args):
-    """Helper for validation methods."""
-    if predicate:
-        raise InvalidChunk(*args)
diff --git a/google/cloud/bigtable/row_merger.py b/google/cloud/bigtable/row_merger.py
new file mode 100644
index 000000000..515b91df7
--- /dev/null
+++ b/google/cloud/bigtable/row_merger.py
@@ -0,0 +1,250 @@
+from enum import Enum
+from collections import OrderedDict
+from google.cloud.bigtable.row import Cell, PartialRowData, InvalidChunk
+
+_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row."
+_MISSING_COLUMN = (
+    "Column {} is not among the cells stored in this row in the column family {}."
+)
+_MISSING_INDEX = (
+    "Index {!r} is not valid for the cells stored in this row for column {} "
+    "in the column family {}. There are {} such cells."
+) + + +class _State(Enum): + ROW_START = "ROW_START" + CELL_START = "CELL_START" + CELL_IN_PROGRESS = "CELL_IN_PROGRESS" + CELL_COMPLETE = "CELL_COMPLETE" + ROW_COMPLETE = "ROW_COMPLETE" + + +class _PartialRow(object): + __slots__ = [ + "row_key", + "cells", + "last_family", + "last_family_cells", + "last_qualifier", + "last_qualifier_cells", + "cell", + ] + + def __init__(self, row_key): + self.row_key = row_key + self.cells = OrderedDict() + + self.last_family = None + self.last_family_cells = OrderedDict() + self.last_qualifier = None + self.last_qualifier_cells = [] + + self.cell = None + + +class _PartialCell(object): + __slots__ = ["family", "qualifier", "timestamp", "labels", "value", "value_index"] + + def __init__(self): + self.family = None + self.qualifier = None + self.timestamp = None + self.labels = None + self.value = None + self.value_index = 0 + + +class _RowMerger(object): + """ + State machine to merge chunks from a response stream into logical rows. + + The implementation is a fairly linear state machine that is implemented as + a method for every state in the _State enum. In general the states flow + from top to bottom with some repetition. Each state handler will do some + sanity checks, update in progress data and set the next state. + + There can be multiple state transitions for each chunk, i.e. a single chunk + row will flow from ROW_START -> CELL_START -> CELL_COMPLETE -> ROW_COMPLETE + in a single iteration. + """ + + __slots__ = ["state", "last_seen_row_key", "row"] + + def __init__(self, last_seen_row=b""): + self.last_seen_row_key = last_seen_row + self.state = _State.ROW_START + self.row = None + + def process_chunks(self, response): + """ + Process the chunks in the given response and yield logical rows. + This class will maintain state across multiple response protos. 
+ """ + if response.last_scanned_row_key: + if self.last_seen_row_key >= response.last_scanned_row_key: + raise InvalidChunk("Last scanned row key is out of order") + self.last_seen_row_key = response.last_scanned_row_key + + for chunk in response.chunks: + if chunk.reset_row: + self._handle_reset(chunk) + continue + + if self.state == _State.ROW_START: + self._handle_row_start(chunk) + + if self.state == _State.CELL_START: + self._handle_cell_start(chunk) + + if self.state == _State.CELL_IN_PROGRESS: + self._handle_cell_in_progress(chunk) + + if self.state == _State.CELL_COMPLETE: + self._handle_cell_complete(chunk) + + if self.state == _State.ROW_COMPLETE: + yield self._handle_row_complete(chunk) + elif chunk.commit_row: + raise InvalidChunk( + f"Chunk tried to commit row in wrong state (${self.state})" + ) + + def _handle_reset(self, chunk): + if self.state == _State.ROW_START: + raise InvalidChunk("Bare reset") + if chunk.row_key: + raise InvalidChunk("Reset chunk has a row key") + if chunk.HasField("family_name"): + raise InvalidChunk("Reset chunk has family_name") + if chunk.HasField("qualifier"): + raise InvalidChunk("Reset chunk has qualifier") + if chunk.timestamp_micros: + raise InvalidChunk("Reset chunk has a timestamp") + if chunk.labels: + raise InvalidChunk("Reset chunk has labels") + if chunk.value: + raise InvalidChunk("Reset chunk has a value") + + self.state = _State.ROW_START + self.row = None + + def _handle_row_start(self, chunk): + if not chunk.row_key: + raise InvalidChunk("New row is missing a row key") + if self.last_seen_row_key and self.last_seen_row_key >= chunk.row_key: + raise InvalidChunk("Out of order row keys") + + self.row = _PartialRow(chunk.row_key) + self.state = _State.CELL_START + + def _handle_cell_start(self, chunk): + # Ensure that all chunks after the first one either are missing a row + # key or the row is the same + if self.row.cells and chunk.row_key and chunk.row_key != self.row.row_key: + raise InvalidChunk("row key changed mid row") + + if not self.row.cell: + self.row.cell = _PartialCell() + + # Cells can inherit family/qualifier from previous cells + # However if the family changes, then qualifier must be specified as well + if chunk.HasField("family_name"): + self.row.cell.family = chunk.family_name.value + self.row.cell.qualifier = None + if not self.row.cell.family: + raise InvalidChunk("missing family for a new cell") + + if chunk.HasField("qualifier"): + self.row.cell.qualifier = chunk.qualifier.value + if self.row.cell.qualifier is None: + raise InvalidChunk("missing qualifier for a new cell") + + self.row.cell.timestamp = chunk.timestamp_micros + self.row.cell.labels = chunk.labels + + if chunk.value_size > 0: + # explicitly avoid pre-allocation as it seems that bytearray + # concatenation performs better than slice copies. + self.row.cell.value = bytearray() + self.state = _State.CELL_IN_PROGRESS + else: + self.row.cell.value = chunk.value + self.state = _State.CELL_COMPLETE + + def _handle_cell_in_progress(self, chunk): + # if this isn't the first cell chunk, make sure that everything except + # the value stayed constant. 
+        if self.row.cell.value_index > 0:
+            if chunk.row_key:
+                raise InvalidChunk("found row key mid cell")
+            if chunk.HasField("family_name"):
+                raise InvalidChunk("In progress cell had a family name")
+            if chunk.HasField("qualifier"):
+                raise InvalidChunk("In progress cell had a qualifier")
+            if chunk.timestamp_micros:
+                raise InvalidChunk("In progress cell had a timestamp")
+            if chunk.labels:
+                raise InvalidChunk("In progress cell had labels")
+
+        self.row.cell.value += chunk.value
+        self.row.cell.value_index += len(chunk.value)
+
+        if chunk.value_size > 0:
+            self.state = _State.CELL_IN_PROGRESS
+        else:
+            self.row.cell.value = bytes(self.row.cell.value)
+            self.state = _State.CELL_COMPLETE
+
+    def _handle_cell_complete(self, chunk):
+        # since we are guaranteed that all family & qualifier cells are
+        # contiguous, we can optimize away the dict lookup by caching the last
+        # family/qualifier and simply comparing and appending
+        family_changed = False
+        if self.row.last_family != self.row.cell.family:
+            family_changed = True
+            self.row.last_family = self.row.cell.family
+            self.row.cells[
+                self.row.cell.family
+            ] = self.row.last_family_cells = OrderedDict()
+
+        if family_changed or self.row.last_qualifier != self.row.cell.qualifier:
+            self.row.last_qualifier = self.row.cell.qualifier
+            self.row.last_family_cells[
+                self.row.cell.qualifier
+            ] = self.row.last_qualifier_cells = []
+
+        self.row.last_qualifier_cells.append(
+            Cell(
+                self.row.cell.value,
+                self.row.cell.timestamp,
+                self.row.cell.labels,
+            )
+        )
+
+        self.row.cell.timestamp = 0
+        self.row.cell.value = None
+        self.row.cell.value_index = 0
+
+        if not chunk.commit_row:
+            self.state = _State.CELL_START
+        else:
+            self.state = _State.ROW_COMPLETE
+
+    def _handle_row_complete(self, chunk):
+        new_row = PartialRowData(self.row.row_key)
+        new_row._cells = self.row.cells
+
+        self.last_seen_row_key = new_row.row_key
+        self.row = None
+        self.state = _State.ROW_START
+
+        return new_row
+
+    def finalize(self):
+        """
+        Must be called at the end of the stream to ensure there are no
+        unmerged rows.
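+
+        Raises ValueError if the stream ended while a row (or cell) was still
+        in progress.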
+ """ + if self.row or self.state != _State.ROW_START: + raise ValueError("The row remains partial / is not committed.") diff --git a/tests/unit/test_row_data.py b/tests/unit/test_row_data.py index 9175bf479..382a81ef1 100644 --- a/tests/unit/test_row_data.py +++ b/tests/unit/test_row_data.py @@ -630,78 +630,6 @@ def test_partial_rows_data_cancel_between_chunks(): assert list(yrd) == [] -# 'consume_next' tested via 'TestPartialRowsData_JSON_acceptance_tests' - - -def test_partial_rows_data__copy_from_previous_unset(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == b"" - assert cell.family_name == "" - assert cell.qualifier is None - assert cell.timestamp_micros == 0 - assert cell.labels == [] - - -def test_partial_rows_data__copy_from_previous_blank(): - ROW_KEY = "RK" - FAMILY_NAME = "A" - QUALIFIER = b"C" - TIMESTAMP_MICROS = 100 - LABELS = ["L1", "L2"] - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - yrd = _make_partial_rows_data(client._data_stub.ReadRows, request) - cell = _PartialCellData( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - labels=LABELS, - ) - yrd._previous_cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == ROW_KEY - assert cell.family_name == FAMILY_NAME - assert cell.qualifier == QUALIFIER - assert cell.timestamp_micros == TIMESTAMP_MICROS - assert cell.labels == LABELS - - -def test_partial_rows_data__copy_from_previous_filled(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - ROW_KEY = "RK" - FAMILY_NAME = "A" - QUALIFIER = b"C" - TIMESTAMP_MICROS = 100 - LABELS = ["L1", "L2"] - client = _Client() - data_api = mock.create_autospec(BigtableClient) - client._data_stub = data_api - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - yrd._previous_cell = _PartialCellData( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - labels=LABELS, - ) - cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == ROW_KEY - assert cell.family_name == FAMILY_NAME - assert cell.qualifier == QUALIFIER - assert cell.timestamp_micros == 0 - assert cell.labels == [] - - def test_partial_rows_data_valid_last_scanned_row_key_on_start(): client = _Client() response = _ReadRowsResponseV2([], last_scanned_row_key=b"2.AFTER") @@ -732,38 +660,36 @@ def test_partial_rows_data_invalid_empty_chunk(): def test_partial_rows_data_state_cell_in_progress(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - - LABELS = ["L1", "L2"] - - request = object() - client = _Client() - client._data_stub = mock.create_autospec(BigtableClient) - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - labels=LABELS, + labels = ["L1", "L2"] + resp = _ReadRowsResponseV2( + [ + _ReadRowsResponseCellChunkPB( + row_key=ROW_KEY, + family_name=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + value=VALUE, + value_size=(2 * len(VALUE)), + labels=labels, + ), + 
+            _ReadRowsResponseCellChunkPB(value=VALUE, commit_row=True),
+        ]
     )
-    # _update_cell expects to be called after the protoplus wrapper has been
-    # shucked
-    chunk = messages_v2_pb2.ReadRowsResponse.CellChunk.pb(chunk)
-    yrd._update_cell(chunk)
-    more_cell_data = _ReadRowsResponseCellChunkPB(value=VALUE)
-    yrd._update_cell(more_cell_data)
 
+    def fake_read(*args, **kwargs):
+        return iter([resp])
+
+    yrd = _make_partial_rows_data(fake_read, None)
+    yrd.consume_all()
 
-    assert yrd._cell.row_key == ROW_KEY
-    assert yrd._cell.family_name == FAMILY_NAME
-    assert yrd._cell.qualifier == QUALIFIER
-    assert yrd._cell.timestamp_micros == TIMESTAMP_MICROS
-    assert yrd._cell.labels == LABELS
-    assert yrd._cell.value == VALUE + VALUE
+    expected_row = _make_partial_row_data(ROW_KEY)
+    expected_row._cells = {
+        FAMILY_NAME: {
+            QUALIFIER: [
+                _make_cell(
+                    value=(VALUE + VALUE),
+                    timestamp_micros=TIMESTAMP_MICROS,
+                    labels=labels,
+                )
+            ]
+        }
+    }
+    assert yrd.rows == {expected_row.row_key: expected_row}
 
 
 def test_partial_rows_data_yield_rows_data():
@@ -1215,19 +1141,6 @@ def next(self):
     __next__ = next
 
 
-class _PartialCellData(object):
-
-    row_key = b""
-    family_name = ""
-    qualifier = None
-    timestamp_micros = 0
-    last_scanned_row_key = ""
-
-    def __init__(self, **kw):
-        self.labels = kw.pop("labels", [])
-        self.__dict__.update(kw)
-
-
 def _ReadRowsResponseV2(chunks, last_scanned_row_key=b""):
     from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2
 
diff --git a/tests/unit/test_row_merger.py b/tests/unit/test_row_merger.py
index f336a82ff..483c04536 100644
--- a/tests/unit/test_row_merger.py
+++ b/tests/unit/test_row_merger.py
@@ -7,6 +7,7 @@
 
 from google.cloud.bigtable.row_data import PartialRowsData, PartialRowData, InvalidChunk
 from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse
+from google.cloud.bigtable.row_merger import _RowMerger
 
 
 # TODO: autogenerate protos from
@@ -76,3 +77,154 @@ def fake_read(*args, **kwargs):
 
     for expected, actual in zip_longest(test_case.results, actual_results):
         assert actual == expected
+
+
+def test_out_of_order_rows():
+    row_merger = _RowMerger(last_seen_row=b"z")
+    with pytest.raises(InvalidChunk):
+        list(row_merger.process_chunks(ReadRowsResponse(last_scanned_row_key=b"a")))
+
+
+def test_bare_reset():
+    first_chunk = ReadRowsResponse.CellChunk(
+        row_key=b"a", family_name="f", qualifier=b"q", value=b"v"
+    )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, row_key=b"a"),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, family_name="f"),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, qualifier=b"q"),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, timestamp_micros=1000),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, labels=["a"]),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(reset_row=True, value=b"v"),
+        )
+
+
+def test_missing_family():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value=b"v",
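+                # family_name is deliberately omitted: the first cell of a
+                # row has no previous cell to inherit a family from, so the
+                # merger must reject this chunk.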
+                commit_row=True,
+            )
+        )
+
+
+def test_mid_cell_row_key_change():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                family_name="f",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value_size=2,
+                value=b"v",
+            ),
+            ReadRowsResponse.CellChunk(row_key=b"b", value=b"v", commit_row=True),
+        )
+
+
+def test_mid_cell_family_change():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                family_name="f",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value_size=2,
+                value=b"v",
+            ),
+            ReadRowsResponse.CellChunk(family_name="f2", value=b"v", commit_row=True),
+        )
+
+
+def test_mid_cell_qualifier_change():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                family_name="f",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value_size=2,
+                value=b"v",
+            ),
+            ReadRowsResponse.CellChunk(qualifier=b"q2", value=b"v", commit_row=True),
+        )
+
+
+def test_mid_cell_timestamp_change():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                family_name="f",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value_size=2,
+                value=b"v",
+            ),
+            ReadRowsResponse.CellChunk(
+                timestamp_micros=2000, value=b"v", commit_row=True
+            ),
+        )
+
+
+def test_mid_cell_labels_change():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                family_name="f",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value_size=2,
+                value=b"v",
+            ),
+            ReadRowsResponse.CellChunk(labels=["b"], value=b"v", commit_row=True),
+        )
+
+
+def _process_chunks(*chunks):
+    # process_chunks() operates on the raw protobuf message, so shuck the
+    # proto-plus wrapper before handing the response over.
+    resp = ReadRowsResponse.pb(ReadRowsResponse(chunks=chunks))
+    return list(_RowMerger().process_chunks(resp))