Skip to content

Commit

Permalink
Support arrays and timestamps (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavius authored Aug 11, 2020
1 parent 91346c2 commit 2718a9f
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 18 deletions.
79 changes: 70 additions & 9 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os.path
import unittest
import time
import array
import datetime

import future.utils

Expand Down Expand Up @@ -117,13 +119,13 @@ def test_delete_stream_with_cg(self):
self._client.put_records(container=self._container,
path=self._path,
records=[
{'shard_id': shard_id, 'data': f'data for shard {shard_id}'}
{'shard_id': shard_id, 'data': 'data for shard {}'.format(shard_id)}
])

# write several "consumer group state" files
for cg_id in range(3):
self._client.put_object(container=self._container,
path=os.path.join(self._path, f'cg{cg_id}-state.json'))
path=os.path.join(self._path, 'cg{}-state.json'.format(cg_id)))

# check that the stream doesn't exist
self.assertTrue(self._stream_exists())
Expand Down Expand Up @@ -384,7 +386,46 @@ def setUp(self):
self._path = 'some_dir/v3io-py-test-emd'
self._delete_dir(self._path)

def test_emd_array(self):
item_key = 'item_with_arrays'
item = {
'list_with_ints': [1, 2, 3],
'list_with_floats': [10.25, 20.25, 30.25],
}

item_path = v3io.common.helpers.url_join(self._path, item_key)

self._client.put_item(container=self._container,
path=item_path,
attributes=item)

for attribute_name in item.keys():
self._client.update_item(container=self._container,
path=item_path,
expression=f'{attribute_name}[1]={attribute_name}[1]*2')

# get the item
response = self._client.get_item(container=self._container, path=item_path)

for attribute_name in item.keys():
self.assertEqual(response.output.item[attribute_name][1], item[attribute_name][1] * 2)

def test_emd_values(self):

def _get_int_array():
int_array = array.array('l')
for value in range(10):
int_array.append(value)

return int_array

def _get_float_array():
float_array = array.array('d')
for value in range(10):
float_array.append(value)

return float_array

item_key = 'bob'
item = {
item_key: {
Expand All @@ -396,7 +437,12 @@ def test_emd_values(self):
'unicode': u'\xd7\xa9\xd7\x9c\xd7\x95\xd7\x9d',
'male': True,
'happy': False,
'blob': b'+AFymWFzAL/LUOiU2huiADbugMH0AARATEO1'
'blob': b'+AFymWFzAL/LUOiU2huiADbugMH0AARATEO1',
'list_with_ints': [1, 2, 3],
'list_with_floats': [10.5, 20.5, 30.5],
'array_with_ints': _get_int_array(),
'array_with_floats': _get_float_array(),
'now': datetime.datetime.utcnow()
}
}

Expand All @@ -410,14 +456,10 @@ def test_emd_values(self):
self.assertEqual(len(item[item_key].keys()), len(response.output.item.keys()))

for key, value in response.output.item.items():
if item[item_key][key] != value:
self.fail('Values dont match')
self._compare_item_values(item[item_key][key], value)

for key, value in item[item_key].items():

# can't guarantee strings as they might be converted to unicode
if type(value) is not str:
self.assertEqual(type(value), type(response.output.item[key]))
self._compare_item_types(item[item_key][key], response.output.item[key])

