Skip to content

Commit

Permalink
Merge pull request #20 from clearcare/feature/scan-query-all-in-table
Browse files Browse the repository at this point in the history
Move query/scan functionality into table.py
  • Loading branch information
pcraciunoiu committed May 12, 2016
2 parents 5fb04a7 + c660fff commit 01a3f01
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 43 deletions.
50 changes: 11 additions & 39 deletions cc_dynamodb3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from . import exceptions
from .config import get_config
from .log import log_data
from .table import get_table, query_table
from .table import get_table, query_table, query_all_in_table, scan_all_in_table


class DynamoDBModel(Model):
Expand Down Expand Up @@ -110,18 +110,8 @@ def _initial_data_to_dynamodb(cls, data):

@classmethod
def all(cls):
response = cls.table().scan()
# DynamoDB scan only returns up to 1MB of data, so we need to keep scanning.
while True:
metadata = response.get('ResponseMetadata', {})
for row in response['Items']:
yield cls.from_row(row, metadata)
if response.get('LastEvaluatedKey'):
response = cls.table().scan(
ExclusiveStartKey=response['LastEvaluatedKey'],
)
else:
break
for row, metadata in scan_all_in_table(cls.table()):
yield cls.from_row(row, metadata)

@classmethod
def paginated_query(cls, query_index=None, descending=False, limit=None, exclusive_start_key=None, filter_expression=None, **query_keys):
Expand Down Expand Up @@ -173,32 +163,14 @@ def paginated_query(cls, query_index=None, descending=False, limit=None, exclusi
@classmethod
def query(cls, query_index=None, descending=False, limit=None, filter_expression=None, **query_keys):
query_index = query_index or getattr(cls, 'QUERY_INDEX', None)
response = query_table(cls.TABLE_NAME,
query_index=query_index,
descending=descending,
limit=limit,
filter_expression=filter_expression,
**query_keys)
total_found = 0
# DynamoDB scan only returns up to 1MB of data, so we need to keep querying.
while True:
metadata = response.get('ResponseMetadata', {})
for row in response['Items']:
yield cls.from_row(row, metadata)
total_found += 1
if limit and total_found == limit:
break
if limit and total_found == limit:
break
if response.get('LastEvaluatedKey'):
response = query_table(cls.TABLE_NAME,
query_index=query_index,
descending=descending,
limit=limit,
exclusive_start_key=response['LastEvaluatedKey'],
**query_keys)
else:
break
for row, metadata in query_all_in_table(
cls.table(),
query_index=query_index,
descending=descending,
limit=limit,
filter_expression=filter_expression,
**query_keys):
yield cls.from_row(row, metadata)

@classmethod
def query_count(cls, query_index=None, descending=False, limit=None, **query_keys):
Expand Down
72 changes: 69 additions & 3 deletions cc_dynamodb3/table.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from functools import partial
import operator

from boto3.dynamodb.conditions import Key, Attr
Expand Down Expand Up @@ -195,12 +196,15 @@ def get_table(table_name, connection=None):
)


