Skip to content

Commit

Permalink
Merge pull request #37 from flyingcircusio/issue-30-server-migration
Browse files Browse the repository at this point in the history
expire revisions on remote servers
  • Loading branch information
ctheune authored Apr 11, 2024
2 parents bed3db5 + 6ccad75 commit 752c76f
Show file tree
Hide file tree
Showing 30 changed files with 3,072 additions and 925 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ repos:
- types-PyYAML==5.4.0
- types-setuptools
- types-tzlocal==4.2
- types-aiofiles==23.2.0.20240311
exclude: tests
args:
- --check-untyped-defs
Expand Down
9 changes: 9 additions & 0 deletions changelog.d/20230626_005856_jb_issue_30_server_migration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- Support backup job migration across servers

- Add `tags {set, add, remove}` subcommand

- Add `expire` subcommand

- logging: improve exception formatting

- logging: add taskid
3 changes: 3 additions & 0 deletions changelog.d/20240205_012140_jb_issue_30_server_migration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.
- Add `push` and `pull` subcommand
3 changes: 3 additions & 0 deletions changelog.d/20240205_012340_jb_issue_30_server_migration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.
- Add `server:` selector to revision spec
3 changes: 3 additions & 0 deletions changelog.d/20240402_125207_jb_issue_30_server_migration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.
- Coordinate backups for the same job between servers
7 changes: 7 additions & 0 deletions doc/man-backy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ Subcommand-specific options
Trust state. Ordered by date, oldest first.
* A tag with the **tag:** prefix. Selects all revisions with this tag.
Ordered by date, oldest first.
* A server with the **server:** prefix: Selects all revisions located on
this server. The current server can be selected with an empty string.
Ordered by date, oldest first.
* The key word **local** selects all revisions located on the current
server (`server:`).
* The key word **remote** selects all revisions located on remote servers
(`not(server:)`).
* An inclusive range using two single revision specifiers separated with two
dots. The single revision specifiers may be omitted, in which case the
**first** and/or **last** revision is assumed.
Expand Down
15 changes: 12 additions & 3 deletions lib.nix
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ let
scriv = super.scriv.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.setuptools ];
});
backports-tarfile = super.backports-tarfile.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.setuptools ];
});
docutils = super.docutils.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.flit-core ];
});
execnet = super.execnet.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.hatchling super.hatch-vcs ];
});
Expand All @@ -48,10 +54,13 @@ let
# replace poetry to avoid dependency on vulnerable python-cryptography package
nativeBuildInputs = [ super.poetry-core ] ++ builtins.filter (p: p.pname or "" != "poetry") old.nativeBuildInputs;
});
aiofiles = super.aiofiles.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.hatchling super.hatch-vcs ];
});
nh3 =
let
getCargoHash = version: {
"0.2.15" = "sha256-fetAE3cj9hh4SoPE72Bqco5ytUMiDqbazeS2MHdUibM=";
"0.2.17" = "sha256-WomlVzKOUfcgAWGJInSvZn9hm+bFpgc4nJbRiyPCU64=";
}.${version} or (
lib.warn "Unknown nh3 version: '${version}'. Please update getCargoHash." lib.fakeHash
);
Expand All @@ -75,7 +84,7 @@ let
cryptography =
let
getCargoHash = version: {
"41.0.7" = "sha256-VeZhKisCPDRvmSjGNwCgJJeVj65BZ0Ge+yvXbZw86Rw";
"42.0.5" = "sha256-Pw3ftpcDMfZr/w6US5fnnyPVsFSB9+BuIKazDocYjTU=";
}.${version} or (
lib.warn "Unknown cryptography version: '${version}'. Please update getCargoHash." lib.fakeHash
);
Expand Down Expand Up @@ -118,7 +127,7 @@ in

