Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move zookeeper.py in chadmin #70

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ch_tools/chadmin/cli/zookeeper_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ch_tools.chadmin.internal.table_replica import get_table_replica
from ch_tools.chadmin.internal.zookeeper import (
check_zk_node,
clean_zk_nodes,
create_zk_nodes,
delete_zk_nodes,
get_zk_node,
Expand Down Expand Up @@ -266,3 +267,20 @@ def delete_ddl_task_command(ctx, tasks):
"""
paths = [f"/clickhouse/task_queue/ddl/{task}" for task in tasks]
delete_zk_nodes(ctx, paths)


@zookeeper_group.command(name="clickhouse-hosts-cleanup")
Alex-Burmak marked this conversation as resolved.
Show resolved Hide resolved
@option("-f", "--fqdn", type=str, help="Removed FQDNs, comma separated", required=True)
Alex-Burmak marked this conversation as resolved.
Show resolved Hide resolved
@option(
"-c",
"--root",
"zk_root_path",
type=str,
help="Cluster ZooKeeper root path. If not specified,the root path will be used.",
required=False,
default="/",
)
Alex-Burmak marked this conversation as resolved.
Show resolved Hide resolved
@pass_context
def clickhouse_hosts_command(ctx, fqdn, zk_root_path):
hosts = fqdn.split(",")
clean_zk_nodes(ctx, zk_root_path, hosts)
159 changes: 147 additions & 12 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
import re
from contextlib import contextmanager
from queue import Queue

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.exceptions import NoNodeError, NotEmptyError

from ch_tools.chadmin.cli import get_config, get_macros

Expand All @@ -27,19 +29,22 @@ def get_zk_node_acls(ctx, path):
return zk.get_acls(path)


def _get_children(zk, path):
try:
return zk.get_children(path)
except NoNodeError:
return [] # in the case ZK deletes a znode while we traverse the tree


def list_zk_nodes(ctx, path, verbose=False):
def _stat_node(zk, node):
descendants_count = 0
queue = [node]
while queue:
item = queue.pop()
try:
children = zk.get_children(item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)
except NoNodeError:
# ZooKeeper nodes can be deleted during node tree traversal
pass
children = _get_children(zk, item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)

return {
"path": node,
Expand Down Expand Up @@ -83,11 +88,15 @@ def delete_zk_node(ctx, path):


def delete_zk_nodes(ctx, paths):
paths_formated = [_format_path(ctx, path) for path in paths]
with zk_client(ctx) as zk:
for path in paths:
path = _format_path(ctx, path)
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)
_delete_zk_nodes(zk, paths_formated)


def _delete_zk_nodes(zk, paths):
for path in paths:
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)


def _format_path(ctx, path):
Expand All @@ -98,6 +107,132 @@ def _format_path(ctx, path):
return path.format_map(get_macros(ctx))


def _set_node_value(zk, path, value):
"""
Set value to node in zk.
"""
if zk.exists(path):
try:
zk.set(path, value.encode())
except NoNodeError:
print(f"Can not set for node: {path} value : {value}")


def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None):
"""
Traverse zookeeper tree from root_path with bfs approach.

Return paths of nodes that match the include regular expression and do not match the excluded one.
"""
paths = set()
queue: Queue = Queue()
queue.put(root_path)
included_regexp = re.compile("|".join(included_paths_regexp))
excluded_regexp = (
re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None
)

while not queue.empty():
path = queue.get()
if excluded_regexp and re.match(excluded_regexp, path):
continue

for child_node in _get_children(zk, path):
subpath = os.path.join(path, child_node)

if re.match(included_regexp, subpath):
paths.add(subpath)
else:
queue.put(os.path.join(path, subpath))

return list(paths)


def clean_zk_nodes(ctx, zk_root_path, nodes):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
"""

def _try_delete_zk_node(zk, node):
try:
_delete_zk_nodes(zk, [node])
except NoNodeError:
# Someone deleted node before us. Do nothing.
print("Node {node} is already absent, skipped".format(node=node))
except NotEmptyError:
# Someone created child node while deleting.
# I'm not sure that we can get this exception with recursive=True.
# Do nothing.
print("Node {node} is not empty, skipped".format(node=node))

def _find_parents(zk, root_path, nodes, parent_name):
excluded_paths = [
".*clickhouse/task_queue",
".*clickhouse/zero_copy",
]
included_paths = [".*/" + parent_name + "/" + node for node in nodes]
paths = _find_paths(zk, root_path, included_paths, excluded_paths)
return [os.sep.join(path.split(os.sep)[:-2]) for path in paths]

def _set_replicas_is_lost(zk, table_paths, nodes):
"""
Set flag <path>/replicas/<replica_name>/is_lost to 1
"""
for path in table_paths:

replica_path = os.path.join(path, "replicas")
if not zk.exists(replica_path):
continue

for node in nodes:
is_lost_flag_path = os.path.join(replica_path, node, "is_lost")
print("Set is_lost_flag " + is_lost_flag_path)
_set_node_value(zk, is_lost_flag_path, "1")

def _remove_replicas_queues(zk, paths, replica_names):
"""
Remove <path>/replicas/<replica_name>/queue
"""
for path in paths:
replica_name = os.path.join(path, "replicas")
if not zk.exists(replica_name):
continue

for replica_name in replica_names:
queue_path = os.path.join(replica_name, replica_name, "queue")
if not zk.exists(queue_path):
continue

if zk.exists(queue_path):
_try_delete_zk_node(zk, queue_path)

def _nodes_absent(zk, zk_root_path, nodes):
included_paths = [".*/" + node for node in nodes]
paths_to_delete = _find_paths(zk, zk_root_path, included_paths)
for node in paths_to_delete:
_try_delete_zk_node(zk, node)

def _absent_if_empty_child(zk, path, child):
"""
Remove node if subnode is empty
"""
child_full_path = os.path.join(path, child)
if (
not zk.exists(child_full_path)
or len(_get_children(zk, child_full_path)) == 0
):
_try_delete_zk_node(zk, path)

zk_root_path = _format_path(ctx, zk_root_path)
with zk_client(ctx) as zk:
table_paths = _find_parents(zk, zk_root_path, nodes, "replicas")
_set_replicas_is_lost(zk, table_paths, nodes)
_remove_replicas_queues(zk, table_paths, nodes)
_nodes_absent(zk, zk_root_path, nodes)
for path in table_paths:
_absent_if_empty_child(zk, path, "replicas")


@contextmanager
def zk_client(ctx):
zk = _get_zk_client(ctx)
Expand Down
43 changes: 43 additions & 0 deletions tests/features/chadmin_zookeeper.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
Feature: keeper-monitoring tool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a copy-paste error.


Background:
Given default configuration
And a working s3
And a working zookeeper
And a working clickhouse on clickhouse01


Scenario: Cleanup all hosts
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test

Then the list of children on zookeeper01 for zk node /test/write_sli_part are empty
And the list of children on zookeeper01 for zk node /test/read_sli_part are empty

Scenario: Cleanup single host
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net and zk root /test


Then the list of children on zookeeper01 for zk node /test/write_sli_part/shard1/replicas/ are equal to
"""
/test/write_sli_part/shard1/replicas/host2.net
"""
And the list of children on zookeeper01 for zk node /test/read_sli_part/shard1/replicas/ are equal to
"""
/test/read_sli_part/shard1/replicas/host2.net
"""
32 changes: 32 additions & 0 deletions tests/modules/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
class Chadmin:
def __init__(self, container):
self._container = container

def exec_cmd(self, cmd):
ch_admin_cmd = f"chadmin {cmd}"
print("CMD: " + ch_admin_cmd)
result = self._container.exec_run(["bash", "-c", ch_admin_cmd], user="root")
return result

def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
cmd = "zookeeper {use_config} create {make_parents} {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
make_parents="--make-parents" if recursive else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_list(self, zk_node, no_ch_config=True):
cmd = "zookeeper {use_config} list {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_cleanup(self, fqdn, zk_root, no_ch_config=True):
cmd = "zookeeper {use_config} clickhouse-hosts-cleanup --root {root} --fqdn {hosts}".format(
use_config="--no-ch-config" if no_ch_config else "",
root=zk_root,
hosts=fqdn,
)
return self.exec_cmd(cmd)
40 changes: 40 additions & 0 deletions tests/steps/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Steps for interacting with chadmin.
"""

from behave import then, when
from hamcrest import assert_that, equal_to
from modules.chadmin import Chadmin
from modules.docker import get_container


@when("we execute chadmin create zk nodes on {node:w}")
def step_create_(context, node):
container = get_container(context, node)
nodes = context.text.strip().split("\n")
chadmin = Chadmin(container)

for node in nodes:
result = chadmin.create_zk_node(node)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@when("we do hosts cleanup on {node} with fqdn {fqdn} and zk root {zk_root}")
def step_host_cleanup(context, node, fqdn, zk_root):
container = get_container(context, node)
result = Chadmin(container).zk_cleanup(fqdn, zk_root)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@then("the list of children on {node:w} for zk node {zk_node} are equal to")
def step_childen_list(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to(context.text + "\n"))


@then("the list of children on {node:w} for zk node {zk_node} are empty")
def step_childen_list_empty(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to("\n"))