Skip to content

Commit

Permalink
Fix type hints issue from Rohmu 1.2.0
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel Giffard <samuel.giffard@aiven.io>
  • Loading branch information
Samuel Giffard committed Oct 11, 2023
1 parent c6d3db4 commit 4b33ca4
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mypy: version
.PHONY: fmt
fmt: version
unify --quote '"' --recursive --in-place $(PYTHON_SOURCE_DIRS)
isort --recursive $(PYTHON_SOURCE_DIRS)
isort $(PYTHON_SOURCE_DIRS)
yapf --parallel --recursive --in-place $(PYTHON_SOURCE_DIRS)

.PHONY: coverage
Expand Down
8 changes: 4 additions & 4 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
from pghoard.basebackup.chunks import ChunkUploader, DeltaStats
from pghoard.basebackup.delta import DeltaBaseBackup
from pghoard.common import (
BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData, FileType, NoException,
PGHoardThread, connection_string_using_pgpass, download_backup_meta_file, extract_pghoard_bb_v2_metadata,
replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
TAR_METADATA_FILENAME, BackupFailure, BaseBackupFormat, BaseBackupMode, CallbackEvent, CompressionData, EncryptionData,
FileType, NoException, PGHoardThread, connection_string_using_pgpass, download_backup_meta_file,
extract_pghoard_bb_v2_metadata, replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from pghoard.compressor import CompressionEvent
Expand Down Expand Up @@ -397,7 +397,7 @@ def run_basic_basebackup(self):
def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label):
mtime = time.time()
blob = io.BytesIO(common.json_encode(metadata, binary=True))
ti = tarfile.TarInfo(name=".pghoard_tar_metadata.json")
ti = tarfile.TarInfo(name=TAR_METADATA_FILENAME)
ti.size = len(blob.getbuffer())
ti.mtime = mtime
yield ti, blob, False
Expand Down
13 changes: 7 additions & 6 deletions pghoard/basebackup/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
from queue import Empty
from tempfile import NamedTemporaryFile
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from typing import (Any, Callable, Dict, List, Optional, Tuple, Type, Union, cast)

from rohmu import rohmufile
from rohmu.delta.common import EMBEDDED_FILE_SIZE

