Skip to content

Commit

Permalink
compressor: replace snappy with cramjam
Browse files Browse the repository at this point in the history
The python-snappy library is no longer maintained. Cramjam is almost a
drop-in replacement that allows for a considerable speedup both in the
compression and decompression tasks. The relevant benchmark for
snappy's framed format [1].

Add more tests for compression and decompression & modernize some pattern matching.

[1]: https://github.com/milesgranger/pyrus-cramjam/blob/33c0516374fb9726ddcb82c5dfbe86be96d2cd35/cramjam-python/benchmarks/README.md
  • Loading branch information
aris-aiven committed Aug 24, 2023
1 parent 77471a4 commit e6b789e
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 96 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ jobs:

- id: dependencies
run: |
sudo apt-get install -y libsnappy-dev
pip install -r requirements.txt
pip install -r requirements.dev.txt
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[MAIN]
extension-pkg-allow-list=pydantic
extension-pkg-allow-list=cramjam,pydantic

[MESSAGES CONTROL]
disable=
Expand Down
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ ignore_missing_imports = True
[mypy-azure.common.*]
ignore_missing_imports = True

[mypy-oauth2client.*]
[mypy-cramjam.*]
ignore_missing_imports = True

[mypy-snappy.*]
[mypy-oauth2client.*]
ignore_missing_imports = True

[mypy-swiftclient.*]
Expand Down
3 changes: 2 additions & 1 deletion rohmu.spec
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Requires: python3-cryptography >= 1.6
Requires: python3-dateutil
Requires: python3-pydantic
Requires: python3-requests
Requires: python3-snappy
# Requires: python3-snappy
# TODO: Create python3-cramjam
Requires: python3-zstandard
BuildRequires: python3-devel
BuildRequires: python3-flake8
Expand Down
140 changes: 88 additions & 52 deletions rohmu/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,87 @@
from .errors import InvalidConfigurationError
from .filewrap import Sink, Stream
from .snappyfile import SnappyFile
from .typing import BinaryData, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .typing import BinaryData, CompressionAlgorithm, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .zstdfile import open as zstd_open
from typing import cast, IO

import lzma


try:
import snappy
import cramjam

# Cramjam streaming classes are lazy and diverge from Compressor and Decompressor interfaces.
# Adapt the parent classes to flush and return the inner buffer after compress and decompress calls.
class CramjamStreamingCompressor(Compressor):
def __init__(self) -> None:
self._compressor = cramjam.snappy.Compressor()

def compress(self, data: bytes) -> bytes:
self._compressor.compress(data)
return self.flush()

def flush(self) -> bytes:
buf = self._compressor.flush()
return buf.read()

class CramjamStreamingDecompressor(Decompressor):
def __init__(self) -> None:
self._decompressor = cramjam.snappy.Decompressor()

def decompress(self, data: bytes) -> bytes:
self._decompressor.decompress(data)
buf = self._decompressor.flush()
return buf.read()

except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore
CramjamStreamingCompressor: Compressor | None = None # type: ignore[no-redef]
CramjamStreamingDecompressor: Decompressor | None = None # type: ignore[no-redef]

try:
import zstandard as zstd
except ImportError:
zstd = None # type: ignore


def CompressionFile(dst_fp: FileLike, algorithm: str, level: int = 0, threads: int = 0) -> FileLike:
def CompressionFile(dst_fp: FileLike, algorithm: CompressionAlgorithm, level: int = 0, threads: int = 0) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)

if algorithm == "snappy":
return SnappyFile(dst_fp, "wb")

if algorithm == "zstd":
return zstd_open(dst_fp, "wb", level=level, threads=threads)

if algorithm:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

return dst_fp
compression_fileobj: FileLike
match algorithm:
case "lzma":
compression_fileobj = lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)
case "snappy":
compression_fileobj = SnappyFile(dst_fp, "wb")
case "zstd":
compression_fileobj = zstd_open(dst_fp, "wb", level=level, threads=threads)
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return compression_fileobj


