Skip to content

Commit

Permalink
Add OrphanedObjectsState
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Unisikhin committed Nov 3, 2024
1 parent e93101e commit a12d555
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 67 deletions.
37 changes: 12 additions & 25 deletions ch_tools/chadmin/cli/object_storage_group.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import json
from datetime import timedelta
from typing import Optional

from click import Choice, Context, group, option, pass_context
from humanfriendly import format_size

from ch_tools.chadmin.cli.chadmin_group import Chadmin
from ch_tools.chadmin.internal.object_storage.orphaned_objects_state import (
OrphanedObjectsState,
)
from ch_tools.chadmin.internal.zookeeper import (
check_zk_node,
create_zk_nodes,
Expand All @@ -23,10 +25,6 @@
# Use big enough timeout for stream HTTP query
STREAM_TIMEOUT = 10 * 60

# Orphaned objects state fields
ORPHANED_OBJECTS_SIZE_FIELD = "orphaned_objects_size"
ORPHANED_OBJECTS_ERROR_MSG_FIELD = "error_msg"

STATE_LOCAL_PATH = "/tmp/object_storage_cleanup_state.json"


Expand Down Expand Up @@ -161,31 +159,27 @@ def clean_command(
except Exception as e:
error_msg = str(e)

state = OrphanedObjectsState(total_size, error_msg)

if store_state_zk_path:
_store_state_zk_save(ctx, store_state_zk_path, total_size, error_msg)
_store_state_zk_save(ctx, store_state_zk_path, state)

if store_state_local:
_store_state_local_save(ctx, total_size, error_msg)
_store_state_local_save(ctx, state)

_print_response(ctx, dry_run, deleted, total_size)


def _store_state_zk_save(
ctx: Context, path: str, total_size: int, error_msg: str
) -> None:
def _store_state_zk_save(ctx: Context, path: str, state: OrphanedObjectsState) -> None:
if not check_zk_node(ctx, path):
create_zk_nodes(ctx, [path], make_parents=True)
state_data = state.to_json().encode("utf-8")
update_zk_nodes(ctx, [path], state_data)

state_data = json.dumps(
create_orphaned_objects_state(total_size, error_msg), indent=4
)

update_zk_nodes(ctx, [path], state_data.encode("utf-8"))


def _store_state_local_save(_: Context, total_size: int, error_msg: str) -> None:
def _store_state_local_save(_: Context, state: OrphanedObjectsState) -> None:
with open(STATE_LOCAL_PATH, "w", encoding="utf-8") as file:
json.dump(create_orphaned_objects_state(total_size, error_msg), file, indent=4)
file.write(state.to_json())


def _print_response(ctx: Context, dry_run: bool, deleted: int, total_size: int) -> None:
Expand All @@ -211,10 +205,3 @@ def _table_formatter(stats):
print_response(
ctx, clean_stats, default_format="table", table_formatter=_table_formatter
)


def create_orphaned_objects_state(total_size: int, error_msg: str) -> dict:
return {
ORPHANED_OBJECTS_SIZE_FIELD: total_size,
ORPHANED_OBJECTS_ERROR_MSG_FIELD: error_msg,
}
19 changes: 19 additions & 0 deletions ch_tools/chadmin/internal/object_storage/orphaned_objects_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import json
from dataclasses import asdict, dataclass


@dataclass
class OrphanedObjectsState:
orphaned_objects_size: int
error_msg: str

@classmethod
def from_json(cls, json_str: str) -> "OrphanedObjectsState":
data = json.loads(json_str)
return cls(
orphaned_objects_size=data["orphaned_objects_size"],
error_msg=data["error_msg"],
)

def to_json(self) -> str:
return json.dumps(asdict(self), indent=4)
63 changes: 21 additions & 42 deletions ch_tools/monrun_checks/ch_orphaned_objects.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import json
from typing import Tuple

import click

from ch_tools.chadmin.cli.object_storage_group import (
ORPHANED_OBJECTS_ERROR_MSG_FIELD,
ORPHANED_OBJECTS_SIZE_FIELD,
STATE_LOCAL_PATH,
create_orphaned_objects_state,
from ch_tools.chadmin.cli.object_storage_group import STATE_LOCAL_PATH
from ch_tools.chadmin.internal.object_storage.orphaned_objects_state import (
OrphanedObjectsState,
)
from ch_tools.chadmin.internal.zookeeper import get_zk_node
from ch_tools.common.result import CRIT, OK, WARNING, Result
Expand Down Expand Up @@ -51,14 +46,13 @@ def orphaned_objects_command(
) -> Result:
_check_mutually_exclusive(state_local, state_zk_path)

state = _get_orphaned_objects_state(ctx, state_local, state_zk_path)

valid, msg = _orphaned_objects_state_validate(state)
if not valid:
return Result(CRIT, msg)
try:
state = _get_orphaned_objects_state(ctx, state_local, state_zk_path)
except Exception as e:
return Result(CRIT, str(e))

total_size = state[ORPHANED_OBJECTS_SIZE_FIELD]
error_msg = state[ORPHANED_OBJECTS_ERROR_MSG_FIELD]
total_size = state.orphaned_objects_size
error_msg = state.error_msg

if error_msg != "":
return Result(CRIT, error_msg)
Expand All @@ -84,43 +78,28 @@ def _check_mutually_exclusive(state_local, state_zk_path):
)


def _orphaned_objects_state_validate(state: dict) -> Tuple[bool, str]:
total_size = state.get(ORPHANED_OBJECTS_SIZE_FIELD)
error_msg = state.get(ORPHANED_OBJECTS_ERROR_MSG_FIELD)

msg = ""
if total_size is None:
msg += f'Orphaned objects state not have field "{ORPHANED_OBJECTS_SIZE_FIELD}".'
if error_msg is None:
msg += f'Orphaned objects state not have field "{ORPHANED_OBJECTS_ERROR_MSG_FIELD}".'

return msg == "", msg


def _get_orphaned_objects_state(
ctx: click.Context, state_local: bool, state_zk_path: str
) -> dict:
state = dict()

) -> "OrphanedObjectsState":
if state_local:
state = _local_get_orphaned_objects_state()

if state_zk_path:
state = _zk_get_orphaned_objects_state(ctx, state_zk_path)

if state is None:
raise FileNotFoundError()

return state


def _local_get_orphaned_objects_state() -> dict:
try:
with open(STATE_LOCAL_PATH, mode="r", encoding="utf-8") as file:
return json.load(file)
except Exception as e:
return create_orphaned_objects_state(0, str(e))
def _local_get_orphaned_objects_state() -> "OrphanedObjectsState":
with open(STATE_LOCAL_PATH, mode="r", encoding="utf-8") as file:
return OrphanedObjectsState.from_json(file.read())


def _zk_get_orphaned_objects_state(ctx: click.Context, state_zk_path: str) -> dict:
try:
return json.loads(get_zk_node(ctx, state_zk_path))
except Exception as e:
return create_orphaned_objects_state(0, str(e))
def _zk_get_orphaned_objects_state(
ctx: click.Context, state_zk_path: str
) -> "OrphanedObjectsState":
zk_data = get_zk_node(ctx, state_zk_path)
return OrphanedObjectsState.from_json(zk_data)

0 comments on commit a12d555

Please sign in to comment.