def query_table(table_name, query_index=None, descending=False, limit=None, count=False,
def _maybe_table_from_name(table_name_or_class):
return get_table(table_name_or_class) if isinstance(table_name_or_class, basestring) else table_name_or_class

def query_table(table_name_or_class, query_index=None, descending=False, limit=None, count=False,
exclusive_start_key=None, filter_expression=None, **query_keys):
"""
Friendly version to query a table using boto3's interface
:param table_name: (string) un-prefixed table name
:param table_name_or_class: (string) un-prefixed table name
:param query_index: (string, optional) optionally specify a GSI (Global) or LSI (Local Secondary Index)
:param descending: (boolean, optional) sort in descending order (default: False)
:param limit: (integer, optional) limit the number of results directly in the query to dynamodb
Expand Down Expand Up @@ -265,7 +269,69 @@ def query_table(table_name, query_index=None, descending=False, limit=None, coun
if exclusive_start_key:
query_kwargs['ExclusiveStartKey'] = exclusive_start_key

return get_table(table_name).query(**query_kwargs)
return _maybe_table_from_name(table_name_or_class).query(**query_kwargs)


def scan_table(table_name_or_class, exclusive_start_key=None, **scan_kwargs):
if exclusive_start_key:
scan_kwargs['ExclusiveStartKey'] = exclusive_start_key
return _maybe_table_from_name(table_name_or_class).scan(**scan_kwargs)


def _retrieve_all_matching(query_or_scan_func, *args, **kwargs):
"""Used by scan/query below."""
limit = kwargs.pop('limit', None)
query_or_scan_kwargs = kwargs.copy()
response = query_or_scan_func(*args, **query_or_scan_kwargs)
total_found = 0

# DynamoDB only returns up to 1MB of data per trip, so we need to keep querying or scanning.
while True:
metadata = response.get('ResponseMetadata', {})
for row in response['Items']:
yield row, metadata
total_found += 1
if limit and total_found == limit:
break
if limit and total_found == limit:
break
if response.get('LastEvaluatedKey'):
query_or_scan_kwargs['exclusive_start_key'] = response['LastEvaluatedKey']
response = query_or_scan_func(*args, **query_or_scan_kwargs)
else:
break


def scan_all_in_table(table_name_or_class, *args, **kwargs):
"""
Scan all records in a table. May perform multiple calls to DynamoDB.
DynamoDB only returns up to 1MB of data per scan, so we need to keep scanning,
using LastEvaluatedKey.
:param table_name_or_class: 'some_table' or get_table('some_table')
:param args: see args accepted by boto3 dynamodb scan
:param kwargs: see kwargs accepted by boto3 dynamodb scan
:return: list of records as tuples (row, metadata)
"""
scan_partial = partial(scan_table, table_name_or_class)
return _retrieve_all_matching(scan_partial, *args, **kwargs)


def query_all_in_table(table_name_or_class, *args, **kwargs):
"""
Query all records in a table. May perform multiple calls to DynamoDB.
DynamoDB only returns up to 1MB of data per query, so we need to keep querying,
using LastEvaluatedKey.
:param table_name_or_class: 'some_table' or get_table('some_table')
:param args: see args accepted by query_table
:param kwargs: see kwargs accepted by query_table
:return: list of records as tuples (row, metadata)
"""
query_partial = partial(query_table, table_name_or_class)
return _retrieve_all_matching(query_partial, *args, **kwargs)


def list_table_names():
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
'schematics==1.1.1',
],
tests_require=['pytest', 'mock', 'factory_boy', 'moto'],
version = '0.6.12',
version = '0.6.13',
description = 'A dynamodb common configuration abstraction',
author='Paul Craciunoiu',
author_email='pcraciunoiu@clearcareonline.com',
Expand Down
36 changes: 36 additions & 0 deletions tests/test_scan_query_all.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from conftest import DYNAMODB_FIXTURES
from cc_dynamodb3.mocks import mock_table_with_data
from cc_dynamodb3.table import scan_all_in_table, query_all_in_table


def test_scan_all_works_on_case_with_little_data():
data = DYNAMODB_FIXTURES['nps_survey']
data_by_profile_id = {i['profile_id']: i for i in data}
table = mock_table_with_data('nps_survey', data)

results = list(scan_all_in_table(table))
assert len(results) == 2

for result, metadata in results:
item = data_by_profile_id[result.get('profile_id')]
assert item['agency_id'] == result.get('agency_id')
assert item['recommend_score'] == result.get('recommend_score')
assert item.get('favorite') == result.get('favorite')


def test_query_all_works_on_case_with_little_data():
data = DYNAMODB_FIXTURES['nps_survey']
data_by_profile_id = {i['profile_id']: i for i in data}
table = mock_table_with_data('nps_survey', data)

results = list(query_all_in_table(table, agency_id=1669))
assert len(results) == 2

for result, metadata in results:
item = data_by_profile_id[result.get('profile_id')]
assert item['agency_id'] == result.get('agency_id')
assert item['recommend_score'] == result.get('recommend_score')
assert item.get('favorite') == result.get('favorite')

results = list(query_all_in_table(table, agency_id=1000))
assert len(results) == 0

0 comments on commit 01a3f01

Please sign in to comment.