From 7d03b30f91fa58540adf3184002542b5158f204f Mon Sep 17 00:00:00 2001 From: Johann Bahl Date: Thu, 7 Mar 2024 18:59:51 +0100 Subject: [PATCH] clean up chunked backend --- src/backy/backends/__init__.py | 4 - src/backy/backends/chunked/__init__.py | 37 +-------- src/backy/backends/chunked/chunk.py | 66 ++++++---------- src/backy/backends/chunked/file.py | 78 +++++++++---------- src/backy/backends/chunked/store.py | 54 +++---------- .../backends/chunked/tests/test_backend.py | 39 ---------- .../backends/chunked/tests/test_chunk.py | 20 +++-- src/backy/backends/chunked/tests/test_file.py | 6 +- 8 files changed, 91 insertions(+), 213 deletions(-) diff --git a/src/backy/backends/__init__.py b/src/backy/backends/__init__.py index e25b66bb..6318eebd 100644 --- a/src/backy/backends/__init__.py +++ b/src/backy/backends/__init__.py @@ -4,7 +4,6 @@ from structlog.stdlib import BoundLogger if TYPE_CHECKING: - from backy.backup import Backup from backy.revision import Revision @@ -26,6 +25,3 @@ def purge(self) -> None: def verify(self) -> None: pass - - def scrub(self, backup: "Backup", type: str) -> int: - return 0 diff --git a/src/backy/backends/chunked/__init__.py b/src/backy/backends/chunked/__init__.py index 595f32dc..9d95ddff 100644 --- a/src/backy/backends/chunked/__init__.py +++ b/src/backy/backends/chunked/__init__.py @@ -7,7 +7,7 @@ from backy.utils import END, report_status from .. import BackyBackend -from .chunk import Chunk +from .chunk import Chunk, Hash from .file import File from .store import Store @@ -54,7 +54,7 @@ def open(self, mode: str = "rb") -> File: # type: ignore[override] def purge(self) -> None: self.log.debug("purge") - used_chunks: Set[str] = set() + used_chunks: Set[Hash] = set() for revision in self.backup.history: try: used_chunks |= set( @@ -90,7 +90,7 @@ def verify(self): if candidate in verified_chunks: continue try: - c = Chunk(f, 0, self.store, candidate) + c = Chunk(self.store, candidate) c._read_existing() except Exception: log.exception("verify-error", chunk=candidate) @@ -124,34 +124,3 @@ def verify(self): yield END yield None - - def scrub(self, backup, type: str) -> int: - if type == "light": - return self.scrub_light(backup) - elif type == "deep": - return self.scrub_deep(backup) - else: - raise RuntimeError("Invalid scrubbing type {}".format(type)) - - def scrub_light(self, backup) -> int: - errors = 0 - self.log.info("scrub-light") - for revision in backup.history: - self.log.info("scrub-light-rev", revision_uuid=revision.uuid) - backend = type(self)(revision, self.log).open() - for hash in backend._mapping.values(): - if backend.store.chunk_path(hash).exists(): - continue - self.log.error( - "scrub-light-missing-chunk", - hash=hash, - revision_uuid=revision.uuid, - ) - errors += 1 - return errors - - def scrub_deep(self, backup) -> int: - errors = self.scrub_light(backup) - self.log.info("scrub-deep") - errors += self.store.validate_chunks() - return errors diff --git a/src/backy/backends/chunked/chunk.py b/src/backy/backends/chunked/chunk.py index f2c677f2..474902c2 100644 --- a/src/backy/backends/chunked/chunk.py +++ b/src/backy/backends/chunked/chunk.py @@ -2,8 +2,7 @@ import io import os import tempfile -import time -from typing import Optional +from typing import Optional, Tuple, TypeAlias import lzo import mmh3 @@ -12,6 +11,8 @@ from backy.backends import BackendException from backy.utils import posix_fadvise +Hash: TypeAlias = str + chunk_stats = { "write_full": 0, "write_partial": 0, @@ -34,38 +35,22 @@ class Chunk(object): CHUNK_SIZE = 4 * 
1024**2 # 4 MiB chunks - _read_existing_called = False # Test support - - id: int - hash: str - file: "backy.backends.chunked.File" + hash: Optional[Hash] store: "backy.backends.chunked.Store" clean: bool - loaded: bool data: Optional[io.BytesIO] - def __init__(self, file, id, store, hash): - self.id = id + def __init__( + self, + store: "backy.backends.chunked.Store", + hash: Optional[Hash], + ): self.hash = hash - self.file = file self.store = store self.clean = True - self.loaded = False - - if self.id not in file._access_stats: - self.file._access_stats[id] = (0, 0) - self.data = None - def _cache_prio(self): - return self.file._access_stats[self.id] - - def _touch(self): - count = self.file._access_stats[self.id][0] - self.file._access_stats[self.id] = (count + 1, time.time()) - - def _read_existing(self): - self._read_existing_called = True # Test support + def _read_existing(self) -> None: if self.data is not None: return # Prepare working with the chunk. We keep the data in RAM for @@ -88,17 +73,16 @@ def _read_existing(self): raise InconsistentHash(self.hash, disk_hash) self._init_data(data) - def _init_data(self, data): + def _init_data(self, data: bytes) -> None: self.data = io.BytesIO(data) - def read(self, offset, size=-1): + def read(self, offset: int, size: int = -1) -> Tuple[bytes, int]: """Read data from the chunk. Return the data and the remaining size that should be read. """ self._read_existing() assert self.data is not None - self._touch() self.data.seek(offset) data = self.data.read(size) @@ -107,15 +91,13 @@ def read(self, offset, size=-1): remaining = max([0, size - len(data)]) return data, remaining - def write(self, offset, data): + def write(self, offset: int, data: bytes) -> Tuple[int, bytes]: """Write data to the chunk, returns - the amount of data we used - the _data_ remaining """ - self._touch() - remaining_data = data[self.CHUNK_SIZE - offset :] data = data[: self.CHUNK_SIZE - offset] @@ -133,11 +115,16 @@ def write(self, offset, data): return len(data), remaining_data - def flush(self): + def flush(self) -> Optional[Hash]: + """Writes data to disk if necessary + Returns the new Hash on updates + """ if self.clean: - return + return None assert self.data is not None - self._update_hash() + # I'm not using read() here to a) avoid cache accounting and b) + # use a faster path to get the data. + self.hash = hash(self.data.getvalue()) target = self.store.chunk_path(self.hash) needs_forced_write = ( self.store.force_writes and self.hash not in self.store.seen_forced @@ -158,15 +145,8 @@ def flush(self): self.store.seen_forced.add(self.hash) self.store.known.add(self.hash) self.clean = True - - def _update_hash(self): - # I'm not using read() here to a) avoid cache accounting and b) - # use a faster path to get the data. 
- assert self.data is not None - data = self.data.getvalue() - self.hash = hash(data) - self.file._mapping[self.id] = self.hash + return self.hash -def hash(data): +def hash(data: bytes) -> Hash: return binascii.hexlify(mmh3.hash_bytes(data)).decode("ascii") diff --git a/src/backy/backends/chunked/file.py b/src/backy/backends/chunked/file.py index 3eecfd1d..f17e37eb 100644 --- a/src/backy/backends/chunked/file.py +++ b/src/backy/backends/chunked/file.py @@ -2,11 +2,13 @@ import json import os import os.path -from typing import Tuple +import time +from collections import defaultdict +from typing import Optional, Tuple import backy.backends.chunked -from .chunk import Chunk +from .chunk import Chunk, Hash class File(object): @@ -38,7 +40,7 @@ class File(object): _position: int _access_stats: dict[int, Tuple[int, float]] # (count, last) - _mapping: dict[int, str] + _mapping: dict[int, Hash] _chunks: dict[int, Chunk] def __init__( @@ -56,7 +58,7 @@ def __init__( self.overlay = overlay self._position = 0 - self._access_stats = {} + self._access_stats = defaultdict(lambda: (0, 0)) self.mode = mode @@ -93,34 +95,30 @@ def __init__( # Chunks that we are working on. self._chunks = {} - def fileno(self): + def fileno(self) -> int: raise OSError( "ChunkedFile does not support use through a file descriptor." ) - def _flush_chunks(self, target=None): - # Support an override to the general flush/cache mechanism to - # allow the final() flush to actually flush everything. + def _flush_chunks(self, target: Optional[int] = None) -> None: target = target if target is not None else self.flush_target - if target == 0: - for chunk in self._chunks.values(): - chunk.flush() - self._chunks = {} - return - elif len(self._chunks) < (2 * target): + if len(self._chunks) < (2 * target): return - chunks = list(self._chunks.values()) - chunks.sort(key=lambda x: x._cache_prio()) - keep_chunks = chunks[-target:] - remove_chunks = chunks[:-target] + chunks = list(self._chunks.items()) + chunks.sort(key=lambda i: self._access_stats[i[0]], reverse=True) + + keep_chunks = chunks[:target] + remove_chunks = chunks[target:] - for chunk in remove_chunks: - chunk.flush() + for id, chunk in remove_chunks: + hash = chunk.flush() + if hash: + self._mapping[id] = hash - self._chunks = {c.id: c for c in keep_chunks} + self._chunks = dict(keep_chunks) - def flush(self): + def flush(self) -> None: assert "w" in self.mode and not self.closed self._flush_chunks(0) @@ -131,29 +129,29 @@ def flush(self): f.flush() os.fsync(f) - def close(self): + def close(self) -> None: assert not self.closed if "w" in self.mode: self.flush() self.closed = True - def isatty(self): + def isatty(self) -> bool: return False - def readable(self): + def readable(self) -> bool: return "r" in self.mode and not self.closed # def readline(size=-1) # def readlines(hint=-1) - def tell(self): + def tell(self) -> int: assert not self.closed return self._position - def seekable(self): + def seekable(self) -> bool: return True - def seek(self, offset, whence=io.SEEK_SET): + def seek(self, offset: int, whence=io.SEEK_SET) -> int: assert not self.closed position = self._position @@ -190,7 +188,7 @@ def seek(self, offset, whence=io.SEEK_SET): self._position = position return position - def truncate(self, size=None): + def truncate(self, size: Optional[int] = None) -> None: assert "w" in self.mode and not self.closed if size is None: size = self._position @@ -204,7 +202,7 @@ def truncate(self, size=None): del self._mapping[key] self.flush() - def read(self, size=-1): + def 
read(self, size: int = -1) -> bytes: assert "r" in self.mode and not self.closed result = io.BytesIO() max_size = self.size - self._position @@ -213,39 +211,39 @@ def read(self, size=-1): else: size = min([size, max_size]) while size: - chunk, offset = self._current_chunk() + chunk, id, offset = self._current_chunk() data, size = chunk.read(offset, size) if not data: raise ValueError( - "Under-run: chunk {} seems to be missing data".format( - chunk.id - ) + f"Under-run: chunk {id} seems to be missing data" ) self._position += len(data) result.write(data) return result.getvalue() - def writable(self): + def writable(self) -> bool: return "w" in self.mode and not self.closed - def write(self, data): + def write(self, data: bytes) -> None: assert "w" in self.mode and not self.closed while data: - chunk, offset = self._current_chunk() + chunk, _, offset = self._current_chunk() written, data = chunk.write(offset, data) self._position += written if self._position > self.size: self.size = self._position - def _current_chunk(self): + def _current_chunk(self) -> Tuple[Chunk, int, int]: chunk_id = self._position // Chunk.CHUNK_SIZE offset = self._position % Chunk.CHUNK_SIZE if chunk_id not in self._chunks: self._flush_chunks() self._chunks[chunk_id] = Chunk( - self, chunk_id, self.store, self._mapping.get(chunk_id) + self.store, self._mapping.get(chunk_id) ) - return self._chunks[chunk_id], offset + count = self._access_stats[chunk_id][0] + self._access_stats[chunk_id] = (count + 1, time.time()) + return self._chunks[chunk_id], chunk_id, offset def __enter__(self): assert not self.closed diff --git a/src/backy/backends/chunked/store.py b/src/backy/backends/chunked/store.py index a8e4aad2..241fd2a5 100644 --- a/src/backy/backends/chunked/store.py +++ b/src/backy/backends/chunked/store.py @@ -1,12 +1,9 @@ from pathlib import Path -from typing import Set +from typing import Iterable, Set -import lzo from structlog.stdlib import BoundLogger -from backy.utils import END, report_status - -from . import chunk +from backy.backends.chunked.chunk import Hash # A chunkstore, is responsible for all revisions for a single backup, for now. 
# We can start having statistics later how much reuse between images is @@ -25,14 +22,13 @@ class Store(object): force_writes = False path: Path - seen_forced: set[str] - known: set[str] + seen_forced: set[Hash] + known: set[Hash] log: BoundLogger - def __init__(self, path: Path, log): + def __init__(self, path: Path, log: BoundLogger): self.path = path self.seen_forced = set() - self.known = set() self.log = log.bind(subsystem="chunked-store") for x in range(256): subdir = self.path / f"{x:02x}" @@ -40,12 +36,10 @@ def __init__(self, path: Path, log): if not self.path.joinpath("store").exists(): self.convert_to_v2() - for c in self.path.glob("*/*.chunk.lzo"): - hash = c.name.replace(".chunk.lzo", "") - self.known.add(hash) + self.known = set(self.ls()) self.log.debug("init", known_chunks=len(self.known)) - def convert_to_v2(self): + def convert_to_v2(self) -> None: self.log.info("to-v2") for path in self.path.glob("*/*/*.chunk.lzo"): new = path.relative_to(self.path) @@ -59,38 +53,12 @@ def convert_to_v2(self): f.write(b"v2") self.log.info("to-v2-finished") - @report_status - def validate_chunks(self): - errors = 0 - chunks = list(self.ls()) - yield len(chunks) - for file, file_hash, read in chunks: - try: - data = read(file) - except Exception: - # Typical Exceptions would be IOError, TypeError (lzo) - hash = None - else: - hash = chunk.hash(data) - if file_hash != hash: - errors += 1 - yield ( - "Content mismatch. Expected {} got {}".format( - file_hash, hash - ), - errors, - ) - yield - yield END - yield errors - - def ls(self): + def ls(self) -> Iterable[Hash]: # XXX this is fucking expensive for file in self.path.glob("*/*.chunk.lzo"): - hash = file.name.removesuffix(".chunk.lzo") - yield file, hash, lambda f: lzo.decompress(open(f, "rb").read()) + yield file.name.removesuffix(".chunk.lzo") - def purge(self, used_chunks: Set[str]) -> None: + def purge(self, used_chunks: Set[Hash]) -> None: # This assumes exclusive lock on the store. This is guaranteed by # backy's main locking. 
to_delete = self.known - used_chunks @@ -99,7 +67,7 @@ def purge(self, used_chunks: Set[str]) -> None: self.chunk_path(file_hash).unlink(missing_ok=True) self.known -= to_delete - def chunk_path(self, hash: str) -> Path: + def chunk_path(self, hash: Hash) -> Path: dir1 = hash[:2] extension = ".chunk.lzo" return self.path.joinpath(dir1).joinpath(hash).with_suffix(extension) diff --git a/src/backy/backends/chunked/tests/test_backend.py b/src/backy/backends/chunked/tests/test_backend.py index c26e78e1..56afc169 100644 --- a/src/backy/backends/chunked/tests/test_backend.py +++ b/src/backy/backends/chunked/tests/test_backend.py @@ -46,42 +46,3 @@ def test_purge(simple_file_config, log): r.remove() r.backend.purge() assert len(list(r.backend.store.ls())) == 0 - - -def test_scrub_light(simple_file_config, log): - b = simple_file_config - r = Revision(b, log) - r.materialize() - b.scan() - f = r.open("w") - f.write(b"asdf") - f.close() - assert r.backend.scrub(b, "light") == 0 - for x, _, _ in r.backend.store.ls(): - os.unlink(x) - assert r.backend.scrub(b, "light") == 1 - - -def test_scrub_deep(simple_file_config, log): - b = simple_file_config - r = Revision(b, log) - r.materialize() - b.scan() - f = r.open("w") - f.write(b"asdf") - f.close() - assert r.backend.scrub(b, "deep") == 0 - for x, _, _ in r.backend.store.ls(): - os.chmod(x, 0o660) - with open(x, "w") as f: - f.write("foobar") - assert r.backend.scrub(b, "deep") == 1 - - -def test_scrub_wrong_type(simple_file_config, log): - b = simple_file_config - r = Revision(b, log) - r.materialize() - b.scan() - with pytest.raises(RuntimeError): - r.backend.scrub(b, "asdf") diff --git a/src/backy/backends/chunked/tests/test_chunk.py b/src/backy/backends/chunked/tests/test_chunk.py index 8c4f3103..0dc84868 100644 --- a/src/backy/backends/chunked/tests/test_chunk.py +++ b/src/backy/backends/chunked/tests/test_chunk.py @@ -1,3 +1,5 @@ +from unittest.mock import Mock + import lzo import pytest @@ -13,7 +15,7 @@ def test_chunk_read_write_update(tmp_path, log): store = Store(tmp_path / "store", log) f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, None) + chunk = Chunk(store, None) chunk.write(0, b"asdf") chunk.write(4, b"bsdf") assert chunk.read(0) == (b"asdfbsdf", -1) @@ -25,7 +27,7 @@ def test_chunk_write_partial_offset(tmp_path, log): store = Store(tmp_path / "store", log) f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, None) + chunk = Chunk(store, None) # Write data that fits exactly into this chunk. Nothing remains # to be written. 
result = chunk.write(0, SPACE_CHUNK) @@ -61,7 +63,7 @@ def test_chunk_read_existing(tmp_path, log): f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, chunk_hash) + chunk = Chunk(store, chunk_hash) assert chunk.read(0) == (b"asdf", -1) assert chunk.read(0, 10) == (b"asdf", 6) @@ -79,10 +81,11 @@ def test_chunk_write_existing_partial_joins_with_existing_data(tmp_path, log): f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, chunk_hash) + chunk = Chunk(store, chunk_hash) + chunk._read_existing = Mock(side_effect=chunk._read_existing) chunk.write(2, b"xxsdf") assert chunk.read(0) == (b"asxxsdf", -1) - assert chunk._read_existing_called + chunk._read_existing.assert_called() def test_chunk_fails_wrong_content(tmp_path, log): @@ -95,7 +98,7 @@ def test_chunk_fails_wrong_content(tmp_path, log): f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, chunk_hash) + chunk = Chunk(store, chunk_hash) with pytest.raises(InconsistentHash): chunk.read(0) @@ -112,7 +115,8 @@ def test_chunk_write_existing_partial_complete_does_not_read_existing_data( f = File(tmp_path / "asdf", store) - chunk = Chunk(f, 1, store, "asdf") + chunk = Chunk(store, "asdf") + chunk._read_existing = Mock(side_effect=chunk._read_existing) chunk.write(0, b"X" * Chunk.CHUNK_SIZE) - assert not chunk._read_existing_called + chunk._read_existing.assert_not_called() assert chunk.read(0, 3) == (b"XXX", 0) diff --git a/src/backy/backends/chunked/tests/test_file.py b/src/backy/backends/chunked/tests/test_file.py index 204bfd38..adcd128e 100644 --- a/src/backy/backends/chunked/tests/test_file.py +++ b/src/backy/backends/chunked/tests/test_file.py @@ -149,7 +149,8 @@ def test_continuously_updated_file(tmp_path, log): chunked_data = f.read() assert data == chunked_data - store.validate_chunks() + for h in store.ls(): + Chunk(store, h)._read_existing() store.purge(set(f._mapping.values())) @@ -189,7 +190,8 @@ def test_seeky_updated_file(tmp_path, log): chunked_data = f.read() assert data == chunked_data - store.validate_chunks() + for h in store.ls(): + Chunk(store, h)._read_existing() store.purge(set(f._mapping.values()))
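
Note (not part of the patch): a minimal sketch of the reworked Chunk contract introduced above. A chunk is now constructed from a Store and an optional Hash only, and flush() returns the new hash instead of writing it back into File._mapping. The names `mapping` and `chunk_id` below are stand-ins for the bookkeeping that File keeps internally, and the temporary store path and structlog logger are assumptions made only for this sketch.

    import tempfile
    from pathlib import Path

    import structlog

    from backy.backends.chunked.chunk import Chunk
    from backy.backends.chunked.store import Store

    # Assumed setup, mirroring the test fixtures: a throwaway store directory
    # and a stdlib-flavoured structlog BoundLogger.
    log = structlog.stdlib.get_logger()
    store = Store(Path(tempfile.mkdtemp()) / "store", log)

    mapping: dict[int, str] = {}       # stand-in for File._mapping
    chunk_id = 0                       # stand-in for the position-derived id

    chunk = Chunk(store, None)         # no File/id coupling any more
    chunk.write(0, b"hello chunked")   # -> (bytes consumed, remaining data)
    new_hash = chunk.flush()           # persists the chunk, returns its Hash
    if new_hash is not None:           # None means the chunk was already clean
        mapping[chunk_id] = new_hash   # the caller records the mapping now

    # Re-reading by hash goes through the same constructor, as the updated
    # verify() and tests do:
    assert Chunk(store, new_hash).read(0)[0] == b"hello chunked"

This is the same shape the updated File._flush_chunks() and the chunk tests rely on; only the store path, logger, and the mapping/chunk_id stand-ins are invented for illustration.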