devShells = {
default = mkShellNoCC {
BACKY_CMD = "backy";
BACKY_CMD = "${poetryEnv}/bin/backy";
packages = [
poetryEnv
poetry
Expand Down
1,243 changes: 696 additions & 547 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ aiohttp = "^3.8.4"
rich = "^13.3.2"
yarl = "1.9.2"
frozenlist = "1.4.0"
aiofiles = "^23.2.1"
aioshutil = "^1.3"

[tool.poetry.dev-dependencies]
pre-commit = "^3.3.3"
Expand Down
138 changes: 123 additions & 15 deletions src/backy/api.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import datetime
import re
from json import JSONEncoder
from typing import Any, List, Tuple
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Tuple

from aiohttp import hdrs, web
from aiohttp.web_exceptions import HTTPAccepted, HTTPNotFound, HTTPUnauthorized
from aiohttp.web_exceptions import (
HTTPAccepted,
HTTPBadRequest,
HTTPForbidden,
HTTPNotFound,
HTTPPreconditionFailed,
HTTPServiceUnavailable,
HTTPUnauthorized,
)
from aiohttp.web_middlewares import middleware
from aiohttp.web_runner import AppRunner, TCPSite
from structlog.stdlib import BoundLogger

import backy.daemon
from backy.backup import Backup
from backy.revision import Revision
from backy.scheduler import Job
from backy.utils import generate_taskid

if TYPE_CHECKING:
from backy.daemon import BackyDaemon


class BackyJSONEncoder(JSONEncoder):
Expand All @@ -23,14 +38,14 @@ def default(self, o: Any) -> Any:


class BackyAPI:
daemon: "backy.daemon.BackyDaemon"
daemon: "BackyDaemon"
sites: dict[Tuple[str, int], TCPSite]
runner: AppRunner
tokens: dict
log: BoundLogger

def __init__(self, daemon, log):
self.log = log.bind(subsystem="api")
self.log = log.bind(subsystem="api", job_name="~")
self.daemon = daemon
self.sites = {}
self.app = web.Application(
Expand All @@ -42,6 +57,14 @@ def __init__(self, daemon, log):
web.post("/v1/reload", self.reload_daemon),
web.get("/v1/jobs", self.get_jobs),
web.post("/v1/jobs/{job_name}/run", self.run_job),
web.get("/v1/backups", self.list_backups),
web.post("/v1/backups/{backup_name}/purge", self.run_purge),
web.post("/v1/backups/{backup_name}/touch", self.touch_backup),
web.get("/v1/backups/{backup_name}/revs", self.get_revs),
web.put(
"/v1/backups/{backup_name}/revs/{rev_spec}/tags",
self.put_tags,
),
]
)

Expand All @@ -67,7 +90,7 @@ async def reconfigure(
)
await site.start()
self.log.info("added-site", site=site.name)
for bind_addr, site in self.sites.items():
for bind_addr, site in list(self.sites.items()):
if bind_addr in bind_addrs:
continue
await site.stop()
Expand All @@ -77,9 +100,12 @@ async def reconfigure(
@middleware
async def log_conn(self, request: web.Request, handler):
request["log"] = self.log.bind(
path=request.path, query=request.query_string
sub_taskid=request.headers.get("taskid"),
taskid=generate_taskid(),
)
request["log"].debug(
"new-conn", path=request.path, query=request.query_string
)
request["log"].debug("new-conn")
try:
resp = await handler(request)
except Exception as e:
Expand Down Expand Up @@ -107,8 +133,7 @@ async def require_auth(self, request: web.Request, handler):
request["log"].info("auth-token-unknown")
raise HTTPUnauthorized()
request["client"] = client
request["log"] = request["log"].bind(client=client)
request["log"].debug("auth-passed")
request["log"] = request["log"].bind(job_name="~" + client)
return await handler(request)

@middleware
Expand All @@ -121,26 +146,109 @@ async def to_json(self, request: web.Request, handler):
else:
return web.json_response(resp, dumps=BackyJSONEncoder().encode)

async def get_status(self, request: web.Request):
filter = request.query.get("filter", "")
async def get_status(
    self, request: web.Request
) -> List["BackyDaemon.StatusDict"]:
    """Return the daemon's status entries, optionally filtered.

    Reads the optional ``filter`` query parameter; when present and
    non-empty it is compiled as a regular expression and passed to
    ``daemon.status()``. An absent/empty filter passes ``None``/"" through.
    """
    # NOTE: `filter` shadows the builtin; kept as-is (doc-only change).
    filter = request.query.get("filter", None)
    request["log"].info("get-status", filter=filter)
    if filter:
        filter = re.compile(filter)
    return self.daemon.status(filter)

async def reload_daemon(self, request: web.Request):
    """Trigger a configuration reload of the daemon (no response body)."""
    request["log"].info("reload-daemon")
    self.daemon.reload()

async def get_jobs(self, request: web.Request):
async def get_jobs(self, request: web.Request) -> List[Job]:
    """Return all currently configured jobs as a list."""
    request["log"].info("get-jobs")
    return list(self.daemon.jobs.values())

async def get_job(self, request: web.Request):
async def get_job(self, request: web.Request) -> Job:
    """Look up a job by the ``job_name`` path segment.

    Raises ``HTTPNotFound`` when no job with that name exists.
    """
    name = request.match_info.get("job_name")
    request["log"].info("get-job", name=name)
    try:
        # NOTE(review): this repeats the lookup above with the same key —
        # this page appears to render old and new diff lines merged;
        # confirm against the actual repository before relying on it.
        name = request.match_info.get("job_name", None)
        return self.daemon.jobs[name]
    except KeyError:
        request["log"].info("get-job-not-found", name=name)
        raise HTTPNotFound()

async def run_job(self, request: web.Request):
    """Request immediate execution of a job; responds 202 Accepted.

    404 propagates from get_job() when the job name is unknown.
    """
    j = await self.get_job(request)
    request["log"].info("run-job", name=j.name)
    # Signal the scheduler loop; the run happens asynchronously.
    j.run_immediately.set()
    raise HTTPAccepted()

async def list_backups(self, request: web.Request) -> List[str]:
    """Return the names of backups without an active job ("dead" backups)."""
    request["log"].info("list-backups")
    return list(self.daemon.dead_backups.keys())

async def get_backup(
    self, request: web.Request, allow_active: bool
) -> Backup:
    """Resolve the ``backup_name`` path segment to a Backup object.

    Dead (job-less) backups are always returned. A backup belonging to
    an active job is returned only when ``allow_active`` is True;
    otherwise 403 Forbidden is raised. Unknown names raise 404.
    """
    name = request.match_info.get("backup_name")
    request["log"].info("get-backups", name=name)
    if name in self.daemon.dead_backups:
        return self.daemon.dead_backups[name]
    if name in self.daemon.jobs:
        if allow_active:
            return self.daemon.jobs[name].backup
        request["log"].info("get-backups-forbidden", name=name)
        raise HTTPForbidden()
    request["log"].info("get-backups-not-found", name=name)
    raise HTTPNotFound()

async def run_purge(self, request: web.Request):
    """Mark a dead backup for purging; responds 202 Accepted.

    403 for backups with an active job, 404 for unknown names (both via
    get_backup with allow_active=False).
    """
    backup = await self.get_backup(request, False)
    request["log"].info("run-purge", name=backup.name)
    # Purge is deferred: only a pending flag is set here.
    backup.set_purge_pending()
    raise HTTPAccepted()

async def touch_backup(self, request: web.Request):
    """Touch a backup (active jobs allowed).

    NOTE(review): Backup.touch() semantics are not visible here —
    presumably it updates a freshness/mtime marker; confirm in the repo.
    """
    backup = await self.get_backup(request, True)
    request["log"].info("touch-backup", name=backup.name)
    backup.touch()

async def get_revs(self, request: web.Request) -> List[Revision]:
    """Return the local revisions of a backup after a fresh scan.

    The ``only_clean=1`` query parameter restricts the result to clean
    revisions; history is always limited to local revisions here.
    """
    backup = await self.get_backup(request, True)
    request["log"].info("get-revs", name=backup.name)
    backup.scan()
    return backup.get_history(
        local=True, clean=request.query.get("only_clean", "") == "1"
    )

async def put_tags(self, request: web.Request):
    """Replace the tags of the revisions matched by ``rev_spec``.

    Request body: JSON object with ``old_tags`` and ``new_tags`` lists
    (400 Bad Request when either key is missing). Query parameter
    ``autoremove=1`` enables automatic removal of now-untagged revisions.

    Responses: 412 Precondition Failed when the current tags no longer
    match ``old_tags`` (backup.tags(...) returns falsy), 404 for an
    unknown revision spec, 503 when the backup is locked elsewhere
    (BlockingIOError).
    """
    # NOTE: `json` shadows the stdlib module name; kept as-is.
    json = await request.json()
    try:
        old_tags = set(json["old_tags"])
        new_tags = set(json["new_tags"])
    except KeyError:
        request["log"].info("put-tags-bad-request")
        raise HTTPBadRequest()
    autoremove = request.query.get("autoremove", "") == "1"
    spec = request.match_info.get("rev_spec")
    # Active jobs are not allowed to be re-tagged remotely (403 via
    # get_backup with allow_active=False).
    backup = await self.get_backup(request, False)
    request["log"].info(
        "put-tags",
        name=backup.name,
        old_tags=old_tags,
        new_tags=new_tags,
        spec=spec,
        autoremove=autoremove,
    )
    backup.scan()
    try:
        if not backup.tags(
            "set",
            spec,
            new_tags,
            old_tags,
            autoremove=autoremove,
            force=True,
        ):
            raise HTTPPreconditionFailed()
    except KeyError:
        request["log"].info("put-tags-rev-not-found")
        raise HTTPNotFound()
    except BlockingIOError:
        request["log"].info("put-tags-locked")
        raise HTTPServiceUnavailable()
4 changes: 2 additions & 2 deletions src/backy/backends/chunked/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def open(self, mode: str = "rb", parent: Optional[Revision] = None) -> File: #
def purge(self) -> None:
self.log.debug("purge")
used_chunks: Set[Hash] = set()
for revision in self.backup.history:
for revision in self.backup.local_history:
if revision.backend_type != "chunked":
continue
used_chunks.update(
Expand All @@ -65,7 +65,7 @@ def verify(self):
verified_chunks: Set[Hash] = set()

# Load verified chunks to avoid duplicate work
for revision in self.backup.clean_history:
for revision in self.backup.get_history(clean=True, local=True):
if (
revision.trust != Trust.VERIFIED
or revision.backend_type != "chunked"
Expand Down
3 changes: 3 additions & 0 deletions src/backy/backends/chunked/tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def test_purge(simple_file_config, log):
f.write(b"asdf")
f.close()
r.materialize()
remote = Revision(b, log) # remote revision without local data
remote.server = "remote"
remote.materialize()
b.scan()
# Reassign as the scan will create a new reference
r = b.history[0]
Expand Down
Loading

0 comments on commit 752c76f

Please sign in to comment.