Skip to content

Commit

Permalink
compressor: replace snappy with cramjam
Browse files Browse the repository at this point in the history
The python-snappy library is no longer maintained. Cramjam is almost a
drop-in replacement that allows for a considerable speedup both in the
compression and decompression tasks. The relevant benchmark for
snappy's framed format [1].

Add more tests for compression and decompression & modernize some pattern matching.

[1]: https://github.com/milesgranger/pyrus-cramjam/blob/33c0516374fb9726ddcb82c5dfbe86be96d2cd35/cramjam-python/benchmarks/README.md
  • Loading branch information
aris-aiven committed Aug 23, 2023
1 parent 88e5ba1 commit 7bcbaf7
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 100 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ jobs:

- id: dependencies
run: |
sudo apt-get install -y libsnappy-dev
pip install -r requirements.txt
pip install -r requirements.dev.txt
Expand Down
4 changes: 4 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[MAIN]

extension-pkg-allow-list=cramjam

[MESSAGES CONTROL]
disable=
bad-option-value,
Expand Down
6 changes: 4 additions & 2 deletions make_release.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ def make_release(version: str) -> None:
version_filename.write_text(f'VERSION = "{version}"\n')
subprocess.run(["git", "-C", str(project_directory), "add", str(version_filename)], check=True)
subprocess.run(["git", "-C", str(project_directory), "commit", "-m", f"Bump to version {version}"], check=True)
subprocess.run(["git", "-C", str(project_directory), "tag", "-a", f"releases/{version}", "-m", f"Version {version}"], check=True)
subprocess.run(
["git", "-C", str(project_directory), "tag", "-a", f"releases/{version}", "-m", f"Version {version}"], check=True
)
subprocess.run(["git", "-C", str(project_directory), "log", "-n", "1", "-p"], check=True)
print("Run 'git push --tags' to confirm the release")


if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser("Make a rohmu release")
parser.add_argument("version")
args = parser.parse_args()
Expand Down
7 changes: 3 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ ignore_missing_imports = True
[mypy-azure.common.*]
ignore_missing_imports = True

[mypy-dataclasses.*]
[mypy-cramjam.*]
ignore_missing_imports = True


[mypy-oauth2client.*]
[mypy-dataclasses.*]
ignore_missing_imports = True

[mypy-snappy.*]
[mypy-oauth2client.*]
ignore_missing_imports = True

[mypy-swiftclient.*]
Expand Down
3 changes: 2 additions & 1 deletion rohmu.spec
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Requires: python3-cryptography >= 1.6
Requires: python3-dateutil
Requires: python3-pydantic
Requires: python3-requests
Requires: python3-snappy
# Requires: python3-snappy
# TODO: Create python3-cramjam
Requires: python3-zstandard
BuildRequires: python3-devel
BuildRequires: python3-flake8
Expand Down
106 changes: 54 additions & 52 deletions rohmu/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,56 @@
from .errors import InvalidConfigurationError
from .filewrap import Sink, Stream
from .snappyfile import SnappyFile
from .typing import BinaryData, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .typing import Algorithm, BinaryData, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .zstdfile import open as zstd_open
from typing import cast, IO

import lzma

try:
import snappy
import cramjam
except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore

try:
import zstandard as zstd
except ImportError:
zstd = None # type: ignore


def CompressionFile(dst_fp: FileLike, algorithm: str, level: int = 0, threads: int = 0) -> FileLike:
def CompressionFile(dst_fp: FileLike, algorithm: Algorithm, level: int = 0, threads: int = 0) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)

if algorithm == "snappy":
return SnappyFile(dst_fp, "wb")

if algorithm == "zstd":
return zstd_open(dst_fp, "wb", level=level, threads=threads)

if algorithm:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

return dst_fp
match algorithm:
case "lzma":
compression_fileobj = lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)
case "snappy":
compression_fileobj = SnappyFile(dst_fp, "wb")
case "zstd":
compression_fileobj = zstd_open(dst_fp, "wb", level=level, threads=threads)
case _:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))
return compression_fileobj