def create_streaming_compressor(algorithm: CompressionAlgorithm, level: int = 0) -> Compressor:
compressor: Compressor
match algorithm:
case "lzma":
compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
case "snappy":
if CramjamStreamingCompressor is None:
raise ImportError("Unable to import cramjam")
compressor = CramjamStreamingCompressor()
case "zstd":
compressor = zstd.ZstdCompressor(level=level).compressobj()
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return compressor


class CompressionStream(Stream):
"""Non-seekable stream of data that adds compression on top of given source stream"""

def __init__(self, src_fp: HasRead, algorithm: str, level: int = 0) -> None:
def __init__(self, src_fp: HasRead, algorithm: CompressionAlgorithm, level: int = 0) -> None:
super().__init__(src_fp, minimum_read_size=32 * 1024)
self._compressor: Compressor
if algorithm == "lzma":
self._compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
elif algorithm == "snappy":
self._compressor = snappy.StreamCompressor()
elif algorithm == "zstd":
self._compressor = zstd.ZstdCompressor(level=level).compressobj()
else:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
self._compressor = create_streaming_compressor(algorithm, level)

def _process_chunk(self, data: bytes) -> bytes:
return self._compressor.compress(data)
Expand All @@ -63,42 +96,45 @@ def _finalize(self) -> bytes:
return self._compressor.flush()


def DecompressionFile(src_fp: FileLike, algorithm: str) -> FileLike:
def DecompressionFile(src_fp: FileLike, algorithm: CompressionAlgorithm) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")

if algorithm == "snappy":
return SnappyFile(src_fp, "rb")

if algorithm == "zstd":
return zstd_open(src_fp, "rb")
match algorithm:
case "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")
case "snappy":
return SnappyFile(src_fp, "rb")
case "zstd":
return zstd_open(src_fp, "rb")
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

if algorithm:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

return src_fp
def create_streaming_decompressor(algorithm: CompressionAlgorithm) -> Decompressor:
decompressor: Decompressor
match algorithm:
case "lzma":
decompressor = lzma.LZMADecompressor()
case "snappy":
if CramjamStreamingDecompressor is None:
raise ImportError("Unable to import cramjam")
decompressor = CramjamStreamingDecompressor()
case "zstd":
decompressor = zstd.ZstdDecompressor().decompressobj()
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return decompressor


class DecompressSink(Sink):
def __init__(self, next_sink: HasWrite, compression_algorithm: str):
def __init__(self, next_sink: HasWrite, compression_algorithm: CompressionAlgorithm):
super().__init__(next_sink)
self.decompressor = self._create_decompressor(compression_algorithm)

def _create_decompressor(self, alg: str) -> Decompressor:
if alg == "snappy":
return snappy.StreamDecompressor()
elif alg == "lzma":
return lzma.LZMADecompressor()
elif alg == "zstd":
return zstd.ZstdDecompressor().decompressobj()
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(alg)}")
self.decompressor = create_streaming_decompressor(compression_algorithm)

def write(self, data: BinaryData) -> int:
data = bytes(data) if not isinstance(data, bytes) else data
written = len(data)
if not data:
return written
data = self.decompressor.decompress(data)
self._write_to_next_sink(data)
decompressed_data = self.decompressor.decompress(data)
self._write_to_next_sink(decompressed_data)
return written
1 change: 0 additions & 1 deletion rohmu/filewrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def read(self, size: int = -1) -> bytes:
bytes_available += len(dst_data)
if not src_data:
self._eof = True

