Skip to content

Commit

Permalink
Merge pull request #67 from timostrunk/closable_store_proposal
Browse files Browse the repository at this point in the history
Extended KeyValueStore and Decorator Baseclasses with closing support
  • Loading branch information
xhochy authored Oct 31, 2022
2 parents 01f8895 + 6e22075 commit cff85c0
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 36 deletions.
7 changes: 7 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
*********

1.5.0
=====

* Added concept for closable stores.
* Stores and Decorators can now be opened using with KeyValueStore as store
* Implemented this functionality for baseclasses and the AzureBlockBlobStore

1.4.4
=====

Expand Down
30 changes: 29 additions & 1 deletion minimalkv/_key_value_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from io import BytesIO
from typing import IO, Iterable, Iterator, List, Optional, Union
from types import TracebackType
from typing import IO, Iterable, Iterator, List, Optional, Type, Union

from minimalkv._constants import VALID_KEY_RE
from minimalkv._mixins import UrlMixin
Expand Down Expand Up @@ -434,6 +435,33 @@ def _put_filename(self, key: str, filename: str) -> str:
with open(filename, "rb") as source:
return self._put_file(key, source)

def close(self):
"""Clean up all open resources in child classes.
Specific store implementations might require teardown methods
(dangling ports, unclosed files). This allows calling close also
for stores, which do not require this.
"""
return

def __enter__(self):
"""Support with clause for automatic calling of close."""
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
):
"""Support with clause for automatic calling of close.
:param exc_type: Type of optional exception encountered in context manager
:param exc_val: Actual optional exception encountered in context manager
:param exc_tb: Traceback of optional exception encountered in context manager
"""
self.close()


class UrlKeyValueStore(UrlMixin, KeyValueStore):
"""Class is deprecated. Use the :class:`.UrlMixin` instead.
Expand Down
25 changes: 24 additions & 1 deletion minimalkv/decorator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Iterable
from types import TracebackType
from typing import Iterable, Optional, Type
from urllib.parse import quote_plus, unquote_plus

from minimalkv._key_value_store import KeyValueStore
Expand Down Expand Up @@ -38,6 +39,28 @@ def __contains__(self, key: str) -> bool: # noqa D
def __iter__(self) -> Iterable[str]: # noqa D
return self._dstore.__iter__()

def close(self):
"""Relay a close call to the next decorator or underlying store."""
self._dstore.close()

def __enter__(self):
"""Provide context manager support."""
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
):
"""Call close on underlying store or decorator.
:param exc_type: Type of optional exception encountered in context manager
:param exc_val: Actual optional exception encountered in context manager
:param exc_tb: Traceback of optional exception encountered in context manager
"""
self.close()


class KeyTransformingDecorator(StoreDecorator): # noqa D
# TODO Document KeyTransformingDecorator.
Expand Down
29 changes: 23 additions & 6 deletions minimalkv/net/_azurestore_new.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Implement the AzureBlockBlobStore for `azure-storage-blob~=12`."""
import io
from contextlib import contextmanager
from typing import Optional

from minimalkv._key_value_store import KeyValueStore
from minimalkv.net._azurestore_common import _byte_buffer_md5, _file_md5
Expand Down Expand Up @@ -36,6 +37,8 @@ def __init__(
checksum=False,
socket_timeout=None,
):
from azure.storage.blob import BlobServiceClient, ContainerClient