def create_streaming_compressor(algorithm: Algorithm, level: int = 0) -> Compressor:
match algorithm:
case "lzma":
compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
case "snappy":
compressor = cramjam.snappy.Compressor()
case "zstd":
compressor = zstd.ZstdCompressor(level=level).compressobj()
case _:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))
return compressor


class CompressionStream(Stream):
"""Non-seekable stream of data that adds compression on top of given source stream"""

def __init__(self, src_fp: HasRead, algorithm: str, level: int = 0) -> None:
def __init__(self, src_fp: HasRead, algorithm: Algorithm, level: int = 0) -> None:
super().__init__(src_fp, minimum_read_size=32 * 1024)
self._compressor: Compressor
if algorithm == "lzma":
self._compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
elif algorithm == "snappy":
self._compressor = snappy.StreamCompressor()
elif algorithm == "zstd":
self._compressor = zstd.ZstdCompressor(level=level).compressobj()
else:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))
self._compressor = create_streaming_compressor(algorithm, level)

def _process_chunk(self, data: bytes) -> bytes:
return self._compressor.compress(data)
Expand All @@ -63,42 +65,42 @@ def _finalize(self) -> bytes:
return self._compressor.flush()


def DecompressionFile(src_fp: FileLike, algorithm: str) -> FileLike:
def DecompressionFile(src_fp: FileLike, algorithm: Algorithm) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")

if algorithm == "snappy":
return SnappyFile(src_fp, "rb")

if algorithm == "zstd":
return zstd_open(src_fp, "rb")
match algorithm:
case "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")
case "snappy":
return SnappyFile(src_fp, "rb")
case "zstd":
return zstd_open(src_fp, "rb")
case _:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

if algorithm:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))

return src_fp
def create_streaming_decompressor(algorithm: Algorithm) -> Decompressor:
match algorithm:
case "lzma":
decompressor = lzma.LZMADecompressor()
case "snappy":
decompressor = cramjam.snappy.Decompressor()
case "zstd":
decompressor = zstd.ZstdDecompressor().decompressobj()
case _:
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(algorithm))
return decompressor


class DecompressSink(Sink):
def __init__(self, next_sink: HasWrite, compression_algorithm: str):
def __init__(self, next_sink: HasWrite, compression_algorithm: Algorithm):
super().__init__(next_sink)
self.decompressor = self._create_decompressor(compression_algorithm)

def _create_decompressor(self, alg: str) -> Decompressor:
if alg == "snappy":
return snappy.StreamDecompressor()
elif alg == "lzma":
return lzma.LZMADecompressor()
elif alg == "zstd":
return zstd.ZstdDecompressor().decompressobj()
raise InvalidConfigurationError("invalid compression algorithm: {!r}".format(alg))
self.decompressor = create_streaming_decompressor(compression_algorithm)

def write(self, data: BinaryData) -> int:
data = bytes(data) if not isinstance(data, bytes) else data
written = len(data)
if not data:
return written
data = self.decompressor.decompress(data)
self._write_to_next_sink(data)
decompressed_data = self.decompressor.decompress(data)
self._write_to_next_sink(decompressed_data)
return written
1 change: 0 additions & 1 deletion rohmu/filewrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def read(self, size: int = -1) -> bytes:
bytes_available += len(dst_data)
if not src_data:
self._eof = True