def test_emd(self):
items = {
Expand Down Expand Up @@ -602,6 +644,25 @@ def _verify_items(self, path, items):
# TODO: verify contents
self.assertEqual(len(items), len(received_items))

def _compare_item_values(self, v1, v2):
if isinstance(v1, array.array):

# convert to list
v1 = list(v1)

if v1 != v2:
self.fail('Values dont match')

def _compare_item_types(self, v1, v2):
if isinstance(v1, array.array):

# convert to list
v1 = list(v1)

# can't guarantee strings as they might be converted to unicode
if type(v1) is not str:
self.assertEqual(type(v1), type(v2))


class TestRaiseForStatus(Test):

Expand Down
7 changes: 6 additions & 1 deletion v3io/dataplane/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,16 @@ def put_item(self,
condition=None):
"""Creates an item with the provided attributes. If an item with the same name (primary key) already exists in
the specified table, the existing item is completely overwritten (replaced with a new item). If the item or
table do not exist, the operation creates them.
table do not exist, the operation creates them. The max number of values per item is 1000.
See:
https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/putitem/
Notes:
1. To provide arrays, pass either a list of integers ([1, 2, 3]), a list of floats ([1.0, 2.0, 3.0]) an
array.array with a typecode of either 'l' (integer) or 'd' (float). The response will always either be
a list of integers or a list of floats (never an array.array)
Parameters
----------
container (Required) : str
Expand Down
51 changes: 51 additions & 0 deletions v3io/dataplane/item_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import struct
import base64

# constants
ITEM_HEADER_MAGIC = struct.pack("I", 11223344)
ITEM_HEADER_MAGIC_AND_VERSION = ITEM_HEADER_MAGIC + struct.pack("I", 1)

OPERAND_TYPE_LONG = 259
OPERAND_TYPE_DOUBLE = 261


def encode_list(list_value):
typecode = 'l'
if len(list_value) and isinstance(list_value[0], float):
typecode = 'd'

return encode_array(list_value, typecode)


def encode_array(array_value, typecode):
num_items = len(array_value)
operand_type = OPERAND_TYPE_LONG if typecode == 'l' else OPERAND_TYPE_DOUBLE

encoded_array = ITEM_HEADER_MAGIC_AND_VERSION + struct.pack('II' + typecode * num_items,
num_items * 8,
operand_type,
*array_value)

return base64.b64encode(encoded_array)


def decode(encoded_array):
static_header_len = len(ITEM_HEADER_MAGIC_AND_VERSION)

# do a quick peek before we decode
if len(encoded_array) <= static_header_len or not encoded_array.startswith(ITEM_HEADER_MAGIC_AND_VERSION):
raise ValueError('Not an encoded array')

# get header (which contains number of items and type
header = encoded_array[static_header_len:static_header_len+8]
values = encoded_array[static_header_len+len(header):]

# unpack the header to get the size and operand
unpacked_header = struct.unpack('II', header)

# get the typecode and number of items
typecode = 'l' if unpacked_header[1] == OPERAND_TYPE_LONG else 'd'
num_items = int(unpacked_header[0] / 8)

# decode the values
return list(struct.unpack(typecode*num_items, values))
38 changes: 38 additions & 0 deletions v3io/dataplane/item_timestamp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import sys
import datetime

# used only n py2
BASE_DATETIME = datetime.datetime(1970, 1, 1)


def _get_timestamp_from_datetime_py3(dt):
return dt.replace(tzinfo=datetime.timezone.utc).timestamp()


def _get_timestamp_from_datetime_py2(dt):
return (dt - BASE_DATETIME).total_seconds()


# _get_timestamp_from_datetime is python version specific. resolve this once
if sys.version_info[0] >= 3:
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py3
else:
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py2


def encode(dt):
timestamp = _get_timestamp_from_datetime(dt)

# get integer and float part of nanoseconds
seconds, nanoseconds_float = divmod(timestamp, 1)
nanoseconds = int(nanoseconds_float * 10e9)

return '{}:{}'.format(int(seconds), nanoseconds)


def decode(encoded_dt):
seconds_str, nanoseconds_str = encoded_dt.split(':')

timestamp = int(seconds_str) + (int(nanoseconds_str) / 10e9)

return datetime.datetime.utcfromtimestamp(timestamp)
16 changes: 15 additions & 1 deletion v3io/dataplane/output.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import base64
import string
import struct

import future.utils


import v3io.dataplane.item_array
import v3io.dataplane.item_timestamp


class Output(object):

def _decode_typed_attributes(self, typed_attributes):
Expand All @@ -18,11 +22,21 @@ def _decode_typed_attributes(self, typed_attributes):
decoded_attribute = float(attribute_value)
elif attribute_type == 'B':
decoded_attribute = base64.b64decode(attribute_value)

# try to decode as an array
try:
decoded_attribute = v3io.dataplane.item_array.decode(decoded_attribute)
except:
pass

elif attribute_type == 'S':
if type(attribute_value) in [float, int]:
decoded_attribute = str(attribute_value)
else:
decoded_attribute = attribute_value

elif attribute_type == 'TS':
decoded_attribute = v3io.dataplane.item_timestamp.decode(attribute_value)
else:
decoded_attribute = attribute_value

Expand Down
14 changes: 13 additions & 1 deletion v3io/dataplane/request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import base64
import future.utils
import datetime
import array
import datetime

try:
from urllib.parse import urlencode
Expand All @@ -10,7 +12,8 @@
import ujson

import v3io.common.helpers

import v3io.dataplane.item_array
import v3io.dataplane.item_timestamp

#
# Request
Expand Down Expand Up @@ -395,6 +398,15 @@ def _dict_to_typed_attributes(d):
elif isinstance(value, bool):
type_key = 'BOOL'
type_value = value
elif isinstance(value, list):
type_key = 'B'
type_value = v3io.dataplane.item_array.encode_list(value)
elif isinstance(value, array.array):
type_key = 'B'
type_value = v3io.dataplane.item_array.encode_array(value, value.typecode)
elif isinstance(value, datetime.datetime):
type_key = 'TS'
type_value = v3io.dataplane.item_timestamp.encode(value)
else:
raise AttributeError('Attribute {0} has unsupported type {1}'.format(key, attribute_type))

Expand Down
14 changes: 8 additions & 6 deletions v3io/dataplane/transport/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve

# python 2 and 3 have different exceptions
if sys.version_info[0] >= 3:
self._wait_response_exceptions = (http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError)
self._send_request_exceptions = (BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected)
self._wait_response_exceptions = (
http.client.RemoteDisconnected, ConnectionResetError, ConnectionRefusedError)
self._send_request_exceptions = (
BrokenPipeError, http.client.CannotSendRequest, http.client.RemoteDisconnected)
self._get_status_and_headers = self._get_status_and_headers_py3
else:
self._wait_response_exceptions = (http.client.BadStatusLine, socket.error)
Expand Down Expand Up @@ -95,9 +97,9 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):

raise e

self._logger.info_with('Remote disconnected while waiting for response',
retries_left=num_retries,
connection_idx=connection_idx)
self._logger.debug_with('Remote disconnected while waiting for response',
retries_left=num_retries,
connection_idx=connection_idx)

num_retries -= 1

Expand Down Expand Up @@ -125,7 +127,7 @@ def _send_request_on_connection(self, request, connection_idx):
try:
connection.request(request.method, request.path, request.body, request.headers)
except self._send_request_exceptions as e:
self._logger.info_with('Disconnected while attempting to send. Recreating connection', e=type(e))
self._logger.debug_with('Disconnected while attempting to send. Recreating connection', e=type(e))

connection = self._recreate_connection_at_index(connection_idx)

Expand Down

0 comments on commit 2718a9f

Please sign in to comment.