Adding HappyBase Table.scan(). #1543

Merged 1 commit on Feb 26, 2016
64 changes: 57 additions & 7 deletions gcloud/bigtable/happybase/table.py
@@ -16,6 +16,7 @@


import struct
import warnings

import six

@@ -40,6 +41,7 @@
from gcloud.bigtable.table import Table as _LowLevelTable


_WARN = warnings.warn
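# (Aliased at module scope so tests can monkey-patch it; see the
# _Monkey(MUT, _WARN=mock_warn) pattern in test_table.py below.)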
_UNPACK_I64 = struct.Struct('>q').unpack
_SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)

@@ -367,15 +369,63 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
        :param kwargs: Remaining keyword arguments. Provided for HappyBase
                       compatibility.

        :raises: :class:`ValueError <exceptions.ValueError>` if ``limit``
                 is set but non-positive, or if row prefix is used with row
                 start/stop,
                 :class:`TypeError <exceptions.TypeError>` if a string
                 ``filter`` is used.
        """
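        # Legacy HappyBase arguments (batch_size, scan_batching,
        # sorted_columns) have no effect against Cloud Bigtable, so they
        # are stripped and warned about rather than rejected outright.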
        legacy_args = []
        for kw_name in ('batch_size', 'scan_batching', 'sorted_columns'):
            if kw_name in kwargs:
                legacy_args.append(kw_name)
                kwargs.pop(kw_name)
        if legacy_args:
            legacy_args = ', '.join(legacy_args)
            message = ('The HappyBase legacy arguments %s were used. These '
                       'arguments are unused by gcloud.' % (legacy_args,))
            _WARN(message)
        if kwargs:
            raise TypeError('Received unexpected arguments', kwargs.keys())

        if limit is not None and limit < 1:
            raise ValueError('limit must be positive')
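        # A prefix scan becomes the half-open key range
        # [row_prefix, _string_successor(row_prefix)); e.g. a prefix of
        # 'abc' scans keys in ['abc', 'abd').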
        if row_prefix is not None:
            if row_start is not None or row_stop is not None:
                raise ValueError('row_prefix cannot be combined with '
                                 'row_start or row_stop')
            row_start = row_prefix
            row_stop = _string_successor(row_prefix)

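        # Cloud Bigtable does not accept HBase filter-language strings;
        # only RowFilter objects can be forwarded to the service.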
        filters = []
        if isinstance(filter, six.string_types):
            raise TypeError('Specifying filters as a string is not supported '
                            'by Cloud Bigtable. Use a '
                            'gcloud.bigtable.row.RowFilter instead.')
        elif filter is not None:
            filters.append(filter)

        if columns is not None:
            filters.append(_columns_filter_helper(columns))
        # versions == 1 since we only want the latest.
        filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
                                       filters=filters)

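        # read_rows() returns a PartialRowsData stream; rows are pulled
        # lazily in the loop below, one consume_next() call at a time.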
        partial_rows_data = self._low_level_table.read_rows(
            start_key=row_start, end_key=row_stop,
            limit=limit, filter_=filter_)

        # Mutable copy of data.
        rows_dict = partial_rows_data.rows
        while True:
            try:
                partial_rows_data.consume_next()
                row_key, curr_row_data = rows_dict.popitem()
                # NOTE: We expect len(rows_dict) == 0, but don't check it.
                curr_row_dict = _partial_row_to_dict(
                    curr_row_data, include_timestamp=include_timestamp)
                yield (row_key, curr_row_dict)
            except StopIteration:
                break

    def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
        """Insert data into a row in this table.
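For orientation, here is a minimal usage sketch of the ported method. It is illustrative only: the table name, column, and row-key prefix are made up, and Connection() is assumed to resolve the project and cluster from the environment, as the happybase compatibility layer's connection docs describe.

    from gcloud.bigtable.happybase import Connection

    connection = Connection()
    table = connection.table('my-table')

    # Iterate over rows whose keys start with 'greeting-', keeping only
    # one column and attaching timestamps, HappyBase-style.
    for row_key, row_dict in table.scan(row_prefix='greeting-',
                                        columns=['cf1:greeting'],
                                        include_timestamp=True):
        print(row_key, row_dict)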
201 changes: 198 additions & 3 deletions gcloud/bigtable/happybase/test_table.py
@@ -507,13 +507,203 @@ def mock_cells_to_pairs(*args, **kwargs):
        self.assertEqual(mock_cells,
                         [((fake_cells,), to_pairs_kwargs)])

    def test_scan_with_batch_size(self):
        from gcloud._testing import _Monkey
        from gcloud.bigtable.happybase import table as MUT

        warned = []

        def mock_warn(msg):
            warned.append(msg)

        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        # Use unknown to force a TypeError, so we don't need to
        # stub out the rest of the method.
        with self.assertRaises(TypeError):
            with _Monkey(MUT, _WARN=mock_warn):
                list(table.scan(batch_size=object(), unknown=None))

        self.assertEqual(len(warned), 1)
        self.assertIn('batch_size', warned[0])

    def test_scan_with_scan_batching(self):
        from gcloud._testing import _Monkey
        from gcloud.bigtable.happybase import table as MUT

        warned = []

        def mock_warn(msg):
            warned.append(msg)

        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        # Use unknown to force a TypeError, so we don't need to
        # stub out the rest of the method.
        with self.assertRaises(TypeError):
            with _Monkey(MUT, _WARN=mock_warn):
                list(table.scan(scan_batching=object(), unknown=None))

        self.assertEqual(len(warned), 1)
        self.assertIn('scan_batching', warned[0])

    def test_scan_with_sorted_columns(self):
        from gcloud._testing import _Monkey
        from gcloud.bigtable.happybase import table as MUT

        warned = []

        def mock_warn(msg):
            warned.append(msg)

        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        # Use unknown to force a TypeError, so we don't need to
        # stub out the rest of the method.
        with self.assertRaises(TypeError):
            with _Monkey(MUT, _WARN=mock_warn):
                list(table.scan(sorted_columns=object(), unknown=None))

        self.assertEqual(len(warned), 1)
        self.assertIn('sorted_columns', warned[0])

    def test_scan_with_invalid_limit(self):
        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        with self.assertRaises(ValueError):
            list(table.scan(limit=-10))

    def test_scan_with_row_prefix_and_row_start(self):
        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        with self.assertRaises(ValueError):
            list(table.scan(row_prefix='a', row_stop='abc'))

    def test_scan_with_string_filter(self):
        name = 'table-name'
        connection = None
        table = self._makeOne(name, connection)
        with self.assertRaises(TypeError):
            list(table.scan(filter='some-string'))
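Since string filters now raise TypeError, callers port HBase filter strings to filter objects. A minimal sketch, assuming RowSampleFilter is available from gcloud.bigtable.row as the error message's RowFilter hint suggests; `table` is the object from the earlier sketch and `process` is a hypothetical stand-in:

    from gcloud.bigtable.row import RowSampleFilter

    # Keep a ~25% sample of rows instead of using an HBase filter string.
    row_filter = RowSampleFilter(0.25)
    for row_key, row_dict in table.scan(filter=row_filter):
        process(row_key, row_dict)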

    def _scan_test_helper(self, row_limits=(None, None), row_prefix=None,
                          columns=None, filter_=None, timestamp=None,
                          include_timestamp=False, limit=None, rr_result=None,
                          expected_result=None):
        import types
        from gcloud._testing import _Monkey
        from gcloud.bigtable.happybase import table as MUT

        name = 'table-name'
        row_start, row_stop = row_limits
        connection = None
        table = self._makeOne(name, connection)
        table._low_level_table = _MockLowLevelTable()
        rr_result = rr_result or _MockPartialRowsData()
        table._low_level_table.read_rows_result = rr_result
        self.assertEqual(rr_result.consume_next_calls, 0)

        # Set-up mocks.
        fake_col_filter = object()
        mock_columns = []

        def mock_columns_filter_helper(*args):
            mock_columns.append(args)
            return fake_col_filter

        fake_filter = object()
        mock_filters = []

        def mock_filter_chain_helper(**kwargs):
            mock_filters.append(kwargs)
            return fake_filter

        with _Monkey(MUT, _filter_chain_helper=mock_filter_chain_helper,
                     _columns_filter_helper=mock_columns_filter_helper):
            result = table.scan(row_start=row_start, row_stop=row_stop,
                                row_prefix=row_prefix, columns=columns,
                                filter=filter_, timestamp=timestamp,
                                include_timestamp=include_timestamp,
                                limit=limit)
            self.assertTrue(isinstance(result, types.GeneratorType))
            # Need to consume the result while the monkey patch is applied.
            # read_rows_result == Empty PartialRowsData --> No results.
            expected_result = expected_result or []
            self.assertEqual(list(result), expected_result)

        read_rows_args = ()
        if row_prefix:
            row_start = row_prefix
            row_stop = MUT._string_successor(row_prefix)
        read_rows_kwargs = {
            'end_key': row_stop,
            'filter_': fake_filter,
            'limit': limit,
            'start_key': row_start,
        }
        self.assertEqual(table._low_level_table.read_rows_calls, [
            (read_rows_args, read_rows_kwargs),
        ])
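        # One extra call is expected: the final consume_next() is the one
        # that raises StopIteration and ends the scan generator.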
        self.assertEqual(rr_result.consume_next_calls,
                         rr_result.iterations + 1)

        if columns is not None:
            self.assertEqual(mock_columns, [(columns,)])
        else:
            self.assertEqual(mock_columns, [])

        filters = []
        if filter_ is not None:
            filters.append(filter_)
        if columns:
            filters.append(fake_col_filter)
        expected_kwargs = {
            'filters': filters,
            'versions': 1,
            'timestamp': timestamp,
        }
        self.assertEqual(mock_filters, [expected_kwargs])

    def test_scan_with_columns(self):
        columns = object()
        self._scan_test_helper(columns=columns)

    def test_scan_with_row_start_and_stop(self):
        row_start = 'bar'
        row_stop = 'foo'
        row_limits = (row_start, row_stop)
        self._scan_test_helper(row_limits=row_limits)

    def test_scan_with_row_prefix(self):
        row_prefix = 'row-prefi'
        self._scan_test_helper(row_prefix=row_prefix)

    def test_scan_with_filter(self):
        mock_filter = object()
        self._scan_test_helper(filter_=mock_filter)

    def test_scan_with_no_results(self):
        limit = 1337
        timestamp = object()
        self._scan_test_helper(timestamp=timestamp, limit=limit)

    def test_scan_with_results(self):
        from gcloud.bigtable.row_data import PartialRowData

        row_key1 = 'row-key1'
        row1 = PartialRowData(row_key1)
        rr_result = _MockPartialRowsData(rows={row_key1: row1}, iterations=1)

        include_timestamp = object()
        expected_result = [(row_key1, {})]
        self._scan_test_helper(include_timestamp=include_timestamp,
                               rr_result=rr_result,
                               expected_result=expected_result)

    def test_put(self):
        from gcloud._testing import _Monkey
@@ -1292,3 +1482,8 @@ def __init__(self, rows=None, iterations=0):

    def consume_all(self):
        self.consume_all_calls += 1

    def consume_next(self):
        self.consume_next_calls += 1
        if self.consume_next_calls > self.iterations:
            raise StopIteration
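As a closing note, the mock above mirrors the consume/popitem protocol that drives the scan() generator. A self-contained toy (all names invented for illustration) showing the same loop shape:

    class FakePartialRowsData(object):
        """Stand-in stream: each consume_next() pulls one batch of rows."""

        def __init__(self, batches):
            self._batches = iter(batches)
            self.rows = {}

        def consume_next(self):
            # next() raises StopIteration once the batches are exhausted,
            # which is exactly what ends the scan loop.
            self.rows.update(next(self._batches))

    stream = FakePartialRowsData([{'row-key1': 'data1'},
                                  {'row-key2': 'data2'}])
    results = []
    while True:
        try:
            stream.consume_next()
            results.append(stream.rows.popitem())
        except StopIteration:
            break
    print(results)  # [('row-key1', 'data1'), ('row-key2', 'data2')]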