if size < 0 or bytes_available < size:
data = b"".join(chunks)
self._remainder = b""
Expand Down
4 changes: 2 additions & 2 deletions rohmu/rohmufile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .encryptor import DecryptorFile, DecryptSink, EncryptorFile
from .errors import InvalidConfigurationError
from .filewrap import ThrottleSink
from .typing import FileLike, HasWrite, Metadata
from .typing import Algorithm, FileLike, HasWrite, Metadata
from contextlib import suppress
from inspect import signature
from rohmu.object_storage.base import IncrementalProgressCallbackType
Expand Down Expand Up @@ -142,7 +142,7 @@ def read_file(
def file_writer(
*,
fileobj: FileLike,
compression_algorithm: Optional[str] = None,
compression_algorithm: Optional[Algorithm] = None,
compression_level: int = 0,
compression_threads: int = 0,
rsa_public_key: Union[None, str, bytes] = None,
Expand Down
39 changes: 17 additions & 22 deletions rohmu/snappyfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
import io

try:
import snappy
import cramjam
except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore


class SnappyFile(FileWrap):
def __init__(self, next_fp: FileLike, mode: str) -> None:
if snappy is None:
if cramjam is None:
raise io.UnsupportedOperation("Snappy is not available")

if mode == "rb":
self.decr = snappy.StreamDecompressor()
self.decr = cramjam.snappy.Decompressor()
self.encr = None
elif mode == "wb":
self.decr = None
self.encr = snappy.StreamCompressor()
self.encr = cramjam.snappy.Compressor()
else:
raise io.UnsupportedOperation("unsupported mode for SnappyFile")

Expand All @@ -49,10 +49,11 @@ def write(self, data: BinaryData) -> int: # type: ignore [override]
if self.encr is None:
raise io.UnsupportedOperation("file not open for writing")
data_as_bytes = bytes(data)
compressed_data = self.encr.compress(data_as_bytes)
self.next_fp.write(compressed_data)
self.offset += len(data_as_bytes)
return len(data_as_bytes)
block_size = self.encr.compress(data_as_bytes)
compressed_buffer = self.encr.flush()
self.next_fp.write(compressed_buffer)
self.offset += block_size
return block_size

def writable(self) -> bool:
return self.encr is not None
Expand All @@ -62,19 +63,13 @@ def read(self, size: Optional[int] = -1) -> bytes: # pylint: disable=unused-arg
self._check_not_closed()
if self.decr is None:
raise io.UnsupportedOperation("file not open for reading")
while not self.decr_done:
compressed = self.next_fp.read(IO_BLOCK_SIZE)
if not compressed:
self.decr_done = True
output = self.decr.flush()
else:
output = self.decr.decompress(compressed)

if output:
self.offset += len(output)
return output

return b""
num_decompressed_bytes = 0
while compressed := self.next_fp.read(IO_BLOCK_SIZE):
chunk_size = self.decr.decompress(compressed)
num_decompressed_bytes += chunk_size
self.offset += num_decompressed_bytes
output = self.decr.flush().read()
return output

def readable(self) -> bool:
return self.decr is not None
1 change: 1 addition & 0 deletions rohmu/transfer_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def put(self, transfer: TransferCacheItem) -> None:
_BASE_TRANSFER_INSTANCE_ATTRS = {"config_model", "log", "notifier", "prefix", "stats"}
_BASE_TRANSFER_ATTRS = {attr for attr in vars(BaseTransfer) if not attr.startswith("__")} | _BASE_TRANSFER_INSTANCE_ATTRS


# pylint: disable=abstract-method,super-init-not-called
class SafeTransfer(BaseTransfer[StorageModel]):
"""Helper class that helps the users in finding bugs in their code handling transfers.
Expand Down
4 changes: 3 additions & 1 deletion rohmu/typing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from types import TracebackType
from typing import Any, Dict, Optional, Protocol, Type, TYPE_CHECKING, Union
from typing import Any, Dict, Literal, Optional, Protocol, Type, TYPE_CHECKING, Union

try:
# Remove when dropping support for Python 3.7
Expand Down Expand Up @@ -32,6 +32,8 @@

StrOrPathLike = Union[str, "PathLike[str]"]

Algorithm = Literal["lzma", "snappy", "zstd"]


class HasFileno(Protocol):
def fileno(self) -> int:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ packages = find:
install_requires =
azure-storage-blob >= 2.1.0
botocore
cramjam >= 2.7.0
cryptography
google-api-python-client
httplib2
oauth2client
paramiko
pydantic
python-dateutil
python-snappy
requests
zstandard
typing_extensions >= 3.10, < 5
Expand Down
Loading

0 comments on commit 7bcbaf7

Please sign in to comment.