Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compressor: replace snappy with cramjam #134

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ jobs:

- id: dependencies
run: |
sudo apt-get install -y libsnappy-dev
pip install --upgrade -r requirements.txt
pip install --upgrade -r requirements.dev.txt

- id: pylint
run: make lint

Expand All @@ -59,7 +57,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: ["3.11", "3.10", "3.9", "3.8"]
python-version: ["3.11", "3.10"]

steps:
- id: checkout-code
Expand All @@ -72,7 +70,6 @@ jobs:

- id: dependencies
run: |
sudo apt-get install -y libsnappy-dev
pip install -r requirements.txt
pip install -r requirements.dev.txt

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish-pypi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.8"
python-version: "3.11"

- name: Ensure tags are properly synced
run: git fetch --tags --force
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[MAIN]
extension-pkg-allow-list=pydantic
extension-pkg-allow-list=cramjam,pydantic

[MESSAGES CONTROL]
disable=
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Features
Requirements
============

Rohmu requires Python >= 3.8. For Python library dependencies, have a
Rohmu requires Python >= 3.10. For Python library dependencies, have a
look at
`requirements.txt <https://github.com/aiven/rohmu/blob/main/requirements.txt>`__.

Expand Down
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ ignore_missing_imports = True
[mypy-azure.common.*]
ignore_missing_imports = True

[mypy-oauth2client.*]
[mypy-cramjam.*]
ignore_missing_imports = True

[mypy-snappy.*]
[mypy-oauth2client.*]
ignore_missing_imports = True

[mypy-swiftclient.*]
Expand Down
3 changes: 2 additions & 1 deletion rohmu.spec
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ Requires: python3-cryptography >= 1.6
Requires: python3-dateutil
Requires: python3-pydantic
Requires: python3-requests
Requires: python3-snappy
# Requires: python3-snappy
# TODO: Create python3-cramjam
Requires: python3-zstandard
BuildRequires: python3-devel
BuildRequires: python3-flake8
Expand Down
139 changes: 87 additions & 52 deletions rohmu/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,86 @@
from .errors import InvalidConfigurationError
from .filewrap import Sink, Stream
from .snappyfile import SnappyFile
from .typing import BinaryData, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .typing import BinaryData, CompressionAlgorithm, Compressor, Decompressor, FileLike, HasRead, HasWrite
from .zstdfile import open as zstd_open
from typing import cast, IO

import lzma

try:
import snappy
import cramjam

# Cramjam streaming classes are lazy and diverge from Compressor and Decompressor interfaces.
# Adapt the parent classes to flush and return the inner buffer after compress and decompress calls.
class CramjamStreamingCompressor(Compressor):
def __init__(self) -> None:
self._compressor = cramjam.snappy.Compressor()

def compress(self, data: bytes) -> bytes:
self._compressor.compress(data)
return self.flush()

def flush(self) -> bytes:
buf = self._compressor.flush()
return buf.read()

class CramjamStreamingDecompressor(Decompressor):
def __init__(self) -> None:
self._decompressor = cramjam.snappy.Decompressor()

def decompress(self, data: bytes) -> bytes:
self._decompressor.decompress(data)
buf = self._decompressor.flush()
return buf.read()

except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore
CramjamStreamingCompressor: Compressor | None = None # type: ignore[no-redef]
CramjamStreamingDecompressor: Decompressor | None = None # type: ignore[no-redef]

try:
import zstandard as zstd
except ImportError:
zstd = None # type: ignore


def CompressionFile(dst_fp: FileLike, algorithm: str, level: int = 0, threads: int = 0) -> FileLike:
def CompressionFile(dst_fp: FileLike, algorithm: CompressionAlgorithm, level: int = 0, threads: int = 0) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)

if algorithm == "snappy":
return SnappyFile(dst_fp, "wb")

if algorithm == "zstd":
return zstd_open(dst_fp, "wb", level=level, threads=threads)

if algorithm:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

return dst_fp
compression_fileobj: FileLike
match algorithm:
case "lzma":
compression_fileobj = lzma.open(cast(IO[bytes], dst_fp), "w", preset=level)
case "snappy":
compression_fileobj = SnappyFile(dst_fp, "wb")
case "zstd":
compression_fileobj = zstd_open(dst_fp, "wb", level=level, threads=threads)
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return compression_fileobj


