From 2718a9fc1d890e26c152ac1deb6b216cc51b1bac Mon Sep 17 00:00:00 2001 From: Eran Duchan Date: Tue, 11 Aug 2020 10:33:04 +0300 Subject: [PATCH] Support arrays and timestamps (#46) --- tests/test_client.py | 79 +++++++++++++++++++++++--- v3io/dataplane/client.py | 7 ++- v3io/dataplane/item_array.py | 51 +++++++++++++++++ v3io/dataplane/item_timestamp.py | 38 +++++++++++++ v3io/dataplane/output.py | 16 +++++- v3io/dataplane/request.py | 14 ++++- v3io/dataplane/transport/httpclient.py | 14 +++-- 7 files changed, 201 insertions(+), 18 deletions(-) create mode 100644 v3io/dataplane/item_array.py create mode 100644 v3io/dataplane/item_timestamp.py diff --git a/tests/test_client.py b/tests/test_client.py index 44c76c5..f19bfac 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,8 @@ import os.path import unittest import time +import array +import datetime import future.utils @@ -117,13 +119,13 @@ def test_delete_stream_with_cg(self): self._client.put_records(container=self._container, path=self._path, records=[ - {'shard_id': shard_id, 'data': f'data for shard {shard_id}'} + {'shard_id': shard_id, 'data': 'data for shard {}'.format(shard_id)} ]) # write several "consumer group state" files for cg_id in range(3): self._client.put_object(container=self._container, - path=os.path.join(self._path, f'cg{cg_id}-state.json')) + path=os.path.join(self._path, 'cg{}-state.json'.format(cg_id))) # check that the stream doesn't exist self.assertTrue(self._stream_exists()) @@ -384,7 +386,46 @@ def setUp(self): self._path = 'some_dir/v3io-py-test-emd' self._delete_dir(self._path) + def test_emd_array(self): + item_key = 'item_with_arrays' + item = { + 'list_with_ints': [1, 2, 3], + 'list_with_floats': [10.25, 20.25, 30.25], + } + + item_path = v3io.common.helpers.url_join(self._path, item_key) + + self._client.put_item(container=self._container, + path=item_path, + attributes=item) + + for attribute_name in item.keys(): + 
self._client.update_item(container=self._container, + path=item_path, + expression='{0}[1]={0}[1]*2'.format(attribute_name)) + + # get the item + response = self._client.get_item(container=self._container, path=item_path) + + for attribute_name in item.keys(): + self.assertEqual(response.output.item[attribute_name][1], item[attribute_name][1] * 2) + def test_emd_values(self): + + def _get_int_array(): + int_array = array.array('l') + for value in range(10): + int_array.append(value) + + return int_array + + def _get_float_array(): + float_array = array.array('d') + for value in range(10): + float_array.append(value) + + return float_array + item_key = 'bob' item = { item_key: { @@ -396,7 +437,12 @@ def test_emd_values(self): 'unicode': u'\xd7\xa9\xd7\x9c\xd7\x95\xd7\x9d', 'male': True, 'happy': False, - 'blob': b'+AFymWFzAL/LUOiU2huiADbugMH0AARATEO1' + 'blob': b'+AFymWFzAL/LUOiU2huiADbugMH0AARATEO1', + 'list_with_ints': [1, 2, 3], + 'list_with_floats': [10.5, 20.5, 30.5], + 'array_with_ints': _get_int_array(), + 'array_with_floats': _get_float_array(), + 'now': datetime.datetime.utcnow() } } @@ -410,14 +456,10 @@ def test_emd_values(self): self.assertEqual(len(item[item_key].keys()), len(response.output.item.keys())) for key, value in response.output.item.items(): - if item[item_key][key] != value: - self.fail('Values dont match') + self._compare_item_values(item[item_key][key], value) for key, value in item[item_key].items(): - - # can't guarantee strings as they might be converted to unicode - if type(value) is not str: - self.assertEqual(type(value), type(response.output.item[key])) + self._compare_item_types(item[item_key][key], response.output.item[key]) def test_emd(self): items = { @@ -602,6 +644,25 @@ def _verify_items(self, path, items): # TODO: verify contents self.assertEqual(len(items), len(received_items)) + def _compare_item_values(self, v1, v2): + if isinstance(v1, array.array): + + # convert to list + v1 = list(v1) + + if v1 != v2: + 
self.fail('Values dont match') + + def _compare_item_types(self, v1, v2): + if isinstance(v1, array.array): + + # convert to list + v1 = list(v1) + + # can't guarantee strings as they might be converted to unicode + if type(v1) is not str: + self.assertEqual(type(v1), type(v2)) + class TestRaiseForStatus(Test): diff --git a/v3io/dataplane/client.py b/v3io/dataplane/client.py index 4e1a436..ae763d3 100644 --- a/v3io/dataplane/client.py +++ b/v3io/dataplane/client.py @@ -294,11 +294,16 @@ def put_item(self, condition=None): """Creates an item with the provided attributes. If an item with the same name (primary key) already exists in the specified table, the existing item is completely overwritten (replaced with a new item). If the item or - table do not exist, the operation creates them. + table do not exist, the operation creates them. The max number of values per item is 1000. See: https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/putitem/ + Notes: + 1. To provide arrays, pass either a list of integers ([1, 2, 3]), a list of floats ([1.0, 2.0, 3.0]) or an + array.array with a typecode of either 'l' (integer) or 'd' (float). 
The response will always either be + a list of integers or a list of floats (never an array.array) + Parameters ---------- container (Required) : str diff --git a/v3io/dataplane/item_array.py b/v3io/dataplane/item_array.py new file mode 100644 index 0000000..74d569b --- /dev/null +++ b/v3io/dataplane/item_array.py @@ -0,0 +1,51 @@ +import struct +import base64 + +# constants +ITEM_HEADER_MAGIC = struct.pack("I", 11223344) +ITEM_HEADER_MAGIC_AND_VERSION = ITEM_HEADER_MAGIC + struct.pack("I", 1) + +OPERAND_TYPE_LONG = 259 +OPERAND_TYPE_DOUBLE = 261 + + +def encode_list(list_value): + typecode = 'l' + if len(list_value) and isinstance(list_value[0], float): + typecode = 'd' + + return encode_array(list_value, typecode) + + +def encode_array(array_value, typecode): + num_items = len(array_value) + operand_type = OPERAND_TYPE_LONG if typecode == 'l' else OPERAND_TYPE_DOUBLE + + encoded_array = ITEM_HEADER_MAGIC_AND_VERSION + struct.pack('II' + ('q' if typecode == 'l' else 'd') * num_items, + num_items * 8, + operand_type, + *array_value) + + return base64.b64encode(encoded_array) + + +def decode(encoded_array): + static_header_len = len(ITEM_HEADER_MAGIC_AND_VERSION) + + # do a quick peek before we decode + if len(encoded_array) <= static_header_len or not encoded_array.startswith(ITEM_HEADER_MAGIC_AND_VERSION): + raise ValueError('Not an encoded array') + + # get header (which contains the size of the values in bytes and the operand type) + header = encoded_array[static_header_len:static_header_len+8] + values = encoded_array[static_header_len+len(header):] + + # unpack the header to get the size and operand + unpacked_header = struct.unpack('II', header) + + # get the typecode and number of items (every item is 8 bytes wide, hence 'q' rather than the native 'l') + typecode = 'q' if unpacked_header[1] == OPERAND_TYPE_LONG else 'd' + num_items = unpacked_header[0] // 8 + + # decode the values + return list(struct.unpack(typecode*num_items, values)) diff --git a/v3io/dataplane/item_timestamp.py b/v3io/dataplane/item_timestamp.py new file mode 100644 index 0000000..8ce71ac --- /dev/null 
+++ b/v3io/dataplane/item_timestamp.py @@ -0,0 +1,38 @@ +import sys +import datetime + +# used only in py2 +BASE_DATETIME = datetime.datetime(1970, 1, 1) + + +def _get_timestamp_from_datetime_py3(dt): + return dt.replace(tzinfo=datetime.timezone.utc).timestamp() + + +def _get_timestamp_from_datetime_py2(dt): + return (dt - BASE_DATETIME).total_seconds() + + +# _get_timestamp_from_datetime is python version specific. resolve this once +if sys.version_info[0] >= 3: + _get_timestamp_from_datetime = _get_timestamp_from_datetime_py3 +else: + _get_timestamp_from_datetime = _get_timestamp_from_datetime_py2 + + +def encode(dt): + timestamp = _get_timestamp_from_datetime(dt) + + # split into whole seconds and the sub-second fraction, then scale the fraction to nanoseconds (1e9 per second) + seconds, nanoseconds_float = divmod(timestamp, 1) + nanoseconds = int(nanoseconds_float * 1e9) + + return '{}:{}'.format(int(seconds), nanoseconds) + + +def decode(encoded_dt): + seconds_str, nanoseconds_str = encoded_dt.split(':') + + timestamp = int(seconds_str) + (int(nanoseconds_str) / 1e9) + + return datetime.datetime.utcfromtimestamp(timestamp) diff --git a/v3io/dataplane/output.py b/v3io/dataplane/output.py index 7caec22..0302ad9 100644 --- a/v3io/dataplane/output.py +++ b/v3io/dataplane/output.py @@ -1,9 +1,13 @@ import base64 -import string +import struct import future.utils +import v3io.dataplane.item_array +import v3io.dataplane.item_timestamp + + class Output(object): def _decode_typed_attributes(self, typed_attributes): @@ -18,11 +22,21 @@ def _decode_typed_attributes(self, typed_attributes): decoded_attribute = float(attribute_value) elif attribute_type == 'B': decoded_attribute = base64.b64decode(attribute_value) + + # try to decode as an array + try: + decoded_attribute = v3io.dataplane.item_array.decode(decoded_attribute) + except (ValueError, struct.error): + pass + elif attribute_type == 'S': if type(attribute_value) in [float, int]: decoded_attribute = str(attribute_value) else: decoded_attribute = attribute_value + + elif attribute_type == 'TS': + 
decoded_attribute = v3io.dataplane.item_timestamp.decode(attribute_value) else: decoded_attribute = attribute_value diff --git a/v3io/dataplane/request.py b/v3io/dataplane/request.py index b3eefe6..74c73ad 100644 --- a/v3io/dataplane/request.py +++ b/v3io/dataplane/request.py @@ -1,6 +1,8 @@ import base64 import future.utils import datetime +import array +import datetime try: from urllib.parse import urlencode @@ -10,7 +12,8 @@ import ujson import v3io.common.helpers - +import v3io.dataplane.item_array +import v3io.dataplane.item_timestamp # # Request @@ -395,6 +398,15 @@ def _dict_to_typed_attributes(d): elif isinstance(value, bool): type_key = 'BOOL' type_value = value + elif isinstance(value, list): + type_key = 'B' + type_value = v3io.dataplane.item_array.encode_list(value) + elif isinstance(value, array.array): + type_key = 'B' + type_value = v3io.dataplane.item_array.encode_array(value, value.typecode) + elif isinstance(value, datetime.datetime): + type_key = 'TS' + type_value = v3io.dataplane.item_timestamp.encode(value) else: raise AttributeError('Attribute {0} has unsupported type {1}'.format(key, attribute_type)) diff --git a/v3io/dataplane/transport/httpclient.py b/v3io/dataplane/transport/httpclient.py index be2fb7d..fa0fa53 100644 --- a/v3io/dataplane/transport/httpclient.py +++ b/v3io/dataplane/transport/httpclient.py @@ -26,8 +26,10 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve # python 2 and 3 have different exceptions if sys.version_info[0] >= 3: - self._wait_response_exceptions = (http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError) - self._send_request_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected) + self._wait_response_exceptions = ( + http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError) + self._send_request_exceptions = ( + BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected) 
self._get_status_and_headers = self._get_status_and_headers_py3 else: self._wait_response_exceptions = (http.client.BadStatusLine, socket.error) @@ -95,9 +97,9 @@ def wait_response(self, request, raise_for_status=None, num_retries=1): raise e - self._logger.info_with('Remote disconnected while waiting for response', - retries_left=num_retries, - connection_idx=connection_idx) + self._logger.debug_with('Remote disconnected while waiting for response', + retries_left=num_retries, + connection_idx=connection_idx) num_retries -= 1 @@ -125,7 +127,7 @@ def _send_request_on_connection(self, request, connection_idx): try: connection.request(request.method, request.path, request.body, request.headers) except self._send_request_exceptions as e: - self._logger.info_with('Disconnected while attempting to send. Recreating connection', e=type(e)) + self._logger.debug_with('Disconnected while attempting to send. Recreating connection', e=type(e)) connection = self._recreate_connection_at_index(connection_idx)