Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move zookeeper.py in chadmin #70

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion ch_tools/chadmin/cli/zookeeper_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ch_tools.chadmin.internal.table_replica import get_table_replica
from ch_tools.chadmin.internal.zookeeper import (
check_zk_node,
clean_zk_metadata_for_hosts,
create_zk_nodes,
delete_zk_nodes,
get_zk_node,
Expand Down Expand Up @@ -41,8 +42,18 @@
help="Do not try to get parameters from clickhouse config.xml.",
default=False,
)
@option(
"-c",
"--chroot",
"zk_root_path",
type=str,
help="Cluster ZooKeeper root path. If not specified,the root path will be used.",
required=False,
)
@pass_context
def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config):
def zookeeper_group(
ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_config, zk_root_path
):
"""ZooKeeper management commands.

ZooKeeper command runs client which connects to Zookeeper node.
Expand All @@ -57,6 +68,7 @@ def zookeeper_group(ctx, host, port, timeout, zkcli_identity, no_chroot, no_ch_c
"zkcli_identity": zkcli_identity,
"no_chroot": no_chroot,
"no_ch_config": no_ch_config,
"zk_root_path": zk_root_path,
}


Expand Down Expand Up @@ -266,3 +278,13 @@ def delete_ddl_task_command(ctx, tasks):
"""
paths = [f"/clickhouse/task_queue/ddl/{task}" for task in tasks]
delete_zk_nodes(ctx, paths)


@zookeeper_group.command(
name="cleanup-removed-hosts-metadata",
help="Remove metadata from Zookeeper for specified hosts.",
)
@argument("fqdn", type=ListParamType())
@pass_context
def clickhouse_hosts_command(ctx, fqdn):
clean_zk_metadata_for_hosts(ctx, fqdn)
168 changes: 155 additions & 13 deletions ch_tools/chadmin/internal/zookeeper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import os
import re
from contextlib import contextmanager
from queue import Queue

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.exceptions import NoNodeError, NotEmptyError

from ch_tools.chadmin.cli import get_config, get_macros

Expand All @@ -27,19 +29,22 @@ def get_zk_node_acls(ctx, path):
return zk.get_acls(path)


def _get_children(zk, path):
try:
return zk.get_children(path)
except NoNodeError:
return [] # in the case ZK deletes a znode while we traverse the tree


def list_zk_nodes(ctx, path, verbose=False):
def _stat_node(zk, node):
descendants_count = 0
queue = [node]
while queue:
item = queue.pop()
try:
children = zk.get_children(item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)
except NoNodeError:
# ZooKeeper nodes can be deleted during node tree traversal
pass
children = _get_children(zk, item)
descendants_count += len(children)
queue.extend(os.path.join(item, node) for node in children)

return {
"path": node,
Expand Down Expand Up @@ -83,11 +88,15 @@ def delete_zk_node(ctx, path):


def delete_zk_nodes(ctx, paths):
paths_formated = [_format_path(ctx, path) for path in paths]
with zk_client(ctx) as zk:
for path in paths:
path = _format_path(ctx, path)
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)
_delete_zk_nodes(zk, paths_formated)


def _delete_zk_nodes(zk, paths):
for path in paths:
print(f"Deleting ZooKeeper node {path}")
zk.delete(path, recursive=True)


def _format_path(ctx, path):
Expand All @@ -98,6 +107,136 @@ def _format_path(ctx, path):
return path.format_map(get_macros(ctx))


def _set_node_value(zk, path, value):
"""
Set value to node in zk.
"""
if zk.exists(path):
try:
zk.set(path, value.encode())
except NoNodeError:
print(f"Can not set for node: {path} value : {value}")


def _find_paths(zk, root_path, included_paths_regexp, excluded_paths_regexp=None):
"""
Traverse zookeeper tree from root_path with bfs approach.

Return paths of nodes that match the include regular expression and do not match the excluded one.
"""
paths = set()
queue: Queue = Queue()
queue.put(root_path)
included_regexp = re.compile("|".join(included_paths_regexp))
excluded_regexp = (
re.compile("|".join(excluded_paths_regexp)) if excluded_paths_regexp else None
)