def create_streaming_compressor(algorithm: CompressionAlgorithm, level: int = 0) -> Compressor:
compressor: Compressor
match algorithm:
case "lzma":
compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
case "snappy":
if CramjamStreamingCompressor is None:
raise ImportError("Unable to import cramjam")
compressor = CramjamStreamingCompressor()
case "zstd":
compressor = zstd.ZstdCompressor(level=level).compressobj()
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return compressor


class CompressionStream(Stream):
"""Non-seekable stream of data that adds compression on top of given source stream"""

def __init__(self, src_fp: HasRead, algorithm: str, level: int = 0) -> None:
def __init__(self, src_fp: HasRead, algorithm: CompressionAlgorithm, level: int = 0) -> None:
super().__init__(src_fp, minimum_read_size=32 * 1024)
self._compressor: Compressor
if algorithm == "lzma":
self._compressor = lzma.LZMACompressor(lzma.FORMAT_XZ, -1, level, None)
elif algorithm == "snappy":
self._compressor = snappy.StreamCompressor()
elif algorithm == "zstd":
self._compressor = zstd.ZstdCompressor(level=level).compressobj()
else:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
self._compressor = create_streaming_compressor(algorithm, level)

def _process_chunk(self, data: bytes) -> bytes:
return self._compressor.compress(data)
Expand All @@ -63,42 +95,45 @@ def _finalize(self) -> bytes:
return self._compressor.flush()


def DecompressionFile(src_fp: FileLike, algorithm: str) -> FileLike:
def DecompressionFile(src_fp: FileLike, algorithm: CompressionAlgorithm) -> FileLike:
"""This looks like a class to users, but is actually a function that instantiates a class based on algorithm."""
if algorithm == "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")

if algorithm == "snappy":
return SnappyFile(src_fp, "rb")

if algorithm == "zstd":
return zstd_open(src_fp, "rb")
match algorithm:
case "lzma":
return lzma.open(cast(IO[bytes], src_fp), "r")
case "snappy":
return SnappyFile(src_fp, "rb")
case "zstd":
return zstd_open(src_fp, "rb")
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

if algorithm:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")

return src_fp
def create_streaming_decompressor(algorithm: CompressionAlgorithm) -> Decompressor:
decompressor: Decompressor
match algorithm:
case "lzma":
decompressor = lzma.LZMADecompressor()
case "snappy":
if CramjamStreamingDecompressor is None:
raise ImportError("Unable to import cramjam")
decompressor = CramjamStreamingDecompressor()
case "zstd":
decompressor = zstd.ZstdDecompressor().decompressobj()
case _:
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(algorithm)}")
return decompressor


class DecompressSink(Sink):
def __init__(self, next_sink: HasWrite, compression_algorithm: str):
def __init__(self, next_sink: HasWrite, compression_algorithm: CompressionAlgorithm):
super().__init__(next_sink)
self.decompressor = self._create_decompressor(compression_algorithm)

def _create_decompressor(self, alg: str) -> Decompressor:
if alg == "snappy":
return snappy.StreamDecompressor()
elif alg == "lzma":
return lzma.LZMADecompressor()
elif alg == "zstd":
return zstd.ZstdDecompressor().decompressobj()
raise InvalidConfigurationError(f"invalid compression algorithm: {repr(alg)}")
self.decompressor = create_streaming_decompressor(compression_algorithm)

def write(self, data: BinaryData) -> int:
data = bytes(data) if not isinstance(data, bytes) else data
written = len(data)
if not data:
return written
data = self.decompressor.decompress(data)
self._write_to_next_sink(data)
decompressed_data = self.decompressor.decompress(data)
self._write_to_next_sink(decompressed_data)
return written
1 change: 0 additions & 1 deletion rohmu/filewrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def read(self, size: int = -1) -> bytes:
bytes_available += len(dst_data)
if not src_data:
self._eof = True