if size < 0 or bytes_available < size:
data = b"".join(chunks)
self._remainder = b""
Expand Down
6 changes: 3 additions & 3 deletions rohmu/rohmufile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .encryptor import DecryptorFile, DecryptSink, EncryptorFile
from .errors import InvalidConfigurationError
from .filewrap import ThrottleSink
from .typing import FileLike, HasWrite, Metadata
from .typing import CompressionAlgorithm, FileLike, HasWrite, Metadata
from contextlib import suppress
from inspect import signature
from rohmu.object_storage.base import IncrementalProgressCallbackType
Expand Down Expand Up @@ -143,7 +143,7 @@ def read_file(
def file_writer(
*,
fileobj: FileLike,
compression_algorithm: Optional[str] = None,
compression_algorithm: Optional[CompressionAlgorithm] = None,
compression_level: int = 0,
compression_threads: int = 0,
rsa_public_key: Union[None, str, bytes] = None,
Expand All @@ -162,7 +162,7 @@ def write_file(
input_obj: FileLike,
output_obj: FileLike,
progress_callback: IncrementalProgressCallbackType = None,
compression_algorithm: Optional[str] = None,
compression_algorithm: Optional[CompressionAlgorithm] = None,
compression_level: int = 0,
compression_threads: int = 0,
rsa_public_key: Union[None, str, bytes] = None,
Expand Down
39 changes: 17 additions & 22 deletions rohmu/snappyfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
import io

try:
import snappy
import cramjam
except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore


class SnappyFile(FileWrap):
def __init__(self, next_fp: FileLike, mode: str) -> None:
if snappy is None:
if cramjam is None:
raise io.UnsupportedOperation("Snappy is not available")

if mode == "rb":
self.decr = snappy.StreamDecompressor()
self.decr = cramjam.snappy.Decompressor()
self.encr = None
elif mode == "wb":
self.decr = None
self.encr = snappy.StreamCompressor()
self.encr = cramjam.snappy.Compressor()
else:
raise io.UnsupportedOperation("unsupported mode for SnappyFile")

Expand All @@ -49,10 +49,11 @@ def write(self, data: BinaryData) -> int: # type: ignore [override]
if self.encr is None:
raise io.UnsupportedOperation("file not open for writing")
data_as_bytes = bytes(data)
compressed_data = self.encr.compress(data_as_bytes)
self.next_fp.write(compressed_data)
self.offset += len(data_as_bytes)
return len(data_as_bytes)
block_size = self.encr.compress(data_as_bytes)
compressed_buffer = self.encr.flush()
self.next_fp.write(compressed_buffer)
self.offset += block_size
return block_size

def writable(self) -> bool:
return self.encr is not None
Expand All @@ -62,19 +63,13 @@ def read(self, size: Optional[int] = -1) -> bytes: # pylint: disable=unused-arg
self._check_not_closed()
if self.decr is None:
raise io.UnsupportedOperation("file not open for reading")
while not self.decr_done:
compressed = self.next_fp.read(IO_BLOCK_SIZE)
if not compressed:
self.decr_done = True
output = self.decr.flush()
else:
output = self.decr.decompress(compressed)

if output:
self.offset += len(output)
return output

return b""
num_decompressed_bytes = 0
while compressed := self.next_fp.read(IO_BLOCK_SIZE):
chunk_size = self.decr.decompress(compressed)
num_decompressed_bytes += chunk_size
self.offset += num_decompressed_bytes
output = self.decr.flush().read()
return output

def readable(self) -> bool:
return self.decr is not None
4 changes: 3 additions & 1 deletion rohmu/typing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from types import TracebackType
from typing import Any, Dict, Optional, Protocol, Type, TYPE_CHECKING, Union
from typing import Any, Dict, Literal, Optional, Protocol, Type, TYPE_CHECKING, Union

try:
# Remove when dropping support for Python 3.7
Expand Down Expand Up @@ -32,6 +32,8 @@

StrOrPathLike = Union[str, "PathLike[str]"]

CompressionAlgorithm = Literal["lzma", "snappy", "zstd"]


class HasFileno(Protocol):
def fileno(self) -> int:
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ packages = find:
install_requires =
azure-storage-blob >= 2.1.0
botocore
cramjam >= 2.7.0
cryptography
google-api-python-client
httplib2
oauth2client
paramiko
pydantic < 2
python-dateutil
python-snappy
requests
zstandard
typing_extensions >= 3.10, < 5
Expand Down
Loading

0 comments on commit e6b789e

Please sign in to comment.