while not queue.empty():
path = queue.get()
if excluded_regexp and re.match(excluded_regexp, path):
continue

for child_node in _get_children(zk, path):
subpath = os.path.join(path, child_node)

if re.match(included_regexp, subpath):
paths.add(subpath)
else:
queue.put(os.path.join(path, subpath))

return list(paths)


def clean_zk_metadata_for_hosts(ctx, nodes):
"""
Perform cleanup in zookeeper after deleting hosts in the cluster or whole cluster deleting.
"""

def _try_delete_zk_node(zk, node):
try:
_delete_zk_nodes(zk, [node])
except NoNodeError:
# Someone deleted node before us. Do nothing.
print("Node {node} is already absent, skipped".format(node=node))
except NotEmptyError:
# Someone created child node while deleting.
# I'm not sure that we can get this exception with recursive=True.
# Do nothing.
print("Node {node} is not empty, skipped".format(node=node))

def _find_parents(zk, root_path, nodes, parent_name):
excluded_paths = [
".*clickhouse/task_queue",
".*clickhouse/zero_copy",
]
included_paths = [".*/" + parent_name + "/" + node for node in nodes]
paths = _find_paths(zk, root_path, included_paths, excluded_paths)
# Paths will be like */shard1/replicas/hostname. But we need */shard1.
# Go up for 2 directories.
paths = [os.sep.join(path.split(os.sep)[:-2]) for path in paths]
# One path might be in list several times. Make list unique.
return list(set(paths))

def _set_replicas_is_lost(zk, table_paths, nodes):
"""
Set flag <path>/replicas/<replica_name>/is_lost to 1
"""
for path in table_paths:

replica_path = os.path.join(path, "replicas")
if not zk.exists(replica_path):
continue

for node in nodes:
is_lost_flag_path = os.path.join(replica_path, node, "is_lost")
print("Set is_lost_flag " + is_lost_flag_path)
_set_node_value(zk, is_lost_flag_path, "1")

def _remove_replicas_queues(zk, paths, replica_names):
"""
Remove <path>/replicas/<replica_name>/queue
"""
for path in paths:
replica_name = os.path.join(path, "replicas")
if not zk.exists(replica_name):
continue

for replica_name in replica_names:
queue_path = os.path.join(replica_name, replica_name, "queue")
if not zk.exists(queue_path):
continue

if zk.exists(queue_path):
_try_delete_zk_node(zk, queue_path)

def _nodes_absent(zk, zk_root_path, nodes):
included_paths = [".*/" + node for node in nodes]
paths_to_delete = _find_paths(zk, zk_root_path, included_paths)
for node in paths_to_delete:
_try_delete_zk_node(zk, node)

def _absent_if_empty_child(zk, path, child):
"""
Remove node if subnode is empty
"""
child_full_path = os.path.join(path, child)
if (
not zk.exists(child_full_path)
or len(_get_children(zk, child_full_path)) == 0
):
_try_delete_zk_node(zk, path)

zk_root_path = _format_path(ctx, "/")
with zk_client(ctx) as zk:
table_paths = _find_parents(zk, zk_root_path, nodes, "replicas")
_set_replicas_is_lost(zk, table_paths, nodes)
_remove_replicas_queues(zk, table_paths, nodes)
_nodes_absent(zk, zk_root_path, nodes)
for path in table_paths:
_absent_if_empty_child(zk, path, "replicas")


@contextmanager
def zk_client(ctx):
zk = _get_zk_client(ctx)
Expand All @@ -119,6 +258,7 @@ def _get_zk_client(ctx):
zkcli_identity = args.get("zkcli_identity")
no_chroot = args.get("no_chroot", False)
no_ch_config = args.get("no_ch_config", False)
zk_root_path = args.get("zk_root_path", None)