if size < 0 or bytes_available < size:
data = b"".join(chunks)
self._remainder = b""
Expand Down
6 changes: 3 additions & 3 deletions rohmu/rohmufile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .encryptor import DecryptorFile, DecryptSink, EncryptorFile
from .errors import InvalidConfigurationError
from .filewrap import ThrottleSink
from .typing import FileLike, HasWrite, Metadata
from .typing import CompressionAlgorithm, FileLike, HasWrite, Metadata
from contextlib import suppress
from inspect import signature
from rohmu.object_storage.base import IncrementalProgressCallbackType
Expand Down Expand Up @@ -143,7 +143,7 @@ def read_file(
def file_writer(
*,
fileobj: FileLike,
compression_algorithm: Optional[str] = None,
compression_algorithm: Optional[CompressionAlgorithm] = None,
compression_level: int = 0,
compression_threads: int = 0,
rsa_public_key: Union[None, str, bytes] = None,
Expand All @@ -162,7 +162,7 @@ def write_file(
input_obj: FileLike,
output_obj: FileLike,
progress_callback: IncrementalProgressCallbackType = None,
compression_algorithm: Optional[str] = None,
compression_algorithm: Optional[CompressionAlgorithm] = None,
compression_level: int = 0,
compression_threads: int = 0,
rsa_public_key: Union[None, str, bytes] = None,
Expand Down
39 changes: 17 additions & 22 deletions rohmu/snappyfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
import io

try:
import snappy
import cramjam
except ImportError:
snappy = None # type: ignore
cramjam = None # type: ignore


class SnappyFile(FileWrap):
def __init__(self, next_fp: FileLike, mode: str) -> None:
if snappy is None:
if cramjam is None:
raise io.UnsupportedOperation("Snappy is not available")

if mode == "rb":
self.decr = snappy.StreamDecompressor()
self.decr = cramjam.snappy.Decompressor()
self.encr = None
elif mode == "wb":
self.decr = None
self.encr = snappy.StreamCompressor()
self.encr = cramjam.snappy.Compressor()
else:
raise io.UnsupportedOperation("unsupported mode for SnappyFile")

Expand All @@ -49,10 +49,11 @@ def write(self, data: BinaryData) -> int: # type: ignore [override]
if self.encr is None:
raise io.UnsupportedOperation("file not open for writing")
data_as_bytes = bytes(data)
compressed_data = self.encr.compress(data_as_bytes)
self.next_fp.write(compressed_data)
self.offset += len(data_as_bytes)
return len(data_as_bytes)
block_size = self.encr.compress(data_as_bytes)
compressed_buffer = self.encr.flush()
self.next_fp.write(compressed_buffer)
self.offset += block_size
return block_size

def writable(self) -> bool:
return self.encr is not None
Expand All @@ -62,19 +63,13 @@ def read(self, size: Optional[int] = -1) -> bytes: # pylint: disable=unused-arg
self._check_not_closed()
if self.decr is None:
raise io.UnsupportedOperation("file not open for reading")
while not self.decr_done:
compressed = self.next_fp.read(IO_BLOCK_SIZE)
if not compressed:
self.decr_done = True
output = self.decr.flush()
else:
output = self.decr.decompress(compressed)

if output:
self.offset += len(output)
return output

return b""
num_decompressed_bytes = 0
while compressed := self.next_fp.read(IO_BLOCK_SIZE):
chunk_size = self.decr.decompress(compressed)
num_decompressed_bytes += chunk_size
self.offset += num_decompressed_bytes
output = self.decr.flush().read()
return output

def readable(self) -> bool:
return self.decr is not None
4 changes: 3 additions & 1 deletion rohmu/typing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from types import TracebackType
from typing import Any, Dict, Optional, Protocol, Type, TYPE_CHECKING, Union
from typing import Any, Dict, Literal, Optional, Protocol, Type, TYPE_CHECKING, Union

try:
# Remove when dropping support for Python 3.7
Expand Down Expand Up @@ -32,6 +32,8 @@

StrOrPathLike = Union[str, "PathLike[str]"]

CompressionAlgorithm = Literal["lzma", "snappy", "zstd"]


class HasFileno(Protocol):
def fileno(self) -> int:
Expand Down
Loading