From 87c7a428f4dafc7660876362f313112d247f7ec1 Mon Sep 17 00:00:00 2001 From: Eran Duchan Date: Mon, 14 Sep 2020 15:43:26 +0300 Subject: [PATCH] Experimental support for asyncio (#49) --- Pipfile | 1 + README.md | 33 ++ setup.py | 3 + tests/test_client_aio.py | 594 +++++++++++++++++++++++ v3io/aio/__init__.py | 0 v3io/aio/dataplane/__init__.py | 2 + v3io/aio/dataplane/client.py | 98 ++++ v3io/aio/dataplane/container.py | 55 +++ v3io/aio/dataplane/kv.py | 354 ++++++++++++++ v3io/aio/dataplane/kv_cursor.py | 87 ++++ v3io/aio/dataplane/object.py | 99 ++++ v3io/aio/dataplane/stream.py | 320 ++++++++++++ v3io/aio/dataplane/transport/__init__.py | 0 v3io/aio/dataplane/transport/aiohttp.py | 95 ++++ v3io/dataplane/stream.py | 8 +- 15 files changed, 1745 insertions(+), 4 deletions(-) create mode 100644 tests/test_client_aio.py create mode 100644 v3io/aio/__init__.py create mode 100644 v3io/aio/dataplane/__init__.py create mode 100644 v3io/aio/dataplane/client.py create mode 100644 v3io/aio/dataplane/container.py create mode 100644 v3io/aio/dataplane/kv.py create mode 100644 v3io/aio/dataplane/kv_cursor.py create mode 100644 v3io/aio/dataplane/object.py create mode 100644 v3io/aio/dataplane/stream.py create mode 100644 v3io/aio/dataplane/transport/__init__.py create mode 100644 v3io/aio/dataplane/transport/aiohttp.py diff --git a/Pipfile b/Pipfile index e0839b5..1b7155e 100644 --- a/Pipfile +++ b/Pipfile @@ -11,3 +11,4 @@ twine = "*" requests = ">=2.19.1" future = ">=0.18.2" ujson = "~=2.0" + diff --git a/README.md b/README.md index e253f5f..1eebc94 100644 --- a/README.md +++ b/README.md @@ -208,5 +208,38 @@ for record in response.output.records: v3io_client.stream.delete(container='users', stream_path='/my-test-stream') ``` +## Support for asyncio (experimental) + +All synchronous APIs are available as `async` interfaces through the `aio` module. The differences between the sync and async APIs are as follows: +1. 
You must initialize a different client (`v3io.aio.dataplane.Client`) from `v3io.aio.dataplane` +2. All interfaces should be called with `await` +3. `v3io.aio.dataplane.RaiseForStatus.never` should be used over `v3io.dataplane.RaiseForStatus.never` (although they are the same) +4. The batching functionality doesn't exist, as you can achieve the same through standard asyncio practices + +Note: For the time being, `aiohttp` must be provided externally - it is not a v3io-py dependency. This will be fixed in future versions. + +```python +import v3io.aio.dataplane + +v3io_client = v3io.aio.dataplane.Client(endpoint='https://v3io-webapi:8081', access_key='some_access_key') + +# put contents to some object +await v3io_client.object.put(container='users', + path='/my-object', + body='hello, there') + +# get the object +response = await v3io_client.object.get(container='users', path='/my-object') + +# print the contents. outputs: +# +# hello, there +# +print(response.body.decode('utf-8')) + +# delete the object +await v3io_client.object.delete(container='users', path='/my-object') +``` + # Controlplane client Coming soon. 
\ No newline at end of file diff --git a/setup.py b/setup.py index 45a004a..c297c92 100644 --- a/setup.py +++ b/setup.py @@ -65,6 +65,8 @@ def load_deps(section): 'v3io.common', 'v3io.dataplane', 'v3io.dataplane.transport', + 'v3io.aio.dataplane', + 'v3io.aio.dataplane.transport', 'v3io.logger' ], install_requires=install_requires, @@ -80,6 +82,7 @@ def load_deps(section): 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Programming Language :: Python', 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: Software Development :: Libraries', diff --git a/tests/test_client_aio.py b/tests/test_client_aio.py new file mode 100644 index 0000000..dae978b --- /dev/null +++ b/tests/test_client_aio.py @@ -0,0 +1,594 @@ +import unittest +import os +import array +import datetime +import future.utils + +import v3io.dataplane +import v3io.aio.dataplane + + +class Test(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + self._client = v3io.aio.dataplane.Client(logger_verbosity='DEBUG', + transport_verbosity='DEBUG') + + self._container = 'bigdata' + + async def asyncTearDown(self): + await self._client.close() + + async def _delete_dir(self, path): + response = await self._client.container.list(container=self._container, + path=path, + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + + if response.status_code == 404: + return + + if response.status_code != 200: + raise RuntimeError(response.body) + + for content in response.output.contents: + await self._client.object.delete(container=self._container, path=content.key) + + for common_prefixes in response.output.common_prefixes: + await self._client.object.delete(container=self._container, + path=common_prefixes.prefix) + + +class TestContainer(Test): + + async def asyncSetUp(self): + await super(TestContainer, self).asyncSetUp() + self._path = 'v3io-py-test-container' 
+ + # clean up + await self._delete_dir(self._path) + + async def test_get_container_contents_invalid_path(self): + response = await self._client.container.list(container=self._container, + path='/no-such-path', + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + self.assertEqual(404, response.status_code) + self.assertIn('No such file', str(response.body)) + + async def test_get_container_contents(self): + body = 'If you cannot do great things, do small things in a great way.' + + for object_index in range(5): + await self._client.object.put(container=self._container, + path=os.path.join(self._path, 'object-{0}.txt'.format(object_index)), + body=body) + + for object_index in range(5): + await self._client.object.put(container=self._container, + path=os.path.join(self._path, 'dir-{0}/'.format(object_index))) + + response = await self._client.container.list(container=self._container, + path=self._path, + get_all_attributes=True, + directories_only=True) + self.assertEqual(0, len(response.output.contents)) + self.assertNotEqual(0, len(response.output.common_prefixes)) + + response = await self._client.container.list(container=self._container, + path=self._path, + get_all_attributes=True) + self.assertNotEqual(0, len(response.output.contents)) + self.assertNotEqual(0, len(response.output.common_prefixes)) + + # clean up + await self._delete_dir(self._path) + + +class TestStream(Test): + + async def asyncSetUp(self): + await super(TestStream, self).asyncSetUp() + + self._path = 'v3io-py-test-stream' + + # clean up + await self._client.stream.delete(container=self._container, + stream_path=self._path, + raise_for_status=[200, 204, 404]) + + async def test_delete_stream_with_cg(self): + num_shards = 8 + + # check that the stream doesn't exist + self.assertFalse(await self._stream_exists()) + + # create a stream + await self._client.stream.create(container=self._container, + stream_path=self._path, + shard_count=num_shards) + + # write data to all shards so 
there are files + for shard_id in range(num_shards): + await self._client.stream.put_records(container=self._container, + stream_path=self._path, + records=[ + {'shard_id': shard_id, + 'data': 'data for shard {}'.format(shard_id)} + ]) + + # write several "consumer group state" files + for cg_id in range(3): + await self._client.object.put(container=self._container, + path=os.path.join(self._path, 'cg{}-state.json'.format(cg_id))) + + # check that the stream exists + self.assertTrue(await self._stream_exists()) + + # delete the stream + await self._client.stream.delete(container=self._container, stream_path=self._path) + + # check that the stream doesn't exist + self.assertFalse(await self._stream_exists()) + + async def test_stream(self): + + # create a stream w/8 shards + await self._client.stream.create(container=self._container, + stream_path=self._path, + shard_count=8) + + records = [ + {'shard_id': 1, 'data': 'first shard record #1'}, + {'shard_id': 1, 'data': 'first shard record #2', 'client_info': bytearray(b'some info')}, + {'shard_id': 10, 'data': 'invalid shard record #1'}, + {'shard_id': 2, 'data': 'second shard record #1'}, + {'data': 'some shard record #1'}, + ] + + response = await self._client.stream.put_records(container=self._container, + stream_path=self._path, + records=records) + self.assertEqual(1, response.output.failed_record_count) + + for response_record_index, response_record in enumerate(response.output.records): + if response_record_index == 2: + self.assertIsNotNone(response_record.error_code) + else: + self.assertIsNone(response_record.error_code) + + response = await self._client.stream.seek(container=self._container, + stream_path=self._path, + shard_id=1, + seek_type='EARLIEST') + + self.assertNotEqual('', response.output.location) + + response = await self._client.stream.get_records(container=self._container, + stream_path=self._path, + shard_id=1, + location=response.output.location) + + self.assertEqual(2, 
len(response.output.records)) + self.assertEqual(records[0]['data'], response.output.records[0].data.decode('utf-8')) + self.assertEqual(records[1]['data'], response.output.records[1].data.decode('utf-8')) + self.assertEqual(records[1]['client_info'], response.output.records[1].client_info) + + # update the stream by adding 8 shards to it + await self._client.stream.update(container=self._container, + stream_path=self._path, + shard_count=16) + + records = [ + {'shard_id': 10, 'data': 'Now valid shard record #1'}, + ] + + response = await self._client.stream.put_records(container=self._container, + stream_path=self._path, + records=records) + + self.assertEqual(0, response.output.failed_record_count) + + await self._client.stream.delete(container=self._container, + stream_path=self._path) + + async def _stream_exists(self): + response = await self._client.stream.describe(container=self._container, + stream_path=self._path, + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + return response.status_code == 200 + + +class TestObject(Test): + + async def asyncSetUp(self): + await super(TestObject, self).asyncSetUp() + + self._object_dir = '/v3io-py-test-object' + self._object_path = self._object_dir + '/object.txt' + + # clean up + await self._delete_dir(self._object_dir) + + async def test_object(self): + contents = 'vegans are better than everyone' + + response = await self._client.object.get(container=self._container, + path=self._object_path, + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + + self.assertEqual(404, response.status_code) + + # put contents to some object + await self._client.object.put(container=self._container, + path=self._object_path, + body=contents) + + # get the contents + response = await self._client.object.get(container=self._container, + path=self._object_path) + + if not isinstance(response.body, str): + response.body = response.body.decode('utf-8') + + self.assertEqual(response.body, contents) + + # delete the 
object + await self._client.object.delete(container=self._container, + path=self._object_path) + + # get again + response = await self._client.object.get(container=self._container, + path=self._object_path, + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + + self.assertEqual(404, response.status_code) + + async def test_append(self): + contents = [ + 'First part', + 'Second part', + 'Third part', + ] + + # put the contents into the object + for content in contents: + await self._client.object.put(container=self._container, + path=self._object_path, + body=content, + append=True) + + # get the contents + response = await self._client.object.get(container=self._container, + path=self._object_path) + + self.assertEqual(response.body.decode('utf-8'), ''.join(contents)) + + async def test_get_offset(self): + await self._client.object.put(container=self._container, + path=self._object_path, + body='1234567890') + + # get the contents without limit + response = await self._client.object.get(container=self._container, + path=self._object_path, + offset=4) + + self.assertEqual(response.body.decode('utf-8'), '567890') + + # get the contents with limit + response = await self._client.object.get(container=self._container, + path=self._object_path, + offset=4, + num_bytes=3) + + self.assertEqual(response.body.decode('utf-8'), '567') + + +# class TestSchema(Test): +# +# async def asyncSetUp(self): +# await super(TestSchema, self).asyncSetUp() +# +# self._schema_dir = '/v3io-py-test-schemaa' +# self._schema_path = os.path.join(self._schema_dir, '.%23schema') +# +# # clean up +# await self._delete_dir(self._schema_dir) +# +# async def test_create_schema(self): +# await self._client.kv.create_schema(container=self._container, +# table_path=self._schema_dir, +# key='key_field', +# fields=[ +# { +# 'name': 'key_field', +# 'type': 'string', +# 'nullable': False +# }, +# { +# 'name': 'data_field_0', +# 'type': 'long', +# 'nullable': True +# }, +# { +# 'name': 
'data_field_1', +# 'type': 'double', +# 'nullable': True +# }, +# ]) +# +# # write to test the values in the UI (requires breaking afterwards) +# items = { +# 'a': {'data_field_0': 30, 'data_field_1': 100}, +# 'b': {'data_field_0': 300, 'data_field_1': 1000}, +# 'c': {'data_field_0': 3000, 'data_field_1': 10000}, +# } +# +# for item_key, item_attributes in future.utils.viewitems(items): +# await self._client.kv.put(container=self._container, +# table_path=self._schema_dir, +# key=item_key, +# attributes=item_attributes) +# +# # verify the scehma +# response = await self._client.object.get(container=self._container, +# path=self._schema_path, +# raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) +# +# # find a way to assert this without assuming serialization order +# # self.assertEqual( +# # '{"hashingBucketNum":0,"key":"key_field","fields":[{"name":"key_field","type":"string","nullable":false},' +# # '{"name":"data_field_0","type":"long","nullable":true},{"name":"data_field_1","type":"double"' +# # ',"nullable":true}]}', +# # response.body.decode('utf-8')) + + +class TestKv(Test): + + async def asyncSetUp(self): + await super(TestKv, self).asyncSetUp() + + self._path = 'some_dir/v3io-py-test-emd' + await self._delete_dir(self._path) + + async def test_kv_array(self): + item_key = 'item_with_arrays' + item = { + 'list_with_ints': [1, 2, 3], + 'list_with_floats': [10.25, 20.25, 30.25], + } + + await self._client.kv.put(container=self._container, + table_path=self._path, + key=item_key, + attributes=item) + + for attribute_name in item.keys(): + await self._client.kv.update(container=self._container, + table_path=self._path, + key=item_key, + expression=f'{attribute_name}[1]={attribute_name}[1]*2') + + # get the item + response = await self._client.kv.get(container=self._container, + table_path=self._path, + key=item_key) + + for attribute_name in item.keys(): + self.assertEqual(response.output.item[attribute_name][1], item[attribute_name][1] * 2) + + async 
def test_kv_values(self): + + def _get_int_array(): + int_array = array.array('l') + for value in range(10): + int_array.append(value) + + return int_array + + def _get_float_array(): + float_array = array.array('d') + for value in range(10): + float_array.append(value) + + return float_array + + item_key = 'bob' + item = { + item_key: { + 'age': 42, + 'pi': 3.14, + 'feature_str': 'mustache', + 'feature_unicode': u'mustache', + 'numeric_str': '1', + 'unicode': u'\xd7\xa9\xd7\x9c\xd7\x95\xd7\x9d', + 'male': True, + 'happy': False, + 'blob': b'+AFymWFzAL/LUOiU2huiADbugMH0AARATEO1', + 'list_with_ints': [1, 2, 3], + 'list_with_floats': [10.5, 20.5, 30.5], + 'array_with_ints': _get_int_array(), + 'array_with_floats': _get_float_array(), + 'now': datetime.datetime.utcnow() + } + } + + await self._client.kv.put(container=self._container, + table_path=self._path, + key=item_key, + attributes=item[item_key]) + + response = await self._client.kv.get(container=self._container, + table_path=self._path, + key=item_key) + + self.assertEqual(len(item[item_key].keys()), len(response.output.item.keys())) + + for key, value in response.output.item.items(): + self._compare_item_values(item[item_key][key], value) + + for key, value in item[item_key].items(): + self._compare_item_types(item[item_key][key], response.output.item[key]) + + async def test_kv(self): + items = { + 'bob': {'age': 42, 'feature': 'mustache'}, + 'linda': {'age': 41, 'feature': 'singing'}, + 'louise': {'age': 9, 'feature': 'bunny ears'}, + 'tina': {'age': 14, 'feature': 'butts'}, + } + + for item_key, item_attributes in future.utils.viewitems(items): + await self._client.kv.put(container=self._container, + table_path=self._path, + key=item_key, + attributes=item_attributes) + + await self._verify_items(self._path, items) + + await self._client.kv.update(container=self._container, + table_path=self._path, + key='louise', + attributes={ + 'height': 130, + 'quip': 'i can smell fear on you' + }) + + response = await 
self._client.kv.get(container=self._container, + table_path=self._path, + key='louise', + attribute_names=['__size', 'age', 'quip', 'height']) + + self.assertEqual(0, response.output.item['__size']) + self.assertEqual(9, response.output.item['age']) + self.assertEqual('i can smell fear on you', response.output.item['quip']) + self.assertEqual(130, response.output.item['height']) + + # get items with filter expression + response = await self._client.kv.scan(container=self._container, + table_path=self._path, + filter_expression="feature == 'singing'") + self.assertEqual(1, len(response.output.items)) + + # get items with segment / total_segments + total_segments = 4 + total_items = [] + + for segment in range(total_segments): + received_items = await self._client.kv.new_cursor(container=self._container, + table_path=self._path, + segment=segment, + total_segments=total_segments).all() + total_items.append(received_items) + + self.assertEqual(4, len(total_items)) + + # with limit = 0 + received_items = await self._client.kv.new_cursor(container=self._container, + table_path=self._path, + attribute_names=['age', 'feature'], + filter_expression='age > 15', + limit=0).all() + + self.assertEqual(0, len(received_items)) + + received_items = await self._client.kv.new_cursor(container=self._container, + table_path=self._path, + attribute_names=['age', 'feature'], + filter_expression='age > 15').all() + + self.assertEqual(2, len(received_items)) + for item in received_items: + self.assertLess(15, item['age']) + + # + # Increment age + # + + await self._client.kv.update(container=self._container, + table_path=self._path, + key='louise', + expression='age = age + 1') + + response = await self._client.kv.get(container=self._container, + table_path=self._path, + key='louise', + attribute_names=['age']) + + self.assertEqual(10, response.output.item['age']) + + async def _delete_items(self, path, items): + + # delete items + for item_key, _ in future.utils.viewitems(items): + 
await self._client.kv.delete(container=self._container, + table_path=path, + key=item_key) + + # delete dir + await self._client.object.delete(container=self._container, + path=path) + + async def _verify_items(self, path, items): + items_cursor = self._client.kv.new_cursor(container=self._container, + table_path=path, + attribute_names=['*']) + + received_items = await items_cursor.all() + + # TODO: verify contents + self.assertEqual(len(items), len(received_items)) + + def _compare_item_values(self, v1, v2): + if isinstance(v1, array.array): + # convert to list + v1 = list(v1) + + if v1 != v2: + self.fail('Values dont match') + + def _compare_item_types(self, v1, v2): + if isinstance(v1, array.array): + # convert to list + v1 = list(v1) + + # can't guarantee strings as they might be converted to unicode + if type(v1) is not str: + self.assertEqual(type(v1), type(v2)) + + +class TestRaiseForStatus(Test): + + def setUp(self): + super(TestRaiseForStatus, self).setUp() + + async def test_always_raise_no_error(self): + # should raise - since the status code is 500 + await self._client.container.list(self._container, + '/', + raise_for_status=v3io.dataplane.transport.RaiseForStatus.always) + + async def test_specific_status_code_match(self): + # should raise - since the status code is 500 + await self._client.container.list(self._container, '/', raise_for_status=[200]) + + async def test_specific_status_code_no_match(self): + # should raise - since the status code is 500 + try: + await self._client.container.list(self._container, '/', raise_for_status=[500]) + except Exception: + return + + self.fail('Expected an exception') + + async def test_never_raise(self): + await self._client.object.get(container=self._container, + path='/non-existing', + raise_for_status=v3io.aio.dataplane.RaiseForStatus.never) + + async def test_default_raise(self): + try: + await self._client.object.get(container=self._container, path='/non-existing') + except Exception: + return + + 
self.fail('Expected an exception') diff --git a/v3io/aio/__init__.py b/v3io/aio/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/v3io/aio/dataplane/__init__.py b/v3io/aio/dataplane/__init__.py new file mode 100644 index 0000000..d88418a --- /dev/null +++ b/v3io/aio/dataplane/__init__.py @@ -0,0 +1,2 @@ +from .client import Client +from v3io.dataplane.transport import RaiseForStatus diff --git a/v3io/aio/dataplane/client.py b/v3io/aio/dataplane/client.py new file mode 100644 index 0000000..3fc9865 --- /dev/null +++ b/v3io/aio/dataplane/client.py @@ -0,0 +1,98 @@ +import os +import sys +import ujson + +import v3io.dataplane.transport.requests +import v3io.dataplane.transport.httpclient +import v3io.dataplane.request +import v3io.dataplane.batch +import v3io.dataplane.response +import v3io.dataplane.output +import v3io.dataplane.kv_cursor +import v3io.aio.dataplane.transport.aiohttp +import v3io.common.helpers +import v3io.logger + + +class Client(object): + + def __init__(self, + logger=None, + endpoint=None, + access_key=None, + max_connections=None, + timeout=None, + logger_verbosity=None, + transport_verbosity='info'): + """Creates a v3io client, used to access v3io + + Parameters + ---------- + logger (Optional) : logger + An optional pre-existing logger. If not passed, a logger is created with 'logger_verbosity' level + endpoint (Optional) : str + The v3io endpoint to connect to (e.g. http://v3io-webapi:8081). if empty, the env var + V3IO_API is used + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. this can + be overridden per request if needed + max_connections (Optional) : int + The number of connections to create towards v3io - defining the max number of parallel + operations towards v3io. Defaults to 8 + timeout (Optional) : None + For future use + logger_verbosity (Optional) : INFO / DEBUG + If 'logger' is not provided, this will specify the verbosity of the created logger. 
+ transport_verbosity (Optional) : INFO / DEBUG + If set to 'DEBUG', transport will log lots of information at the cost of performance. It uses + the "debug_with" logger interface, so either a logger set to DEBUG level must be passed in 'logger' or + 'logger_verbosity' must be set to DEBUG + + Return Value + ---------- + A `Client` object + """ + self._logger = logger or self._create_logger(logger_verbosity) + self._access_key = access_key or os.environ.get('V3IO_ACCESS_KEY') + + if not self._access_key: + raise ValueError('Access key must be provided in Client() arguments or in the ' + 'V3IO_ACCESS_KEY environment variable') + + self._transport = v3io.aio.dataplane.transport.aiohttp.Transport(self._logger, + endpoint, + max_connections, + timeout, + transport_verbosity) + + # create models + self.kv, self.object, self.stream, self.container = self._create_models() + + async def close(self): + await self._transport.close() + + def _create_logger(self, logger_verbosity): + logger = v3io.logger.Logger(level=logger_verbosity or 'INFO') + logger.set_handler('stdout', sys.stdout, v3io.logger.HumanReadableFormatter()) + + return logger + + @staticmethod + def _get_schema_contents(key, fields): + return ujson.dumps({ + 'hashingBucketNum': 0, + 'key': key, + 'fields': fields + }) + + def _create_models(self): + import v3io.aio.dataplane.object + import v3io.aio.dataplane.kv + import v3io.aio.dataplane.stream + import v3io.aio.dataplane.container + + # create models + return v3io.aio.dataplane.kv.Model(self), \ + v3io.aio.dataplane.object.Model(self), \ + v3io.aio.dataplane.stream.Model(self), \ + v3io.aio.dataplane.container.Model(self) diff --git a/v3io/aio/dataplane/container.py b/v3io/aio/dataplane/container.py new file mode 100644 index 0000000..e4b0608 --- /dev/null +++ b/v3io/aio/dataplane/container.py @@ -0,0 +1,55 @@ +import v3io.dataplane.request +import v3io.dataplane.output +import v3io.dataplane.model +import v3io.dataplane.kv_cursor + + +class 
Model(v3io.dataplane.model.Model): + + def __init__(self, client): + self._client = client + self._access_key = client._access_key + self._transport = client._transport + + async def list(self, + container, + path, + access_key=None, + raise_for_status=None, + get_all_attributes=None, + directories_only=None, + limit=None, + marker=None): + """Lists the container's contents. + + Parameters + ---------- + container (Required) : str + The container on which to operate. + path (Required) : str + The path within the container + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + get_all_attributes (Optional) : bool + False (default) - retrieves basic attributes + True - retrieves all attributes of the underlying objects + directories_only (Optional) : bool + False (default) - retrieves objects (contents) and directories (common prefixes) + True - retrieves only directories (common prefixes) + limit (Optional) : int + Number of objects/directories to receive. default: 1000 + marker (Optional) : str + An opaque identifier that was returned in the NextMarker element of a response to a previous + get_container_contents request that did not return all the requested items. This marker identifies the + location in the path from which to start searching for the remaining requested items. + + Return Value + ---------- + A `Response` object, whose `output` is `GetContainerContentsOutput`. 
+ """ + return await self._transport.request(container, + access_key or self._access_key, + raise_for_status, + v3io.dataplane.request.encode_get_container_contents, + locals(), + v3io.dataplane.output.GetContainerContentsOutput) diff --git a/v3io/aio/dataplane/kv.py b/v3io/aio/dataplane/kv.py new file mode 100644 index 0000000..56cf57d --- /dev/null +++ b/v3io/aio/dataplane/kv.py @@ -0,0 +1,354 @@ +import os + +import v3io.dataplane.request +import v3io.dataplane.output +import v3io.dataplane.model +import v3io.aio.dataplane.kv_cursor + + +class Model(v3io.dataplane.model.Model): + + def __init__(self, client): + self._client = client + self._access_key = client._access_key + self._transport = client._transport + + def new_cursor(self, + container, + table_path, + access_key=None, + raise_for_status=None, + attribute_names='*', + filter_expression=None, + marker=None, + sharding_key=None, + limit=None, + segment=None, + total_segments=None, + sort_key_range_start=None, + sort_key_range_end=None): + return v3io.aio.dataplane.kv_cursor.Cursor(self._client, + container, + access_key or self._access_key, + table_path, + raise_for_status, + attribute_names, + filter_expression, + marker, + sharding_key, + limit, + segment, + total_segments, + sort_key_range_start, + sort_key_range_end) + + async def put(self, + container, + table_path, + key, + attributes, + access_key=None, + raise_for_status=None, + condition=None): + """Creates an item with the provided attributes. If an item with the same name (primary key) already exists in + the specified table, the existing item is completely overwritten (replaced with a new item). If the item or + table do not exist, the operation creates them. + + See: + https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/putitem/ + + Notes: + 1. 
To provide arrays, pass either a list of integers ([1, 2, 3]), a list of floats ([1.0, 2.0, 3.0]) or an + array.array with a typecode of either 'l' (integer) or 'd' (float). The response will always either be + a list of integers or a list of floats (never an array.array) + 2. To provide a timestamp, pass a datetime.datetime. Whatever the timezone, it will be stored as UTC and + a UTC datetime will be retrieved when read + + Parameters + ---------- + container (Required) : str + The container on which to operate. + table_path (Required) : str + The full path of the table + key (Required) : str + The item key name + attributes (Required) : dict + The item to add - an object containing zero or more attributes. + For example: + { + 'age': 42, + 'feature': 'mustache' + } + + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + condition (Optional) : str + A Boolean condition expression that defines a conditional logic for executing the put-item operation. + + Return Value + ---------- + A `Response` object. + """ + + return await self._transport.request(container, + access_key or self._access_key, + raise_for_status, + v3io.dataplane.request.encode_put_item, + locals()) + + async def update(self, + container, + table_path, + key, + access_key=None, + raise_for_status=None, + attributes=None, + expression=None, + condition=None, + update_mode=None, + alternate_expression=None): + """Updates the attributes of a table item. If the specified item or table don't exist, + the operation creates them. + + See: + https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/updateitem/ + + Parameters + ---------- + container (Required) : str + The container on which to operate. + table_path (Required) : str + The full path of the table + key (Required) : str + The item key name + attributes (Optional) : dict + The item to update - an object containing zero or more attributes. 
+ For example: + { + 'age': 42, + 'feature': 'mustache' + } + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + expression (Optional) : str + An update expression that specifies the changes to make to the item's attributes. + condition (Optional) : str + A Boolean condition expression that defines a conditional logic for executing the put-item operation. + See https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/putitem/ + update_mode (Optional) : str + CreateOrReplaceAttributes (default): Creates or replaces attributes + alternate_expression (Optional) : str + An alternate update expression that specifies the changes to make to the item's attributes when a + condition expression, defined in the ConditionExpression request parameter, evaluates to false; + (i.e., this parameter defines the else clause of a conditional if-then-else update expression). + See Update Expression for syntax details and examples. When the alternate update expression is executed, + it's evaluated against the table item to be updated, if it exists. If the item doesn't exist, the update + creates it (as well as the parent table if it doesn't exist). See also the UpdateExpression notes, which + apply to the alternate update expression as well. + + Return Value + ---------- + A `Response` object. + """ + return await self._transport.request(container, + access_key or self._access_key, + raise_for_status, + v3io.dataplane.request.encode_update_item, + locals()) + + async def get(self, + container, + table_path, + key, + access_key=None, + raise_for_status=None, + attribute_names='*'): + """Retrieves the requested attributes of a table item. + + See: + https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/getitem/ + + Parameters + ---------- + container (Required) : str + The container on which to operate. 
+ table_path (Required) : str + The full path of the table + key (Required) : str + The item key name + attribute_names (Required) : []str or '*' + A list of attribute names to get, or '*' which will retreive all attributes + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + + Return Value + ---------- + A `Response` object, whose `output` is `GetItemOutput`. + """ + return await self._transport.request(container, + access_key or self._access_key, + raise_for_status, + v3io.dataplane.request.encode_get_item, + locals(), + v3io.dataplane.output.GetItemOutput) + + async def scan(self, + container, + table_path, + access_key=None, + raise_for_status=None, + attribute_names='*', + filter_expression=None, + marker=None, + sharding_key=None, + limit=None, + segment=None, + total_segments=None, + sort_key_range_start=None, + sort_key_range_end=None): + """Retrieves (reads) attributes of multiple items in a table or in a data container's root directory, + according to the specified criteria. + + See: + https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/nosql-web-api/getitems/ + + Parameters + ---------- + container (Required) : str + The container on which to operate. + table_path (Required) : str + The full path of the table + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + attribute_names (Optional) : []str or '*' + A list of attribute names to get, or '*' which will retreive all attributes + filter_expression (Optional) : str + A filter expression that restricts the items to retrieve. Only items that match the filter criteria + are returned. 
See https://www.iguazio.com/docs/reference/latest-release/expressions/condition-expression/#filter-expression.md + marker (Optional) : str + An opaque identifier that was returned in the NextMarker element of a response to a previous GetItems + request that did not return all the requested items. This marker identifies the location in the table + from which to start searching for the remaining requested items. See Partial Response and the description + of the NextMarker response element. + sharding_key (Optional) : str + The maximal sorting-key value of the items to get by using a range scan. The sorting-key value is the + part to the right of the leftmost period in a compound primary-key value (item name). This parameter is + applicable only together with the ShardingKey request parameter. The scan will return all items with the + specified sharding-key value whose sorting-key values are greater than or equal to (>=) than the value of + the SortKeyRangeStart parameter (if set) and less than (<) the value of the SortKeyRangeEnd parameter. + limit (Optional) : int + The maximum number of items to return within the response (i.e., the maximum number of elements in the + response object's Items array). + segment (Optional) : str + The ID of a specific table segment to scan - 0 to one less than TotalSegment + total_segments (Optional) : str + The number of segments into which to divide the table scan - 1 to 1024. See Parallel Scan. + The segments are assigned sequential IDs starting with 0. + sort_key_range_start (Optional) : str + The minimal sorting-key value of the items to get by using a range scan. The sorting-key value is the part + to the right of the leftmost period in a compound primary-key value (item name). This parameter is + applicable only together with the ShardingKey request parameter. 
The scan will return all items with + the specified sharding-key value whose sorting-key values are greater than or equal to (>=) the value of + the SortKeyRangeStart parameter and less than (<) the value of the SortKeyRangeEnd parameter (if set). + sort_key_range_end (Optional) : str + The maximal sorting-key value of the items to get by using a range scan. The sorting-key value is the part + to the right of the leftmost period in a compound primary-key value (item name). This parameter is + applicable only together with the ShardingKey request parameter. The scan will return all items with + the specified sharding-key value whose sorting-key values are greater than or equal to (>=) than the + value of the SortKeyRangeStart parameter (if set) and less than (<) the value of the SortKeyRangeEnd + parameter. + + Return Value + ---------- + A `Response` object, whose `output` is `GetItemsOutput`. + """ + table_path = self._ensure_path_ends_with_slash(table_path) + + return await self._transport.request(container, + access_key or self._access_key, + raise_for_status, + v3io.dataplane.request.encode_get_items, + locals(), + v3io.dataplane.output.GetItemsOutput) + + async def delete(self, container, table_path, key, access_key=None, raise_for_status=None, transport_actions=None): + """Deletes an item. + + Parameters + ---------- + container (Required) : str + The container on which to operate. + table_path (Required) : str + The full path of the table + key (Required) : str + The item key name + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + + Return Value + ---------- + A `Response` object. + """ + return self._client.delete_object(container, + os.path.join(table_path, key), + access_key, + raise_for_status, + transport_actions) + + async def create_schema(self, + container, + table_path, + access_key=None, + raise_for_status=None, + key=None, + fields=None): + """Creates a KV schema file + + DEPRECATED. 
class Cursor(object):
    """Walks the items of a KV table one at a time, fetching them in batches through `context.kv.scan`.

    The scan parameters are kept as public attributes and are re-read on every fetch.
    """

    def __init__(self,
                 context,
                 container_name,
                 access_key,
                 table_path,
                 raise_for_status=None,
                 attribute_names='*',
                 filter_expression=None,
                 marker=None,
                 sharding_key=None,
                 limit=None,
                 segment=None,
                 total_segments=None,
                 sort_key_range_start=None,
                 sort_key_range_end=None):
        """Stores the scan parameters. No request is sent until `next_item` is first awaited.

        `context` must expose an awaitable `kv.scan`. The remaining arguments are passed to it
        positionally, in the order used by `next_item`.
        """
        self._context = context
        self._container_name = container_name
        self._access_key = access_key

        # state of the batch currently being consumed
        self._current_response = None
        self._current_items = None
        self._current_item = None
        self._current_item_index = 0

        # get items params
        self.raise_for_status = raise_for_status
        self.table_path = table_path
        self.attribute_names = attribute_names
        self.filter_expression = filter_expression
        self.marker = marker
        self.sharding_key = sharding_key
        self.limit = limit
        self.segment = segment
        self.total_segments = total_segments
        self.sort_key_range_start = sort_key_range_start
        self.sort_key_range_end = sort_key_range_end

    async def next_item(self):
        """Returns the next item, fetching another batch when the current one is used up.

        Return Value
        ----------
        The next item, or None once the last batch (or an empty batch) has been consumed.
        Whatever the fetched response's `raise_for_status()` raises is propagated.
        """

        # still have items left in the current batch
        if self._current_item_index < len(self._current_items or []):
            self._current_item = self._current_items[self._current_item_index]
            self._current_item_index += 1

            return self._current_item

        if self._current_response:

            # the batch we just finished was flagged as the last one, or came back empty - we're done
            if self._current_response.output.last or not self._current_items:
                return None

            # continue from where the previous batch ended. on the very first fetch there is no
            # previous response, and the marker given to the constructor must be left untouched
            self.marker = self._current_response.output.next_marker

        # get the next batch
        self._current_response = await self._context.kv.scan(self._container_name,
                                                             self.table_path,
                                                             self._access_key,
                                                             self.raise_for_status,
                                                             self.attribute_names,
                                                             self.filter_expression,
                                                             self.marker,
                                                             self.sharding_key,
                                                             self.limit,
                                                             self.segment,
                                                             self.total_segments,
                                                             self.sort_key_range_start,
                                                             self.sort_key_range_end)

        # raise if there was an issue
        self._current_response.raise_for_status()

        # set items
        self._current_items = self._current_response.output.items
        self._current_item_index = 0

        # and recurse into next now that we repopulated response
        return await self.next_item()

    async def all(self):
        """Drains the cursor and returns every remaining item as a list."""
        items = []

        while True:
            item = await self.next_item()

            if item is None:
                break

            items.append(item)

        return items
    async def get(self,
                  container,
                  path,
                  access_key=None,
                  raise_for_status=None,
                  offset=None,
                  num_bytes=None):
        """Retrieves an object from a container.

        Parameters
        ----------
        container (Required) : str
            The container on which to operate.
        path (Required) : str
            The path of the object
        access_key (Optional) : str
            The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env.
        raise_for_status (Optional)
            Passed as-is to the transport's `request`, together with the encoded request.
        offset (Optional) : int
            A numeric offset into the object (in bytes). Defaults to 0
        num_bytes (Optional) : int
            Number of bytes to return. By default equal to len(object)-offset

        Return Value
        ----------
        A `Response` object, whose `body` is populated with the body of the object.
        """
        # locals() hands every argument (and self) to the encoder by name - don't add locals above this call
        return await self._transport.request(container,
                                             access_key or self._access_key,
                                             raise_for_status,
                                             v3io.dataplane.request.encode_get_object,
                                             locals())
    async def delete(self, container, path, access_key=None, raise_for_status=None):
        """Deletes an object from a container.

        Parameters
        ----------
        container (Required) : str
            The container on which to operate.
        path (Required) : str
            The path of the object
        access_key (Optional) : str
            The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env.
        raise_for_status (Optional)
            Passed as-is to the transport's `request`, together with the encoded request.

        Return Value
        ----------
        A `Response` object.
        """
        # locals() hands every argument (and self) to the encoder by name - don't add locals above this call
        return await self._transport.request(container,
                                             access_key or self._access_key,
                                             raise_for_status,
                                             v3io.dataplane.request.encode_delete_object,
                                             locals())
    async def update(self,
                     container,
                     stream_path,
                     shard_count,
                     access_key=None,
                     raise_for_status=None):
        """Updates a stream's configuration by increasing its shard count. The changes are applied immediately.

        See:
        https://www.iguazio.com/docs/latest-release/reference/api-reference/web-apis/streaming-web-api/updatestream/

        Parameters
        ----------
        container (Required) : str
            The container on which to operate.
        stream_path (Required) : str
            The stream_path of the stream to update.
        shard_count (Required) : int
            The stream's shard count after the update.
        access_key (Optional) : str
            The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env.
        raise_for_status (Optional)
            Passed as-is to the transport's `request`, together with the encoded request.

        Return Value
        ----------
        A `Response` object.
        """
        # reassigned before locals() is captured, so the encoder receives the value returned by the helper
        # (presumably the path with a trailing slash - the helper is defined outside this chunk)
        stream_path = self._ensure_path_ends_with_slash(stream_path)

        return await self._transport.request(container,
                                             access_key or self._access_key,
                                             raise_for_status,
                                             v3io.dataplane.request.encode_update_stream,
                                             locals())
    async def describe(self, container, stream_path, access_key=None, raise_for_status=None):
        """Retrieves a stream's configuration, including the shard count and retention period.

        See:
        https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/streaming-web-api/describestream/

        Parameters
        ----------
        container (Required) : str
            The container on which to operate.
        stream_path (Required) : str
            The stream_path of the stream.
        access_key (Optional) : str
            The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env.
        raise_for_status (Optional)
            Passed as-is to the transport's `request`, together with the encoded request.

        Return Value
        ----------
        A `Response` object, whose `output` is `DescribeStreamOutput`.
        """
        # reassigned before locals() is captured, so the encoder receives the value returned by the helper
        stream_path = self._ensure_path_ends_with_slash(stream_path)

        return await self._transport.request(container,
                                             access_key or self._access_key,
                                             raise_for_status,
                                             v3io.dataplane.request.encode_describe_stream,
                                             locals(),
                                             v3io.dataplane.output.DescribeStreamOutput)
The operation supports different seek types, as outlined in the Stream Record Consumption + overview and in the description of the Type request parameter below. + + See: + https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/streaming-web-api/seek/ + + Parameters + ---------- + container (Required) : str + The container on which to operate. + stream_path (Required) : str + The stream_path of the stream. + seek_type (Required) : str + 'EARLIEST': the location of the earliest ingested record in the shard. + 'LATEST': the location of the end of the shard. + 'TIME': the location of the earliest ingested record in the shard beginning at the base time set in the + TimestampSec and TimestampNSec request parameters. If no matching record is found (i.e., if all + records in the shard arrived before the specified base time) the operation returns the location + of the end of the shard. + 'SEQUENCE': the location of the record whose sequence number matches the sequence number specified in the + StartingSequenceNumber request parameter. If no match is found, the operation fails. + access_key (Optional) : str + The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env. + starting_sequence_number (Optional) : int + Record sequence number for a sequence-number based seek operation - Type=SEQUENCE. When this parameter is + set, the operation returns the location of the record whose sequence number matches the parameter. + timestamp_sec (Optional) : int + The base time for a time-based seek operation (Type=TIME), as a Unix timestamp in seconds. For example, + 1511260205 sets the search base time to 21 Nov 2017 at 10:30:05 AM UTC. The TimestampNSec request parameter + sets the nanoseconds unit of the seek base time. 
    async def put_records(self,
                          container,
                          stream_path,
                          records,
                          access_key=None,
                          raise_for_status=None):
        """Adds records to a stream.

        You can optionally assign a record to a specific stream shard by specifying a related shard ID, or associate
        the record with a specific partition key to ensure that similar records are assigned to the same shard.
        By default, the platform assigns records to shards using a Round Robin algorithm. The max number of records
        is 1000.

        See:
        https://www.iguazio.com/docs/reference/latest-release/api-reference/web-apis/streaming-web-api/putrecords/

        Parameters
        ----------
        container (Required) : str
            The container on which to operate.
        stream_path (Required) : str
            The stream_path of the stream.
        records (Required) : []dict
            A list of dictionaries with the following keys:
            - shard_id: int, optional
                The ID of the shard to which to assign the record, as an integer between 0 and one less than
                the stream's shard count. When both ShardId and PartitionKey are set, the record is assigned
                according to the shard ID, and PartitionKey is ignored. When neither a shard ID nor a partition
                key is provided in the request, the platform's default shard-assignment algorithm is used.
            - data: str, required
                Record data.
            - client_info: bytes/bytearray, optional
                Custom opaque information that can optionally be provided by the producer.
                This metadata can be used, for example, to save the data format of a record, or the time at
                which a sensor or application event was triggered.
            - partition_key: str, optional
                A partition key with which to associate the record (see Record Metadata). Records with the
                same partition key are assigned to the same shard, subject to the following exceptions: if
                a shard ID is also provided for the record (see the Records ShardId request parameter),
                the record is assigned according to the shard ID, and PartitionKey is ignored. In addition,
                if you increase a stream's shard count after its creation (see UpdateStream), new records
                with a previously used partition key will be assigned either to the same shard that was
                previously used for this partition key or to a new shard. All records with the same
                partition key that are added to the stream after the shard-count change will be assigned
                to the same shard (be it the previously used shard or a new shard). When neither a shard
                ID nor a partition key is provided in the request, the platform's default shard-assignment
                algorithm is used.

            For example:
            [
                {'shard_id': 1, 'data': 'first shard record #1'},
                {'shard_id': 1, 'data': 'first shard record #2'},
                {'shard_id': 10, 'data': 'invalid shard record #1'},
                {'shard_id': 2, 'data': 'second shard record #1'},
                {'data': 'some shard record #1'},
            ]
        access_key (Optional) : str
            The access key with which to authenticate. Defaults to the V3IO_ACCESS_KEY env.
        raise_for_status (Optional)
            Passed as-is to the transport's `request`, together with the encoded request.

        Return Value
        ----------
        A `Response` object, whose `output` is `PutRecordsOutput`.
        """
        # reassigned before locals() is captured, so the encoder receives the value returned by the helper
        stream_path = self._ensure_path_ends_with_slash(stream_path)

        return await self._transport.request(container,
                                             access_key or self._access_key,
                                             raise_for_status,
                                             v3io.dataplane.request.encode_put_records,
                                             locals(),
                                             v3io.dataplane.output.PutRecordsOutput)
class Transport(object):
    """Sends encoded v3io requests over a shared aiohttp client session and wraps the replies in
    `v3io.dataplane.response.Response` objects.
    """

    def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
        """Resolves the endpoint and creates the connector and client session.

        Parameters
        ----------
        logger
            Used for debug logging (through `debug_with`) when `verbosity` is 'DEBUG'.
        endpoint (Optional) : str
            The web API endpoint. Defaults to the V3IO_API env.
        max_connections (Optional) : int
            When given, caps the connector's simultaneous connections. When omitted, aiohttp's own
            default limit applies.
        timeout (Optional)
            Stored only. See the note in the body.
        verbosity (Optional) : str
            'DEBUG' enables request/response logging; anything else disables it.
        """
        self._logger = logger
        self._endpoint = self._get_endpoint(endpoint)

        # NOTE(review): the timeout is kept but never applied to the session or to requests - confirm the
        # intended units before wiring it into aiohttp.ClientTimeout
        self._timeout = timeout
        self.max_connections = max_connections or 8

        # honor an explicitly requested connection limit. callers that pass nothing keep getting the
        # connector's default, exactly as before
        connector_args = {}
        if max_connections:
            connector_args['limit'] = max_connections

        self._connector = aiohttp.TCPConnector(**connector_args)
        self._client_session = aiohttp.ClientSession(connector=self._connector)

        self._set_log_method(verbosity)

    async def close(self):
        """Closes the client session and then the connector."""
        await self._client_session.close()
        await self._connector.close()

    async def request(self,
                      container,
                      access_key,
                      raise_for_status,
                      encoder,
                      encoder_args,
                      output=None):
        """Encodes a request, sends it and returns the parsed response.

        Parameters
        ----------
        container, access_key, raise_for_status, encoder, encoder_args, output
            Handed to `v3io.dataplane.request.Request`, which produces the method, path, headers and body
            that are sent. `output` is also handed to the `Response`.

        Return Value
        ----------
        A `Response` object built from the HTTP status, headers and fully-read body. Whatever
        `Response.raise_for_status` raises is propagated.
        """

        # allocate a request
        request = v3io.dataplane.request.Request(container,
                                                 access_key,
                                                 raise_for_status,
                                                 encoder,
                                                 encoder_args,
                                                 output)

        self.log('Tx', method=request.method, path=request.path, headers=request.headers, body=request.body)

        # NOTE(review): ssl=False makes aiohttp skip TLS certificate validation for https endpoints. left
        # as-is since callers may rely on it (e.g. self-signed certificates), but it should be configurable
        async with self._client_session.request(request.method,
                                                self._endpoint + '/' + request.path,
                                                headers=request.headers,
                                                data=request.body,
                                                ssl=False) as http_response:

            # get contents
            contents = await http_response.content.read()

            # create a response
            response = v3io.dataplane.response.Response(output,
                                                        http_response.status,
                                                        http_response.headers,
                                                        contents)

            # enforce raise for status
            response.raise_for_status(request.raise_for_status or raise_for_status)

            self.log('Rx', status_code=response.status_code, headers=response.headers, body=contents)

            return response

    @staticmethod
    def _get_endpoint(endpoint):
        """Returns the endpoint with a scheme and without trailing slashes.

        Falls back to the V3IO_API env when `endpoint` is None, and prefixes 'http://' when no scheme is
        given. Raises RuntimeError when neither source provides an endpoint.
        """

        if endpoint is None:
            endpoint = os.environ.get('V3IO_API')

        if endpoint is None:
            raise RuntimeError('Endpoints must be passed to context or specified in V3IO_API')

        if not endpoint.startswith(('http://', 'https://')):
            endpoint = 'http://' + endpoint

        return endpoint.rstrip('/')

    def _set_log_method(self, verbosity):
        """Binds `self.log` to the real logger for 'DEBUG' verbosity, and to a no-op otherwise."""

        # by default, the log method is null
        log_method = self._log_null

        if verbosity == 'DEBUG':
            log_method = self._log

        self.log = log_method

    def _log(self, message, *args, **kw_args):
        self._logger.debug_with(message, *args, **kw_args)

    def _log_null(self, message, *args, **kw_args):
        pass