From edcbe4fc811a039fff5a610871f4281236df1107 Mon Sep 17 00:00:00 2001 From: Maxim Yurchuk Date: Mon, 8 Apr 2024 08:51:01 +0000 Subject: [PATCH] Allow customize wall-e provider (#3515) --- ydb/tools/cfg/walle/walle.py | 4 +- ydb/tools/ydbd_slice/__init__.py | 86 +++++++++++---------- ydb/tools/ydbd_slice/cluster_description.py | 15 ++-- ydb/tools/ydbd_slice/handlers.py | 53 ++++++------- ydb/tools/ydbd_slice/nodes.py | 4 +- 5 files changed, 82 insertions(+), 80 deletions(-) diff --git a/ydb/tools/cfg/walle/walle.py b/ydb/tools/cfg/walle/walle.py index e2677f48a971..d9eac1290693 100644 --- a/ydb/tools/cfg/walle/walle.py +++ b/ydb/tools/cfg/walle/walle.py @@ -37,7 +37,9 @@ def get_rack(self, hostname): return hostname def get_datacenter(self, hostname): - return hostname + # Keep DC name short, because of + # BAD_REQUEST (nameservice validator: node 1 has data center in Wall-E location longer than 4 symbols) + return "FAKE" def get_body(self, hostname): return zlib.crc32(hostname.encode()) diff --git a/ydb/tools/ydbd_slice/__init__.py b/ydb/tools/ydbd_slice/__init__.py index d2c2ab2e6c2b..2f630653ffaf 100644 --- a/ydb/tools/ydbd_slice/__init__.py +++ b/ydb/tools/ydbd_slice/__init__.py @@ -10,6 +10,7 @@ import warnings from urllib3.exceptions import HTTPWarning +from ydb.tools.cfg.walle import NopHostsInformationProvider from ydb.tools.ydbd_slice import nodes, handlers, cluster_description from ydb.tools.ydbd_slice.kube import handlers as kube_handlers, docker @@ -248,9 +249,9 @@ def handler(signum, frame): raise Terminate(signum, frame) -def safe_load_cluster_details(cluster_yaml): +def safe_load_cluster_details(cluster_yaml, walle_provider): try: - cluster_details = cluster_description.ClusterDetails(cluster_yaml) + cluster_details = cluster_description.ClusterDetails(cluster_yaml, walle_provider) except IOError as io_err: print('', file=sys.stderr) print("unable to open YAML params as a file, check args", file=sys.stderr) @@ -298,8 +299,8 @@ def 
deduce_components_from_args(args, cluster_details): return result -def deduce_nodes_from_args(args): - cluster_hosts = safe_load_cluster_details(args.cluster).hosts_names +def deduce_nodes_from_args(args, walle_provider): + cluster_hosts = safe_load_cluster_details(args.cluster, walle_provider).hosts_names result = cluster_hosts if args.nodes is not None: @@ -490,11 +491,11 @@ def component_args(): return args -def add_explain_mode(modes): +def add_explain_mode(modes, walle_provider): def _run(args): logger.debug("run func explain with cmd args is '%s'", args) - cluster_details = safe_load_cluster_details(args.cluster) + cluster_details = safe_load_cluster_details(args.cluster, walle_provider) components = deduce_components_from_args(args, cluster_details) kikimr_bin, kikimr_compressed_bin = deduce_kikimr_bin_from_args(args) @@ -508,6 +509,7 @@ def _run(args): args.out_cfg, kikimr_bin, kikimr_compressed_bin, + walle_provider ) if 'kikimr' in components: @@ -531,26 +533,26 @@ def _run(args): mode.set_defaults(handler=_run) -def dispatch_run_light(func, args): +def dispatch_run_light(func, args, walle_provider): logger.debug("run func '%s' with cmd args is '%s'", func.__name__, args) - cluster_details = safe_load_cluster_details(args.cluster) + cluster_details = safe_load_cluster_details(args.cluster, walle_provider) components = deduce_components_from_args(args, cluster_details) logger.debug("components is '%s'", components) - nodes = deduce_nodes_from_args(args) + nodes = deduce_nodes_from_args(args, walle_provider) - func(components, nodes, cluster_details) + func(components, nodes, cluster_details, walle_provider) -def dispatch_run(func, args): +def dispatch_run(func, args, walle_provider): logger.debug("run func '%s' with cmd args is '%s'", func.__name__, args) - cluster_details = safe_load_cluster_details(args.cluster) + cluster_details = safe_load_cluster_details(args.cluster, walle_provider) components = deduce_components_from_args(args, cluster_details) - 
 nodes = deduce_nodes_from_args(args) + nodes = deduce_nodes_from_args(args, walle_provider) temp_dir = deduce_temp_dir_from_args(args) clear_tmp = not args.dry_run and args.temp_dir is None @@ -561,31 +563,32 @@ def dispatch_run(func, args): cluster_details, out_dir=temp_dir, kikimr_bin=kikimr_bin, - kikimr_compressed_bin=kikimr_compressed_bin + kikimr_compressed_bin=kikimr_compressed_bin, + walle_provider=walle_provider ) v = vars(args) - func(components, nodes, cluster_details, configurator, v.get('clear_logs'), args) + func(components, nodes, cluster_details, configurator, v.get('clear_logs'), args, walle_provider) if clear_tmp: logger.debug("remove temp dirs '%s'", temp_dir) shutil.rmtree(temp_dir) -def dispatch_run_raw_cfg(func, args): +def dispatch_run_raw_cfg(func, args, walle_provider): logger.debug("run func '%s' with cmd args is '%s'", func.__name__, args) - cluster_details = safe_load_cluster_details(args.cluster) + cluster_details = safe_load_cluster_details(args.cluster, walle_provider) components = deduce_components_from_args(args, cluster_details) - nodes = deduce_nodes_from_args(args) + nodes = deduce_nodes_from_args(args, walle_provider) - func(components, nodes, cluster_details, args.raw_cfg) + func(components, nodes, cluster_details, args.raw_cfg, walle_provider) -def add_install_mode(modes): +def add_install_mode(modes, walle_provider): def _run(args): - dispatch_run(handlers.slice_install, args) + dispatch_run(handlers.slice_install, args, walle_provider) mode = modes.add_parser( "install", @@ -597,9 +600,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_update_mode(modes): +def add_update_mode(modes, walle_provider): def _run(args): - dispatch_run(handlers.slice_update, args) + dispatch_run(handlers.slice_update, args, walle_provider) mode = modes.add_parser( "update", @@ -612,9 +615,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_update_raw_configs(modes): +def add_update_raw_configs(modes, walle_provider): def _run(args): - dispatch_run_raw_cfg(handlers.slice_update_raw_configs, args) + 
dispatch_run_raw_cfg(handlers.slice_update_raw_configs, args, walle_provider) mode = modes.add_parser( "update-raw-cfg", @@ -631,9 +634,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_stop_mode(modes): +def add_stop_mode(modes, walle_provider): def _run(args): - dispatch_run_light(handlers.slice_stop, args) + dispatch_run_light(handlers.slice_stop, args, walle_provider) mode = modes.add_parser( "stop", @@ -645,9 +648,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_start_mode(modes): +def add_start_mode(modes, walle_provider): def _run(args): - dispatch_run_light(handlers.slice_start, args) + dispatch_run_light(handlers.slice_start, args, walle_provider) mode = modes.add_parser( "start", @@ -660,9 +663,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_clear_mode(modes): +def add_clear_mode(modes, walle_provider): def _run(args): - dispatch_run_light(handlers.slice_clear, args) + dispatch_run_light(handlers.slice_clear, args, walle_provider) mode = modes.add_parser( "clear", @@ -674,9 +677,9 @@ def _run(args): mode.set_defaults(handler=_run) -def add_format_mode(modes): +def add_format_mode(modes, walle_provider): def _run(args): - dispatch_run_light(handlers.slice_format, args) + dispatch_run_light(handlers.slice_format, args, walle_provider) mode = modes.add_parser( "format", @@ -1140,7 +1143,7 @@ def _run(args): mode.set_defaults(handler=_run) -def main(): +def main(walle_provider=None): try: signal.signal(signal.SIGTERM, Terminate.handler) @@ -1185,14 +1188,15 @@ def main(): ) modes = parser.add_subparsers() - add_start_mode(modes) - add_stop_mode(modes) - add_install_mode(modes) - add_update_mode(modes) - add_update_raw_configs(modes) - add_clear_mode(modes) - add_format_mode(modes) - add_explain_mode(modes) + walle_provider = walle_provider or NopHostsInformationProvider() + add_start_mode(modes, walle_provider) + add_stop_mode(modes, walle_provider) + add_install_mode(modes, walle_provider) + add_update_mode(modes, 
walle_provider) + add_update_raw_configs(modes, walle_provider) + add_clear_mode(modes, walle_provider) + add_format_mode(modes, walle_provider) + add_explain_mode(modes, walle_provider) add_docker_build_mode(modes) add_kube_generate_mode(modes) add_kube_install_mode(modes) diff --git a/ydb/tools/ydbd_slice/cluster_description.py b/ydb/tools/ydbd_slice/cluster_description.py index 543af119c2be..4d1bac8400e9 100644 --- a/ydb/tools/ydbd_slice/cluster_description.py +++ b/ydb/tools/ydbd_slice/cluster_description.py @@ -8,7 +8,6 @@ from ydb.tools.cfg.dynamic import DynamicConfigGenerator from ydb.tools.cfg.static import StaticConfigGenerator from ydb.tools.cfg.utils import write_to_file -from ydb.tools.cfg.walle import NopHostsInformationProvider DynamicSlot = namedtuple( @@ -28,13 +27,13 @@ class ClusterDetails(ClusterDetailsProvider): SLOTS_PORTS_START = 31000 PORTS_SHIFT = 10 - def __init__(self, cluster_description_path): + def __init__(self, cluster_description_path, walle_provider): self.__template = None self.__details = None self.__databases = None self.__dynamic_slots = None self._cluster_description_file = cluster_description_path - self._walle_provider = NopHostsInformationProvider() + self._walle_provider = walle_provider super(ClusterDetails, self).__init__(self.template, self._walle_provider) @@ -103,7 +102,8 @@ def __init__( cluster_details, out_dir, kikimr_bin, - kikimr_compressed_bin + kikimr_compressed_bin, + walle_provider ): self.__cluster_details = cluster_details self.__kikimr_bin_file = kikimr_bin @@ -114,6 +114,7 @@ def __init__( self.__dynamic = None self.__dynamic_cfg = os.path.join(out_dir, 'kikimr-dynamic') self.__subdomains = None + self.__walle_provider = walle_provider @property def kikimr_bin(self): @@ -169,8 +170,7 @@ def static(self): assert self.__kikimr_bin_file if self.__static is None: - walle_provider = NopHostsInformationProvider() - self.__static = StaticConfigGenerator(self.template, self.__kikimr_bin_file, self.__static_cfg, 
 walle_provider=walle_provider) + self.__static = StaticConfigGenerator(self.template, self.__kikimr_bin_file, self.__static_cfg, walle_provider=self.__walle_provider) return self.__static def create_static_cfg(self): @@ -183,9 +183,8 @@ def dynamic(self): assert self.__kikimr_bin_file if self.__dynamic is None: - walle_provider = NopHostsInformationProvider() self.__dynamic = DynamicConfigGenerator( - self.__cluster_details.template, self.__kikimr_bin_file, self.__dynamic_cfg, walle_provider=walle_provider + self.__cluster_details.template, self.__kikimr_bin_file, self.__dynamic_cfg, walle_provider=self.__walle_provider ) return self.__dynamic diff --git a/ydb/tools/ydbd_slice/handlers.py b/ydb/tools/ydbd_slice/handlers.py index 66a1e491950f..709561a16371 100644 --- a/ydb/tools/ydbd_slice/handlers.py +++ b/ydb/tools/ydbd_slice/handlers.py @@ -4,8 +4,6 @@ import subprocess from collections import deque, defaultdict -from ydb.tools.cfg.walle import NopHostsInformationProvider - logger = logging.getLogger(__name__) @@ -44,14 +42,14 @@ def clear_logs(nodes): nodes.execute_async(cmd) -def slice_format(components, nodes, cluster_details): - slice_stop(components, nodes, cluster_details) +def slice_format(components, nodes, cluster_details, walle_provider): + slice_stop(components, nodes, cluster_details, walle_provider) format_drivers(nodes) - slice_start(components, nodes, cluster_details) + slice_start(components, nodes, cluster_details, walle_provider) -def slice_clear(components, nodes, cluster_details): - slice_stop(components, nodes, cluster_details) +def slice_clear(components, nodes, cluster_details, walle_provider): + slice_stop(components, nodes, cluster_details, walle_provider) if 'dynamic_slots' in components: for slot in cluster_details.dynamic_slots.values(): @@ -97,8 +95,8 @@ def dynamic_configure(configurations): ) -def slice_install(components, nodes, cluster_details, configurator, do_clear_logs, args): - slice_stop(components, nodes, cluster_details) 
+def slice_install(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider): + slice_stop(components, nodes, cluster_details, walle_provider) if 'dynamic_slots' in components or 'kikimr' in components: stop_all_slots(nodes) @@ -121,15 +119,14 @@ def slice_install(components, nodes, cluster_details, configurator, do_clear_log start_static(nodes) dynamic_configure(configurator) - deploy_slot_configs(components, nodes, cluster_details) - start_dynamic(components, nodes, cluster_details) + deploy_slot_configs(components, nodes, cluster_details, walle_provider) + start_dynamic(components, nodes, cluster_details, walle_provider) -def get_available_slots(components, nodes, cluster_details): +def get_available_slots(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' not in components: return {} - walle = NopHostsInformationProvider() slots_per_domain = {} for domain in cluster_details.domains: @@ -140,7 +137,7 @@ def get_available_slots(components, nodes, cluster_details): if slot.domain == domain.domain_name: for node in nodes.nodes_list: item = (slot, node) - available_slots_per_zone[walle.get_datacenter(node).lower()].append(item) + available_slots_per_zone[walle_provider.get_datacenter(node).lower()].append(item) available_slots_per_zone['any'].append(item) all_available_slots_count += 1 slots_per_domain[domain.domain_name] = available_slots_per_zone @@ -179,11 +176,11 @@ def deploy_slot_config_for_tenant(nodes, slot, tenant, node): nodes.execute_async(cmd, check_retcode=False, nodes=[node]) -def deploy_slot_configs(components, nodes, cluster_details): +def deploy_slot_configs(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' not in components: return - slots_per_domain = get_available_slots(components, nodes, cluster_details)[0] + slots_per_domain = get_available_slots(components, nodes, cluster_details, walle_provider)[0] for domain in cluster_details.domains: slots_taken = set() 
available_slots_per_zone = slots_per_domain[domain.domain_name] @@ -248,7 +245,7 @@ def start_static(nodes): nodes.execute_async("sudo service kikimr start", check_retcode=False) -def start_dynamic(components, nodes, cluster_details): +def start_dynamic(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' in components: def get_numa_nodes(nodes): @@ -263,7 +260,7 @@ def get_numa_nodes(nodes): numa_nodes = None # get_numa_nodes(nodes) numa_nodes_counters = {node: 0 for node in nodes.nodes_list} - (slots_per_domain, all_available_slots_count,) = get_available_slots(components, nodes, cluster_details) + (slots_per_domain, all_available_slots_count,) = get_available_slots(components, nodes, cluster_details, walle_provider) for domain in cluster_details.domains: @@ -298,11 +295,11 @@ def get_numa_nodes(nodes): logger.warning('{count} unused slots'.format(count=all_available_slots_count - len(slots_taken))) -def slice_start(components, nodes, cluster_details): +def slice_start(components, nodes, cluster_details, walle_provider): if 'kikimr' in components: start_static(nodes) - start_dynamic(components, nodes, cluster_details) + start_dynamic(components, nodes, cluster_details, walle_provider) def stop_all_slots(nodes): @@ -343,7 +340,7 @@ def stop_dynamic(components, nodes, cluster_details): nodes._check_async_execution(tasks, False) -def slice_stop(components, nodes, cluster_details): +def slice_stop(components, nodes, cluster_details, walle_provider): stop_dynamic(components, nodes, cluster_details) if 'kikimr' in components: @@ -398,7 +395,7 @@ def deploy_secrets(nodes, yav_version): ) -def slice_update(components, nodes, cluster_details, configurator, do_clear_logs, args): +def slice_update(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider): if do_clear_logs: clear_logs(nodes) @@ -406,22 +403,22 @@ def slice_update(components, nodes, cluster_details, configurator, do_clear_logs if 'bin' in 
components.get('kikimr', []): update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin) - slice_stop(components, nodes, cluster_details) + slice_stop(components, nodes, cluster_details, walle_provider) if 'kikimr' in components: if 'cfg' in components.get('kikimr', []): static = configurator.create_static_cfg() update_cfg(nodes, static) deploy_secrets(nodes, args.yav_version) - deploy_slot_configs(components, nodes, cluster_details) - slice_start(components, nodes, cluster_details) + deploy_slot_configs(components, nodes, cluster_details, walle_provider) + slice_start(components, nodes, cluster_details, walle_provider) -def slice_update_raw_configs(components, nodes, cluster_details, config_path): - slice_stop(components, nodes, cluster_details) +def slice_update_raw_configs(components, nodes, cluster_details, config_path, walle_provider): + slice_stop(components, nodes, cluster_details, walle_provider) if 'kikimr' in components: if 'cfg' in components.get('kikimr', []): kikimr_cfg = os.path.join(config_path, 'kikimr-static') update_cfg(nodes, kikimr_cfg) - slice_start(components, nodes, cluster_details) + slice_start(components, nodes, cluster_details, walle_provider) diff --git a/ydb/tools/ydbd_slice/nodes.py b/ydb/tools/ydbd_slice/nodes.py index 6395b6531fa0..13a2b38d724d 100644 --- a/ydb/tools/ydbd_slice/nodes.py +++ b/ydb/tools/ydbd_slice/nodes.py @@ -107,8 +107,8 @@ def _copy_between_nodes(self, hub, hub_path, hosts, remote_path): if self._dry_run: continue cmd = [ - "ssh", dst, "-A", "sudo", "rsync", "-avqW", "--del", "--no-o", "--no-g", - "--rsh='ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -l %s'" % os.getenv("USER"), + "ssh", dst, '-o', 'StrictHostKeyChecking=no', '-o', 'UserKnownHostsFile=/dev/null', "-A", "sudo", "rsync", "-avqW", "--del", + "--no-o", "--no-g", "--rsh='ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -l %s'" % os.getenv("USER"), src, remote_path, ] process = subprocess.Popen(cmd)