# pylint: disable=superfluous-parens
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackEvent, CallbackQueue, CompressionData, EncryptionData, FileType,
FileTypePrefixes, NoException
BackupFailure, BaseBackupFormat, CallbackEvent, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName,
FileType, FileTypePrefixes, NoException
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
Expand Down Expand Up @@ -144,22 +144,23 @@ def tar_one_file(
start_time = time.monotonic()

with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
raw_output_file = cast(FileLikeWithName, raw_output_obj)
# pylint: disable=bad-continuation
with rohmufile.file_writer(
compression_algorithm=self.compression_data.algorithm,
compression_level=self.compression_data.level,
compression_threads=self.site_config["basebackup_compression_threads"],
rsa_public_key=self.encryption_data.rsa_public_key,
fileobj=raw_output_obj
fileobj=raw_output_file
) as output_obj:
with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
self.write_files_to_tar(files=files_to_backup, tar=output_tar, delta_stats=delta_stats)

input_size = output_obj.tell()

result_size = raw_output_obj.tell()
result_size = raw_output_file.tell()
# Make the file persist over the with-block with this hardlink
os.link(raw_output_obj.name, chunk_path)
os.link(raw_output_file.name, chunk_path)

rohmufile.log_compression_result(
encrypted=bool(self.encryption_data.encryption_key_id),
Expand Down
32 changes: 21 additions & 11 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@
from pathlib import Path
from queue import Empty
from tempfile import NamedTemporaryFile
from typing import AbstractSet, Any, Callable, Dict, Iterable, List, Set, Tuple
from typing import (AbstractSet, Any, Callable, Dict, Iterable, List, Protocol, Set, Tuple, cast)

from rohmu import BaseTransfer, rohmufile
from rohmu.dates import now
from rohmu.delta.common import (
BackupManifest, BackupPath, SizeLimitedFile, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult
)
from rohmu.delta.common import (BackupManifest, BackupPath, SnapshotFile, SnapshotHash, SnapshotResult, SnapshotUploadResult)
from rohmu.delta.snapshot import Snapshotter
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.typing import HasRead, HasSeek

from pghoard.basebackup.chunks import ChunkUploader
from pghoard.common import (
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileType, FileTypePrefixes,
download_backup_meta_file, extract_pghoard_delta_metadata
BackupFailure, BaseBackupFormat, CallbackQueue, CompressionData, EncryptionData, FileLikeWithName, FileType,
FileTypePrefixes, download_backup_meta_file, extract_pghoard_delta_metadata
)
from pghoard.metrics import Metrics
from pghoard.transfer import TransferQueue, UploadEvent
Expand All @@ -40,6 +39,10 @@ class UploadedFilesMetric:
count: int = 0


class HasReadAndSeek(HasRead, HasSeek, Protocol):
...


FilesChunk = Set[Tuple]
SnapshotFiles = Dict[str, SnapshotFile]

Expand Down Expand Up @@ -114,7 +117,7 @@ def _list_existing_files(self) -> SnapshotFiles:
return all_snapshot_files

def _delta_upload_hexdigest(
self, *, temp_dir: Path, chunk_path: Path, file_obj: SizeLimitedFile, callback_queue: CallbackQueue,
self, *, temp_dir: Path, chunk_path: Path, file_obj: HasReadAndSeek, callback_queue: CallbackQueue,
relative_path: Path
) -> Tuple[int, int, str, bool]:
"""Schedule a separate delta file for the upload, calculates the final hash to use it as a name
Expand All @@ -131,18 +134,19 @@ def progress_callback(n_bytes: int = 1) -> None:
self.metrics.increase("pghoard.basebackup_bytes_uploaded", inc_value=n_bytes, tags={"delta": True})

with NamedTemporaryFile(dir=temp_dir, prefix=os.path.basename(chunk_path), suffix=".tmp") as raw_output_obj:
raw_output_file = cast(FileLikeWithName, raw_output_obj)
rohmufile.write_file(
input_obj=file_obj,
output_obj=raw_output_obj,
output_obj=raw_output_file,
compression_algorithm=self.compression_data.algorithm,
compression_level=self.compression_data.level,
rsa_public_key=self.encryption_data.rsa_public_key,
log_func=self.log.info,
data_callback=result_hash.update,
progress_callback=progress_callback,
)
result_size = raw_output_obj.tell()
raw_output_obj.seek(0)
result_size = raw_output_file.tell()
raw_output_file.seek(0)

result_digest = result_hash.hexdigest()

Expand All @@ -157,7 +161,7 @@ def progress_callback(n_bytes: int = 1) -> None:
else:
self.submitted_hashes.add(result_digest)

os.link(raw_output_obj.name, chunk_path)
os.link(raw_output_file.name, chunk_path)

rohmufile.log_compression_result(
encrypted=bool(self.encryption_data.encryption_key_id),
Expand Down Expand Up @@ -333,6 +337,9 @@ def _split_files_for_upload(
current_chunk_size: int = 0
current_chunk: FilesChunk = set()

if snapshot_result.state is None:
raise BackupFailure("Snapshot result state is None")

for snapshot_file in snapshot_result.state.files:
if not snapshot_file.should_be_bundled:
if snapshot_file.hexdigest:
Expand Down Expand Up @@ -378,6 +385,9 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
digests_metric = UploadedFilesMetric()
embed_metric = UploadedFilesMetric()

if snapshot_result.state is None:
raise BackupFailure("Snapshot result state is None")

for snapshot_file in snapshot_result.state.files:
# Sizes of files uploaded as chunks are calculated separately
if snapshot_file.should_be_bundled:
Expand Down
33 changes: 23 additions & 10 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Optional, Tuple)
from typing import (TYPE_CHECKING, Any, BinaryIO, Callable, Dict, Final, Optional, Protocol, Tuple, cast)

from rohmu import IO_BLOCK_SIZE, BaseTransfer, rohmufile
from rohmu.errors import Error, InvalidConfigurationError
from rohmu.typing import FileLike, HasName

from pghoard import pgutil

TAR_METADATA_FILENAME: Final[str] = ".pghoard_tar_metadata.json"

LOG = logging.getLogger("pghoard.common")


class FileLikeWithName(FileLike, HasName, Protocol):
...


class StrEnum(str, enum.Enum):
def __str__(self):
return str(self.value)
Expand Down Expand Up @@ -306,24 +313,30 @@ def delete_alert_file(config, filename):
os.unlink(filepath)


def _extract_metadata(fileobj):
def _extract_metadata(fileobj: BinaryIO) -> Dict[str, Any]:
# | in mode to use tarfile's internal stream buffer manager, currently required because our SnappyFile
# interface doesn't do proper buffering for reads
with tarfile.open(fileobj=fileobj, mode="r|", bufsize=IO_BLOCK_SIZE) as tar:
for tarinfo in tar:
if tarinfo.name == ".pghoard_tar_metadata.json":
tar_meta_bytes = tar.extractfile(tarinfo).read()
if tarinfo.name == TAR_METADATA_FILENAME:
tar_extracted = tar.extractfile(tarinfo)
if tar_extracted is None:
raise Exception(
f"{TAR_METADATA_FILENAME} is not a regular file, there is no data associated to it. "
"Is it a directory?"
)
tar_meta_bytes = tar_extracted.read()
return json.loads(tar_meta_bytes.decode("utf-8"))

raise Exception(".pghoard_tar_metadata.json not found")
raise Exception(f"{TAR_METADATA_FILENAME} not found")


def extract_pghoard_bb_v2_metadata(fileobj):
return _extract_metadata(fileobj)
def extract_pghoard_bb_v2_metadata(fileobj: FileLike) -> Dict[str, Any]:
return _extract_metadata(cast(BinaryIO, fileobj))


def extract_pghoard_delta_metadata(fileobj):
return _extract_metadata(fileobj)
def extract_pghoard_delta_metadata(fileobj: FileLike) -> Dict[str, Any]:
return _extract_metadata(cast(BinaryIO, fileobj))


def get_pg_wal_directory(config):
Expand Down Expand Up @@ -423,7 +436,7 @@ def from_config(config) -> "CompressionData":

def download_backup_meta_file(
storage: BaseTransfer, basebackup_path: str, metadata: Dict[str, Any], key_lookup: Callable[[str], str],
extract_meta_func: Callable[[BinaryIO], Dict[str, Any]]
extract_meta_func: Callable[[FileLike], Dict[str, Any]]
) -> Tuple[Dict[str, Any], bytes]:
bmeta_compressed = storage.get_contents_to_string(basebackup_path)[0]
with rohmufile.file_reader(fileobj=io.BytesIO(bmeta_compressed), metadata=metadata, key_lookup=key_lookup) as input_obj:
Expand Down
5 changes: 3 additions & 2 deletions pghoard/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
from rohmu.errors import (Error, InvalidConfigurationError, MaybeRecoverableError)

from pghoard.common import (
BaseBackupFormat, FileType, FileTypePrefixes, StrEnum, download_backup_meta_file, extract_pghoard_delta_metadata
TAR_METADATA_FILENAME, BaseBackupFormat, FileType, FileTypePrefixes, StrEnum, download_backup_meta_file,
extract_pghoard_delta_metadata
)
from pghoard.object_store import (HTTPRestore, ObjectStore, print_basebackup_list)

Expand Down Expand Up @@ -932,7 +933,7 @@ def _build_tar_args(self, metadata):
if not file_format:
return base_args
elif file_format in {BaseBackupFormat.v1, BaseBackupFormat.v2, BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2}:
extra_args = ["--exclude", ".pghoard_tar_metadata.json", "--transform", "s,^pgdata/,,"]
extra_args = ["--exclude", TAR_METADATA_FILENAME, "--transform", "s,^pgdata/,,"]
if file_format in {BaseBackupFormat.delta_v1, BaseBackupFormat.delta_v2}:
extra_args += ["--exclude", ".manifest.json"]
if self.tablespaces:
Expand Down
3 changes: 2 additions & 1 deletion pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rohmu import get_transfer
from rohmu.errors import FileNotFoundFromStorageError
from rohmu.object_storage.base import (BaseTransfer, IncrementalProgressCallbackType)
from rohmu.typing import Metadata

from pghoard.common import (
CallbackEvent, CallbackQueue, FileType, PGHoardThread, Queue, QuitEvent, StrEnum, create_alert_file,
Expand Down Expand Up @@ -50,7 +51,7 @@ class BaseTransferEvent:
@dataclass(frozen=True)
class UploadEvent(BaseTransferEvent):
source_data: Union[BinaryIO, Path]
metadata: Dict[str, str]
metadata: Metadata
file_size: Optional[int]
remove_after_upload: bool = True
retry_number: int = 0
Expand Down
14 changes: 7 additions & 7 deletions test/basebackup/test_delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from queue import Empty
from test.base import CONSTANT_TEST_RSA_PUBLIC_KEY
from typing import Any, Callable, Dict, Generator, Tuple
from typing import Any, Callable, Dict, Generator, Tuple, cast
from unittest.mock import MagicMock, Mock

import mock
Expand Down Expand Up @@ -419,18 +419,18 @@ def test_upload_single_delta_files_cleanup_after_error(
) -> None:
_, file_hash = delta_file

with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest:
with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
patch.object(snapshotter, "update_snapshot_file_data", side_effect=Exception):
mock_delta_upload_hexdigest.return_value = (200, 10, file_hash, True)
snapshotter.update_snapshot_file_data = Mock(side_effect=Exception)

if not key_exists:
deltabasebackup.storage.delete_key.side_effect = FileNotFoundFromStorageError
cast(Mock, deltabasebackup.storage.delete_key).side_effect = FileNotFoundFromStorageError

with snapshotter.lock:
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
with pytest.raises(BackupFailure):
deltabasebackup._upload_single_delta_files(todo_hexdigests={file_hash}, snapshotter=snapshotter, progress=0) # pylint: disable=protected-access
deltabasebackup.storage.delete_key.assert_called_with(f"abc/basebackup_delta/{file_hash}")
cast(Mock, deltabasebackup.storage.delete_key).assert_called_with(f"abc/basebackup_delta/{file_hash}")


@pytest.mark.parametrize("files_count, initial_progress", [(1, 0), (4, 0), (10, 0), (1, 90), (15, 10)])
Expand All @@ -442,9 +442,9 @@ def test_upload_single_delta_files_progress(
delta_hashes = {file_hash for _, file_hash in delta_files}

with patch.object(deltabasebackup, "_delta_upload_hexdigest") as mock_delta_upload_hexdigest, \
patch.object(deltabasebackup, "metrics") as mock_metrics:
patch.object(deltabasebackup, "metrics") as mock_metrics, \
patch.object(snapshotter, "update_snapshot_file_data"):
mock_delta_upload_hexdigest.side_effect = [(200, 10, file_hash, True) for file_hash in delta_hashes]
snapshotter.update_snapshot_file_data = Mock()
with snapshotter.lock:
deltabasebackup._snapshot(snapshotter=snapshotter) # pylint: disable=protected-access
deltabasebackup._upload_single_delta_files( # pylint: disable=protected-access
Expand Down
8 changes: 4 additions & 4 deletions test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from rohmu.errors import Error

from pghoard.common import (
create_pgpass_file, default_json_serialization, download_backup_meta_file, extract_pg_command_version_string,
extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode, pg_major_version,
pg_version_string_to_number, write_json_file
TAR_METADATA_FILENAME, create_pgpass_file, default_json_serialization, download_backup_meta_file,
extract_pg_command_version_string, extract_pghoard_bb_v2_metadata, extract_pghoard_delta_metadata, json_encode,
pg_major_version, pg_version_string_to_number, write_json_file
)

from .base import PGHoardTestCase
Expand Down Expand Up @@ -221,7 +221,7 @@ def convert_pg_command_version_to_number(command_version_string):
]
)
def test_download_backup_meta(metadata, extract_meta_func):
data = dict_to_tar_data(data=metadata, tar_name=".pghoard_tar_metadata.json")
data = dict_to_tar_data(data=metadata, tar_name=TAR_METADATA_FILENAME)
storage = Mock()
storage.get_contents_to_string.return_value = (data, {})
backup_meta, backup_compressed_data = download_backup_meta_file(
Expand Down
6 changes: 4 additions & 2 deletions test/test_pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import pytest

import pghoard.pghoard as pghoard_module
from pghoard.common import (BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file)
from pghoard.common import (
TAR_METADATA_FILENAME, BaseBackupFormat, FileType, create_alert_file, delete_alert_file, write_json_file
)
from pghoard.pghoard import PGHoard
from pghoard.pgutil import create_connection_string

Expand Down Expand Up @@ -466,7 +468,7 @@ def write_backup_files(what):
}
}
}
input_size = dict_to_tar_file(data=metadata, file_path=bb_path, tar_name=".pghoard_tar_metadata.json")
input_size = dict_to_tar_file(data=metadata, file_path=bb_path, tar_name=TAR_METADATA_FILENAME)

for h in hexdigests:
with open(Path(basebackup_delta_path) / h, "w") as digest_file, \
Expand Down
4 changes: 2 additions & 2 deletions test/test_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pytest

from pghoard.common import write_json_file
from pghoard.common import TAR_METADATA_FILENAME, write_json_file
from pghoard.restore import (
MAX_RETRIES, BasebackupFetcher, ChunkFetcher, FileDataInfo, FileInfoType, FilePathInfo, Restore, RestoreError,
create_recovery_conf
Expand Down Expand Up @@ -541,7 +541,7 @@ def test_restore_get_delta_basebackup_data():
}
}

data = dict_to_tar_data(metadata, tar_name=".pghoard_tar_metadata.json")
data = dict_to_tar_data(metadata, tar_name=TAR_METADATA_FILENAME)

r = Restore()
r.config = {
Expand Down
Loading

0 comments on commit 4b33ca4

Please sign in to comment.