if no_ch_config:
if not host:
Expand All @@ -132,7 +272,9 @@ def _get_zk_client(ctx):
f'{host if host else node["host"]}:{port if port else node["port"]}'
for node in zk_config.nodes
)
if not no_chroot and zk_config.root is not None:
if zk_root_path:
connect_str += zk_root_path
elif not no_chroot and zk_config.root is not None:
connect_str += zk_config.root

if zkcli_identity is None:
Expand Down
43 changes: 43 additions & 0 deletions tests/features/chadmin_zookeeper.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
Feature: keeper-monitoring tool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a copy-paste error.


Background:
Given default configuration
And a working s3
And a working zookeeper
And a working clickhouse on clickhouse01


Scenario: Cleanup all hosts
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net,host2.net and zk root /test

Then the list of children on zookeeper01 for zk node /test/write_sli_part are empty
And the list of children on zookeeper01 for zk node /test/read_sli_part are empty

Scenario: Cleanup single host
When we execute chadmin create zk nodes on zookeeper01
"""
/test/write_sli_part/shard1/replicas/host1.net
/test/write_sli_part/shard1/replicas/host2.net
/test/read_sli_part/shard1/replicas/host1.net
/test/read_sli_part/shard1/replicas/host2.net
/test/write_sli_part/shard1/log
"""
And we do hosts cleanup on zookeeper01 with fqdn host1.net and zk root /test


Then the list of children on zookeeper01 for zk node /test/write_sli_part/shard1/replicas/ are equal to
"""
/test/write_sli_part/shard1/replicas/host2.net
"""
And the list of children on zookeeper01 for zk node /test/read_sli_part/shard1/replicas/ are equal to
"""
/test/read_sli_part/shard1/replicas/host2.net
"""
35 changes: 35 additions & 0 deletions tests/modules/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging


class Chadmin:
def __init__(self, container):
self._container = container

def exec_cmd(self, cmd):
ch_admin_cmd = f"chadmin {cmd}"
logging.debug("chadmin command:", ch_admin_cmd)
result = self._container.exec_run(["bash", "-c", ch_admin_cmd], user="root")
return result

def create_zk_node(self, zk_node, no_ch_config=True, recursive=True):
cmd = "zookeeper {use_config} create {make_parents} {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
make_parents="--make-parents" if recursive else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_list(self, zk_node, no_ch_config=True):
cmd = "zookeeper {use_config} list {node}".format(
use_config="--no-ch-config" if no_ch_config else "",
node=zk_node,
)
return self.exec_cmd(cmd)

def zk_cleanup(self, fqdn, zk_root, no_ch_config=True):
cmd = "zookeeper {use_config} --chroot {root} cleanup-removed-hosts-metadata {hosts}".format(
use_config="--no-ch-config" if no_ch_config else "",
root=zk_root,
hosts=fqdn,
)
return self.exec_cmd(cmd)
40 changes: 40 additions & 0 deletions tests/steps/chadmin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
Steps for interacting with chadmin.
"""

from behave import then, when
from hamcrest import assert_that, equal_to
from modules.chadmin import Chadmin
from modules.docker import get_container


@when("we execute chadmin create zk nodes on {node:w}")
def step_create_(context, node):
container = get_container(context, node)
nodes = context.text.strip().split("\n")
chadmin = Chadmin(container)

for node in nodes:
result = chadmin.create_zk_node(node)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@when("we do hosts cleanup on {node} with fqdn {fqdn} and zk root {zk_root}")
def step_host_cleanup(context, node, fqdn, zk_root):
container = get_container(context, node)
result = Chadmin(container).zk_cleanup(fqdn, zk_root)
assert result.exit_code == 0, f" output:\n {result.output.decode().strip()}"


@then("the list of children on {node:w} for zk node {zk_node} are equal to")
def step_childen_list(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to(context.text + "\n"))


@then("the list of children on {node:w} for zk node {zk_node} are empty")
def step_childen_list_empty(context, node, zk_node):
container = get_container(context, node)
result = Chadmin(container).zk_list(zk_node)
assert_that(result.output.decode(), equal_to("\n"))