# Note that socket_timeout is unused; it only exist for backward compatibility.
# TODO: Docstring
self.conn_string = conn_string
Expand All @@ -46,6 +49,8 @@ def __init__(
self.max_block_size = max_block_size
self.max_single_put_size = max_single_put_size
self.checksum = checksum
self._service_client: Optional[BlobServiceClient] = None
self._container_client: Optional[ContainerClient] = None

# Using @lazy_property will (re-)create block_blob_service instance needed.
# Together with the __getstate__ implementation below, this allows
Expand All @@ -62,16 +67,27 @@ def blob_container_client(self): # noqa D
if self.max_block_size:
kwargs["max_block_size"] = self.max_block_size

service_client = BlobServiceClient.from_connection_string(
self._service_client = BlobServiceClient.from_connection_string(
self.conn_string, **kwargs
)
container_client = service_client.get_container_client(self.container)
self._container_client = self._service_client.get_container_client(
self.container
)
if self.create_if_missing:
with map_azure_exceptions(error_codes_pass=("ContainerAlreadyExists")):
container_client.create_container(
self._container_client.create_container(
public_access="container" if self.public else None
)
return container_client
return self._container_client

def close(self):
"""Close container_client and service_client ports, if opened."""
if self._container_client is not None:
self._container_client.close()
self._container_client = None
if self._service_client is not None:
self._service_client.close()
self._service_client = None

def _delete(self, key: str) -> None:
with map_azure_exceptions(key, error_codes_pass=("BlobNotFound",)):
Expand Down Expand Up @@ -163,11 +179,12 @@ def _get_file(self, key, file):
downloader.readinto(file)

def __getstate__(self): # noqa D
# keep all of __dict__, except lazy properties:
# keep all of __dict__, except lazy properties and properties, which need to be reopened:
dont_pickle = {"_service_client", "_container_client"}
return {
key: value
for key, value in self.__dict__.items()
if not key.startswith(LAZY_PROPERTY_ATTR_PREFIX)
if not key.startswith(LAZY_PROPERTY_ATTR_PREFIX) and key not in dont_pickle
}


Expand Down
6 changes: 4 additions & 2 deletions tests/basic_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def test_put_opened_file(self, store, key, value, request):
tmp.write(value)
tmp.flush()

store.put_file(key, open(tmp.name, "rb"))
with open(tmp.name, "rb") as infile:
store.put_file(key, infile)

assert store.get(key) == value

Expand All @@ -156,7 +157,8 @@ def test_get_into_file(self, store, key, value, tmp_path):

store.get_file(key, out_filename)

assert open(out_filename, "rb").read() == value
with open(out_filename, "rb") as infile:
assert infile.read() == value

def test_get_into_stream(self, store, key, value):
store.put(key, value)
Expand Down
68 changes: 61 additions & 7 deletions tests/test_azure_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _delete_container(conn_string, container):
# ignore the ContainerNotFound error:
if ex.error_code != "ContainerNotFound":
raise
s.close()
except ImportError:
# for azure-storage-blob<12
from azure.storage.blob import BlockBlobService
Expand All @@ -68,9 +69,10 @@ class TestAzureStorage(BasicStore, OpenSeekTellStore):
def store(self):
container = str(uuid())
conn_string = get_azure_conn_string()
yield AzureBlockBlobStore(
with AzureBlockBlobStore(
conn_string=conn_string, container=container, public=False
)
) as store:
yield store
_delete_container(conn_string, container)


Expand All @@ -87,23 +89,69 @@ def store(self):
class ExtendedKeysStore(ExtendedKeyspaceMixin, AzureBlockBlobStore):
pass

yield ExtendedKeysStore(
with ExtendedKeysStore(
conn_string=conn_string, container=container, public=False
)
) as store:
yield store
_delete_container(conn_string, container)


@pytest.mark.filterwarnings("error")
def test_azure_dangling_port_enter_exit():
container = str(uuid())
conn_string = get_azure_conn_string()
with AzureBlockBlobStore(conn_string=conn_string, container=container) as store:
if not hasattr(store, "blob_container_client"):
# This test only runs for azurestore_new
return
store.blob_container_client # type: ignore


@pytest.mark.filterwarnings("error")
def test_azure_dangling_port_explicit_close():
container = str(uuid())
conn_string = get_azure_conn_string()
store = AzureBlockBlobStore(conn_string=conn_string, container=container)
if not hasattr(store, "blob_container_client"):
# This test only runs for azurestore_new
return
store.blob_container_client # type: ignore
store.close()


@pytest.mark.filterwarnings("error")
def test_azure_dangling_port_explicit_close_multi():
container = str(uuid())
conn_string = get_azure_conn_string()
store = AzureBlockBlobStore(conn_string=conn_string, container=container)
if not hasattr(store, "blob_container_client"):
# This test only runs for azurestore_new
return
store.blob_container_client # type: ignore
# We check that multiclose and reuse do not do weird things
store.close()
store.close()
store.blob_container_client # type: ignore
store.blob_container_client # type: ignore
store.blob_container_client # type: ignore
store.close()
store.close()
store.close()


def test_azure_setgetstate():
container = str(uuid())
conn_string = get_azure_conn_string()
store = AzureBlockBlobStore(conn_string=conn_string, container=container)
store.put("key1", b"value1")

buf = pickle.dumps(store, protocol=2)
store.close()
store = pickle.loads(buf)

assert store.get("key1") == b"value1"
_delete_container(conn_string, container)
store.close()


def test_azure_store_attributes():
Expand Down Expand Up @@ -144,6 +192,7 @@ def test_azure_special_args():
cfg = abbs.blob_container_client._config # type: ignore
assert cfg.max_single_put_size == MSP
assert cfg.max_block_size == MBS
abbs.close()


class TestAzureExceptionHandling:
Expand All @@ -156,6 +205,7 @@ def test_missing_container(self):
with pytest.raises(IOError) as exc:
store.keys()
assert "The specified container does not exist." in str(exc.value)
store.close()

def test_wrong_endpoint(self):
container = str(uuid())
Expand All @@ -176,6 +226,7 @@ def test_wrong_endpoint(self):
with pytest.raises(IOError) as exc:
store.put("key", b"data")
assert "connect" in str(exc.value)
store.close()

def test_wrong_credentials(self):
container = str(uuid())
Expand All @@ -198,6 +249,7 @@ def test_wrong_credentials(self):
with pytest.raises(IOError) as exc:
store.put("key", b"data")
assert "Incorrect padding" in str(exc.value)
store.close()


class TestChecksum:
Expand All @@ -210,12 +262,13 @@ def store(self):
container = str(uuid())
conn_string = get_azure_conn_string()

yield AzureBlockBlobStore(
with AzureBlockBlobStore(
conn_string=conn_string,
container=container,
public=False,
checksum=True,
)
) as store:
yield store
_delete_container(conn_string, container)

def _checksum(self, store):
Expand All @@ -241,7 +294,8 @@ def test_checksum_put(self, store):
def test_checksum_put_file(self, store, tmpdir):
file_ = tmpdir.join("my_file")
file_.write(self.CONTENT)
store.put_file(self.KEY, file_.open("rb"))
with file_.open("rb") as infile:
store.put_file(self.KEY, infile)
assert self._checksum(store) == self.EXPECTED_CHECKSUM
assert store.get(self.KEY) == self.CONTENT

Expand Down
Loading

0 comments on commit cff85c0

Please sign in to comment.