diff --git a/src/backy/backends/chunked/chunk.py b/src/backy/backends/chunked/chunk.py
index 474902c2..7939a336 100644
--- a/src/backy/backends/chunked/chunk.py
+++ b/src/backy/backends/chunked/chunk.py
@@ -51,7 +51,7 @@ def __init__(
         self.data = None

     def _read_existing(self) -> None:
-        if self.data is not None:
+        if self.data:
             return
         # Prepare working with the chunk. We keep the data in RAM for
         # easier random access combined with transparent compression.
@@ -82,7 +82,7 @@ def read(self, offset: int, size: int = -1) -> Tuple[bytes, int]:
         Return the data and the remaining size that should be read.
         """
         self._read_existing()
-        assert self.data is not None
+        assert self.data

         self.data.seek(offset)
         data = self.data.read(size)
@@ -107,7 +107,7 @@ def write(self, offset: int, data: bytes) -> Tuple[int, bytes]:
             chunk_stats["write_full"] += 1
         else:
             self._read_existing()
-            assert self.data is not None
+            assert self.data
             self.data.seek(offset)
             self.data.write(data)
             chunk_stats["write_partial"] += 1
@@ -121,7 +121,7 @@ def flush(self) -> Optional[Hash]:
         """
         if self.clean:
             return None
-        assert self.data is not None
+        assert self.data
         # I'm not using read() here to a) avoid cache accounting and b)
         # use a faster path to get the data.
         self.hash = hash(self.data.getvalue())
@@ -129,21 +129,22 @@ def flush(self) -> Optional[Hash]:
         needs_forced_write = (
             self.store.force_writes and self.hash not in self.store.seen_forced
         )
-        if self.hash not in self.store.known or needs_forced_write:
-            # Create the tempfile in the right directory to increase locality
-            # of our change - avoid renaming between multiple directories to
-            # reduce traffic on the directory nodes.
-            fd, tmpfile_name = tempfile.mkstemp(dir=target.parent)
-            posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)  # type: ignore
-            with os.fdopen(fd, mode="wb") as f:
-                data = lzo.compress(self.data.getvalue())
-                f.write(data)
-            # Micro-optimization: chmod before rename to help against
-            # metadata flushes and then changing metadata again.
-            os.chmod(tmpfile_name, 0o440)
-            os.rename(tmpfile_name, target)
-            self.store.seen_forced.add(self.hash)
-            self.store.known.add(self.hash)
+        if self.hash not in self.store.seen:
+            if needs_forced_write or not target.exists():
+                # Create the tempfile in the right directory to increase locality
+                # of our change - avoid renaming between multiple directories to
+                # reduce traffic on the directory nodes.
+                fd, tmpfile_name = tempfile.mkstemp(dir=target.parent)
+                posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)  # type: ignore
+                with os.fdopen(fd, mode="wb") as f:
+                    data = lzo.compress(self.data.getvalue())
+                    f.write(data)
+                # Micro-optimization: chmod before rename to help against
+                # metadata flushes and then changing metadata again.
+                os.chmod(tmpfile_name, 0o440)
+                os.rename(tmpfile_name, target)
+            self.store.seen_forced.add(self.hash)
+            self.store.seen.add(self.hash)
         self.clean = True
         return self.hash
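The rewritten `flush()` replaces the preloaded `known` set with a lazily populated `seen` set: a chunk is written out the first time its hash shows up in this process, with a `target.exists()` check standing in for the old startup scan. A minimal standalone sketch of the same write-once pattern (`store_chunk` and `CHUNK_DIR` are illustrative names, not part of the patch):

```python
import os
import tempfile
from pathlib import Path

import lzo  # python-lzo, as used by the chunked backend

CHUNK_DIR = Path("/tmp/chunks")  # illustrative stand-in for Store.chunk_path()


def store_chunk(hash_: str, payload: bytes, seen: set[str]) -> None:
    """Write a compressed chunk atomically, at most once per process."""
    if hash_ in seen:
        return
    target = CHUNK_DIR / hash_
    if not target.exists():
        # Tempfile in the target directory so the rename stays on one
        # filesystem and is atomic.
        fd, tmp = tempfile.mkstemp(dir=target.parent)
        with os.fdopen(fd, "wb") as f:
            f.write(lzo.compress(payload))
        os.chmod(tmp, 0o440)  # chmod before rename: one metadata flush less
        os.rename(tmp, target)
    seen.add(hash_)
```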
diff --git a/src/backy/backends/chunked/store.py b/src/backy/backends/chunked/store.py
index 241fd2a5..d60151a0 100644
--- a/src/backy/backends/chunked/store.py
+++ b/src/backy/backends/chunked/store.py
@@ -23,7 +23,7 @@ class Store(object):

     path: Path
     seen_forced: set[Hash]
-    known: set[Hash]
+    seen: set[Hash]
     log: BoundLogger

     def __init__(self, path: Path, log: BoundLogger):
@@ -36,8 +36,7 @@ def __init__(self, path: Path, log: BoundLogger):
         if not self.path.joinpath("store").exists():
             self.convert_to_v2()

-        self.known = set(self.ls())
-        self.log.debug("init", known_chunks=len(self.known))
+        self.seen = set()

     def convert_to_v2(self) -> None:
         self.log.info("to-v2")
@@ -61,11 +60,12 @@ def ls(self) -> Iterable[Hash]:
     def purge(self, used_chunks: Set[Hash]) -> None:
         # This assumes exclusive lock on the store. This is guaranteed by
         # backy's main locking.
-        to_delete = self.known - used_chunks
-        self.log.info("purge", purging=len(to_delete))
-        for file_hash in sorted(to_delete):
+        self.log.info("purge")
+        for file_hash in self.ls():
+            if file_hash in used_chunks:
+                continue
             self.chunk_path(file_hash).unlink(missing_ok=True)
-        self.known -= to_delete
+            self.seen.discard(file_hash)

     def chunk_path(self, hash: Hash) -> Path:
         dir1 = hash[:2]
diff --git a/src/backy/daemon.py b/src/backy/daemon.py
index 40a91882..377f98c5 100644
--- a/src/backy/daemon.py
+++ b/src/backy/daemon.py
@@ -299,13 +299,13 @@ async def purge_old_files(self):
         # thread.
         while True:
             self.log.info("purge-scanning")
-            for candidate in self.base_dir.iterdir():
-                if not candidate.is_dir() or candidate.is_symlink():
+            for candidate in os.scandir(self.base_dir):
+                if not candidate.is_dir(follow_symlinks=False):
                     continue
-                self.log.debug("purge-candidate", candidate=candidate)
+                self.log.debug("purge-candidate", candidate=candidate.path)
                 reference_time = time.time() - 3 * 31 * 24 * 60 * 60
                 if not has_recent_changes(candidate, reference_time):
-                    self.log.info("purging", candidate=candidate)
+                    self.log.info("purging", candidate=candidate.path)
                     shutil.rmtree(candidate)
             self.log.info("purge-finished")
             await asyncio.sleep(24 * 60 * 60)
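`Store.purge()` no longer diffs against an in-memory `known` set; it streams the directory listing and deletes anything unreferenced, and the daemon's scan switches to `os.scandir()` so `DirEntry.is_dir()` can reuse type information the directory read already returned. A rough sketch of that control flow under the same assumptions (function and parameter names here are illustrative):

```python
import os


def purge_unused(store_dir: str, used: set[str]) -> None:
    """Delete chunk files whose hash is no longer referenced.

    Streams the directory instead of materializing all hashes in RAM,
    mirroring the patched Store.purge().
    """
    for entry in os.scandir(store_dir):
        # DirEntry.is_dir() typically answers from data readdir() already
        # fetched, avoiding an extra stat() per entry.
        if entry.is_dir(follow_symlinks=False):
            continue
        if entry.name not in used:
            os.unlink(entry.path)
```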
diff --git a/src/backy/sources/ceph/source.py b/src/backy/sources/ceph/source.py
index 97a0d4fa..82145138 100644
--- a/src/backy/sources/ceph/source.py
+++ b/src/backy/sources/ceph/source.py
@@ -90,14 +90,13 @@ def backup(self, target: BackyBackend) -> None:
                 break
         self.diff(target, parent)

-    def diff(self, target_: BackyBackend, parent: Revision) -> None:
+    def diff(self, target: BackyBackend, parent: Revision) -> None:
         self.log.info("diff")
         snap_from = "backy-" + parent.uuid
         snap_to = "backy-" + self.revision.uuid
         s = self.rbd.export_diff(self._image_name + "@" + snap_to, snap_from)
-        t = target_.open("r+b", parent)
-        with s as source, t as target:
-            bytes = source.integrate(target, snap_from, snap_to)
+        with s as source, target.open("r+b", parent) as target_:
+            bytes = source.integrate(target_, snap_from, snap_to)
         self.log.info("diff-integration-finished")

         self.revision.stats["bytes_written"] = bytes
@@ -107,19 +106,18 @@ def diff(self, target_: BackyBackend, parent: Revision) -> None:

         self.revision.stats["chunk_stats"] = chunk_stats

-    def full(self, target_: BackyBackend) -> None:
+    def full(self, target: BackyBackend) -> None:
         self.log.info("full")
         s = self.rbd.export(
             "{}/{}@backy-{}".format(self.pool, self.image, self.revision.uuid)
         )
-        t = target_.open("r+b")
         copied = 0
-        with s as source, t as target:
+        with s as source, target.open("r+b") as target_:
             while True:
                 buf = source.read(4 * backy.utils.MiB)
                 if not buf:
                     break
-                target.write(buf)
+                target_.write(buf)
                 copied += len(buf)

         self.revision.stats["bytes_written"] = copied
@@ -128,19 +126,17 @@ def full(self, target_: BackyBackend) -> None:

         self.revision.stats["chunk_stats"] = chunk_stats

-    def verify(self, target_: BackyBackend) -> bool:
+    def verify(self, target: BackyBackend) -> bool:
         s = self.rbd.image_reader(
             "{}/{}@backy-{}".format(self.pool, self.image, self.revision.uuid)
         )
-        t = target_.open("rb")
-
         self.revision.stats["ceph-verification"] = "partial"
-        with s as source, t as target:
+        with s as source, target.open("rb") as target_:
             self.log.info("verify")
             return backy.utils.files_are_roughly_equal(
                 source,
-                target,
+                target_,
                 report=lambda s, t, o: self.revision.backup.quarantine.add_report(
                     QuarantineReport(s, t, o)
                 ),
diff --git a/src/backy/sources/ceph/tests/test_ceph_source.py b/src/backy/sources/ceph/tests/test_ceph_source.py
index 7e7455cb..71c34923 100644
--- a/src/backy/sources/ceph/tests/test_ceph_source.py
+++ b/src/backy/sources/ceph/tests/test_ceph_source.py
@@ -112,8 +112,8 @@ def test_choose_full_without_parent(ceph_rbd_imagesource, backup, log):

     revision = Revision.create(backup, set(), log)

-    with source(revision):
-        source.backup(revision.backend)
+    with source(revision) as s:
+        s.backup(revision.backend)

     assert not source.diff.called
     assert source.full.called
diff --git a/src/backy/sources/file.py b/src/backy/sources/file.py
index ed2a5a59..2312943b 100644
--- a/src/backy/sources/file.py
+++ b/src/backy/sources/file.py
@@ -41,29 +41,27 @@ def ready(self) -> bool:
                 return False
         return True

-    def backup(self, target_: "backy.backends.BackyBackend") -> None:
+    def backup(self, target: "backy.backends.BackyBackend") -> None:
         self.log.debug("backup")
         s = open(self.filename, "rb")
         parent = self.revision.get_parent()
-        t = target_.open("r+b", parent)
-        with s as source, t as target:
+        with s as source, target.open("r+b", parent) as target_:
             if self.cow and parent:
                 self.log.info("backup-sparse")
-                bytes = copy_overwrite(source, target)
+                bytes = copy_overwrite(source, target_)
             else:
                 self.log.info("backup-full")
-                bytes = copy(source, target)
+                bytes = copy(source, target_)

         self.revision.stats["bytes_written"] = bytes

-    def verify(self, target_: "backy.backends.BackyBackend") -> bool:
+    def verify(self, target: "backy.backends.BackyBackend") -> bool:
         self.log.info("verify")
         s = open(self.filename, "rb")
-        t = target_.open("rb")
-        with s as source, t as target:
+        with s as source, target.open("rb") as target_:
             return files_are_equal(
                 source,
-                target,
+                target_,
                 report=lambda s, t, o: self.revision.backup.quarantine.add_report(
                     QuarantineReport(s, t, o)
                 ),
diff --git a/src/backy/tests/test_backup.py b/src/backy/tests/test_backup.py
index 3943aab3..e2f68b21 100644
--- a/src/backy/tests/test_backup.py
+++ b/src/backy/tests/test_backup.py
@@ -83,7 +83,7 @@ def test_backup_corrupted(simple_file_config):
     backup.backup({"daily"})

     store = backup.history[0].backend.store
-    chunk_path = store.chunk_path(next(iter(store.known)))
+    chunk_path = store.chunk_path(next(iter(store.seen)))
     os.chmod(chunk_path, 0o664)
     with open(chunk_path, "wb") as f:
         f.write(b"invalid")
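All three sources now open the backend directly in the `with` header, so if `target.open()` raises, the already-entered source context is still closed, and the rename puts the short `target` name on the method parameter. The copy loop in `full()` boils down to this pattern (a sketch; `stream_copy` is an illustrative name, the buffer size matches `backy.utils.MiB`):

```python
from typing import IO

MiB = 1024 * 1024  # same constant as backy.utils.MiB


def stream_copy(source: IO[bytes], target: IO[bytes]) -> int:
    """Copy source to target in 4 MiB chunks, returning bytes written."""
    copied = 0
    while True:
        buf = source.read(4 * MiB)
        if not buf:  # b"" signals EOF
            break
        target.write(buf)
        copied += len(buf)
    return copied
```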
diff --git a/src/backy/utils.py b/src/backy/utils.py
index 4b851989..ad72f406 100644
--- a/src/backy/utils.py
+++ b/src/backy/utils.py
@@ -10,7 +10,7 @@
 import tempfile
 import time
 import typing

-from pathlib import Path
+from os import DirEntry
 from typing import IO, Callable, Iterable, List, Optional, TypeVar
 from zoneinfo import ZoneInfo
@@ -146,7 +146,7 @@ def open_new(self, mode):
     def open_copy(self, mode):
         """Open an existing file, make a copy first, rename on close."""
         self.open_new("wb")
-        assert self.f is not None
+        assert self.f
         if os.path.exists(self.filename):
             cp_reflink(self.filename, self.f.name)
         self.f.close()
@@ -176,36 +176,36 @@ def use_write_protection(self):

     @property
     def name(self):
-        assert self.f is not None
+        assert self.f
         return self.f.name

     def read(self, *args, **kw):
-        assert self.f is not None
+        assert self.f
         data = self.f.read(*args, **kw)
         if self.encoding:
             data = data.decode(self.encoding)
         return data

     def write(self, data):
-        assert self.f is not None
+        assert self.f
         if self.encoding:
             data = data.encode(self.encoding)
         self.f.write(data)

     def seek(self, offset, whence=0):
-        assert self.f is not None
+        assert self.f
         return self.f.seek(offset, whence)

     def tell(self):
-        assert self.f is not None
+        assert self.f
        return self.f.tell()

     def truncate(self, size=None):
-        assert self.f is not None
+        assert self.f
         return self.f.truncate(size)

     def fileno(self):
-        assert self.f is not None
+        assert self.f
         return self.f.fileno()
@@ -440,7 +440,7 @@ def min_date():
     return datetime.datetime.min.replace(tzinfo=ZoneInfo("UTC"))


-def has_recent_changes(entry: Path, reference_time: float):
+def has_recent_changes(entry: DirEntry, reference_time: float):
     # This is not efficient on a first look as we may stat things twice, but it
     # makes the recursion easier to read and the VFS will be caching this
     # anyway.
@@ -448,17 +448,17 @@
     # higher levels will propagate changed mtimes do to new/deleted files
     # instead of just modified files in our case and looking at stats when
     # traversing a directory level is faster than going depth first.
-    st = entry.stat(follow_symlinks=False)
-    if st.st_mtime >= reference_time:
-        return True
-    if not entry.is_dir() or entry.is_symlink():
+    if not entry.is_dir(follow_symlinks=False):
         return False
+    if entry.stat(follow_symlinks=False).st_mtime >= reference_time:
+        return True
+    candidates = list(os.scandir(entry.path))
     # First pass: stat all direct entries
-    for candidate in entry.iterdir():
+    for candidate in candidates:
         if candidate.stat(follow_symlinks=False).st_mtime >= reference_time:
             return True
     # Second pass: start traversing
-    for candidate in entry.iterdir():
+    for candidate in candidates:
         if has_recent_changes(candidate, reference_time):
             return True
     return False
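The reworked `has_recent_changes()` takes an `os.DirEntry` instead of a `Path`, so the type and mtime checks can reuse data the directory scan already fetched, and it stats all direct children before recursing because new or deleted entries bump the parent's mtime. A hypothetical caller mirroring the daemon's purge loop (`stale_backup_dirs` and `THREE_MONTHS` are illustrative, only the cutoff matches the daemon's):

```python
import os
import time

from backy.utils import has_recent_changes

THREE_MONTHS = 3 * 31 * 24 * 60 * 60  # same cutoff the daemon uses


def stale_backup_dirs(base_dir: str) -> list[str]:
    """Return directories untouched for roughly three months."""
    reference_time = time.time() - THREE_MONTHS
    return [
        entry.path
        for entry in os.scandir(base_dir)  # yields DirEntry objects
        if entry.is_dir(follow_symlinks=False)
        and not has_recent_changes(entry, reference_time)
    ]
```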