diff --git a/bin/yb-docker-ctl b/bin/yb-docker-ctl new file mode 100755 index 000000000000..6a8b9e1268c2 --- /dev/null +++ b/bin/yb-docker-ctl @@ -0,0 +1,527 @@ +#!/usr/bin/env python2.7 +# Copyright (c) YugaByte, Inc. + +import argparse +import json +import os +import re +import subprocess +import sys +import time + +YB_NETWORK_NAME = "yb-net" +YB_DOCKER_IMAGE = "yugabytedb/yugabyte:latest" +CONTAINER_CONFIG_CMD = 'docker inspect -f "{{json .ContainerConfig.%s}}" %s' +CONTAINER_IMAGE_FIND = 'docker images %s --format "{{.ID}}"' + +DAEMON_TYPE_MASTER = 'master' +DAEMON_TYPE_TSERVER = 'tserver' + +DAEMON_TYPES = [ + DAEMON_TYPE_MASTER, + DAEMON_TYPE_TSERVER +] + + +def get_subprocess_result_as_str(*popenargs, **kwargs): + result = subprocess.check_output(*popenargs, **kwargs) + if not isinstance(result, str): + return result.decode('ascii') + return result + +class YBDockerControl(): + REPLICATION_FACTOR_OPTIMAL = 1 + NUM_SHARDS_PER_TSERVER = 2 + MEM_LIMIT_BYTES = 1024 * 1024 * 1024 + YB_MASTER_FORMAT = "yb-master-n{}" + YB_MASTER_PORT = "7100" + YB_MASTER_UI_PORT = 7000 + YB_MASTER_FORMAT_WITH_PORT = "{}:{}".format(YB_MASTER_FORMAT, YB_MASTER_PORT) + YB_TSERVER_FORMAT = "yb-tserver-n{}" + YB_TSERVER_PORT = "9100" + YB_TSERVER_UI_PORT = 9000 + YB_TSERVER_FORMAT_WITH_PORT = "{}:{}".format(YB_TSERVER_FORMAT, YB_TSERVER_PORT) + CLIENT_PORTS = [9042, 6379] + POSTGRES_PORT = 5433 + YB_POSTGRES_FORMAT_WITH_PORT = "{}:{}".format(YB_TSERVER_FORMAT, POSTGRES_PORT) + CONTAINER_SEARCH_CMD = 'docker ps -af "name=yb-" -q' + CONTAINER_INFO_CMD = 'docker inspect --format "{{ .ID }}|{{ .Name }}|' \ + '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}|' \ + '{{.State.Status}}|{{.State.Pid}}|{{.State.StartedAt}}" %s' + WAIT_SECONDS = 60 + + def __init__(self): + self.args = None + self.parser = None + self.containers = [] + self.container_count = 0 + self.num_master = 0 + self.num_tservers = 0 + self.verify_docker() + self.setup_yb_network() + self.setup_cli() + + self.volumes = self.read_container_config("Volumes") + self.working_dir = self.read_container_config("WorkingDir") + self.placement_info = [] + + def get_placement_list(self): + placement_list = [] + if self.args.placement_info is not None: + placement_list = self.args.placement_info.split(",") + return placement_list + + @staticmethod + def verify_docker(): + result = get_subprocess_result_as_str(['docker', 'info']) + if 'Error' in result: + sys.exit("Docker daemon not running") + result = get_subprocess_result_as_str(CONTAINER_IMAGE_FIND % YB_DOCKER_IMAGE, shell=True) + if not result: + subprocess.check_call(['docker', 'pull', YB_DOCKER_IMAGE]) + + @staticmethod + def setup_yb_network(): + try: + # Create YugaByte network bridge if not already exists + result = get_subprocess_result_as_str(['docker', 'network', + 'ls', '-q', '--filter', + 'name={}'.format(YB_NETWORK_NAME)]) + if not result: + get_subprocess_result_as_str(['docker', 'network', 'create', + '-d', 'bridge', YB_NETWORK_NAME]) + return True + return True + except subprocess.CalledProcessError: + sys.exit("Unable to create docker network {}".format(YB_NETWORK_NAME)) + + @staticmethod + def destroy_yb_network(): + try: + result = get_subprocess_result_as_str( + ['docker', 'network', 'ls', '-q', '--filter', 'name={}'.format(YB_NETWORK_NAME)]) + if result: + get_subprocess_result_as_str(['docker', 'network', 'rm', YB_NETWORK_NAME]) + except subprocess.CalledProcessError: + sys.exit("Unable to destroy docker network {}".format(YB_NETWORK_NAME)) + + + @staticmethod + def read_container_config(config_name): + try: + config = get_subprocess_result_as_str(CONTAINER_CONFIG_CMD % + (config_name, YB_DOCKER_IMAGE), + shell=True) + return json.loads(config.strip()) + except subprocess.CalledProcessError: + sys.exit("Unable to fetch container config {}".format(config_name)) + + @staticmethod + def parse_flag_args(flag_args): + flags = [] if flag_args is None else flag_args.split(",") + return [item.strip() for item in flags] + + @staticmethod + def get_parsed_flag_args(parsed_flag_args): + return ["--{}".format(item) for item in parsed_flag_args ] + + @staticmethod + def get_placement_flags(placement): + placement_flags = placement.split(".") + if len(placement_flags) != 3: + raise RuntimeError("Invalid argument: Each entry in placement info should " + "specify cloud, region and zone as cloud.region.zone," + "seperated by commas.") + + placement_flags_cmds = ["--placement_cloud={}".format(placement_flags[0]), + "--placement_region={}".format(placement_flags[1]), + "--placement_zone={}".format(placement_flags[2])] + return placement_flags_cmds + + + def add_shared_args(self, parser): + parser.add_argument( + '--num_shards_per_tserver', type=int, default=self.NUM_SHARDS_PER_TSERVER, + help='Number of shards (tablets) to create per tablet server for each table.') + parser.add_argument( + '--enable_ysql', '--enable_postgres', action="store_true", + help='Enable YugaByte SQL API support') + parser.add_argument( + '--disable_ysql', action="store_true", + help='Disable YugaByte SQL API support') + + def add_startup_args(self, parser): + parser.add_argument("--placement_info", default=None, + help="Specify the placement info in the following format:" + "cloud.region.zone. Can be comma seperated in case you would" + "want to pass more than one value.") + parser.add_argument("--master_flags", default=None, + help="Specify extra master flags as a set of key-value pairs." + "Format (key=value,key=value).") + parser.add_argument("--tserver_flags", default=None, + help="Specify extra tserver flags as a set of key-value pairs." + "Format (key=value,key=value).") + parser.add_argument("--v", default=0, choices=[str(i) for i in range(5)], + help="Specify the verbosity which dictates the amount of" + "logging on servers trace files." + "The default is 0 and maximum is 4.") + parser.add_argument("--use_cassandra_authentication", action="store_true", + help="Specify if you want cassandra authentication enabled") + + def fetch_containers(self): + try: + config = get_subprocess_result_as_str(self.CONTAINER_SEARCH_CMD, shell=True) + universe_containers = filter(None, config.strip().split("\n")) + self.containers = [] + for container_id in universe_containers: + container_info = get_subprocess_result_as_str( + self.CONTAINER_INFO_CMD % container_id, shell=True) + # Docker inspect adds a leading slash to the container name, we would just remove + # that so we have nicer name + self.containers.append(container_info.strip().replace("/yb-", "yb-")) + self.container_count = len(self.containers) + self.master_nodes = [node_info for node_info in self.containers + if 'master' in node_info] + self.tserver_nodes = [node_info for node_info in self.containers + if 'tserver' in node_info] + self.num_master = len(self.master_nodes) + self.num_tservers = len(self.tserver_nodes) + + if self.args.command == 'create' and self.container_count == 0: + self.master_addrs = [self.YB_MASTER_FORMAT_WITH_PORT.format(i) + for i in range(1, self.args.rf + 1)] + else: + self.master_addrs = ["{}:{}".format(node_info.split("|")[1], self.YB_MASTER_PORT) + for node_info in self.master_nodes] + except subprocess.CalledProcessError: + sys.exit("Unable to fetch running YugaByte containers") + + def setup_cli(self): + self.parser = argparse.ArgumentParser(description='YugaByte Docker Container Control') + subparsers = self.parser.add_subparsers(help='Commands') + + create_subparser = subparsers.add_parser('create', help='Create YugaByte Cluster') + create_subparser.set_defaults(command='create') + create_subparser.add_argument('--rf', type=int, default=self.REPLICATION_FACTOR_OPTIMAL, + help='Replication Factor') + create_subparser.add_argument('--timeout_in_sec', type=int, default=self.WAIT_SECONDS, + help='Seconds to wait for YugaByte cluster to come up') + + add_node_subparser = subparsers.add_parser('add_node', + help='Add a new YugaByte Cluster Node') + add_node_subparser.set_defaults(command='add_node') + + startup_subparsers = [create_subparser, add_node_subparser] + for subparser in startup_subparsers: + self.add_startup_args(subparser) + self.add_shared_args(subparser) + + status_subparser = subparsers.add_parser('status', + help='Check YugaByte Cluster status') + status_subparser.set_defaults(command='status') + + destroy_subparser = subparsers.add_parser('destroy', + help='Destroy YugaByte Cluster') + destroy_subparser.set_defaults(command='destroy') + + stop_node_subparser = subparsers.add_parser('stop_node', + help='Stop a YugaByte Cluster Node') + stop_node_subparser.add_argument("node", type=str, help='Node ID to stop') + stop_node_subparser.add_argument("--master", action="store_true", + help='Type of server stop') + stop_node_subparser.set_defaults(command='stop_node') + + start_node_subparser = subparsers.add_parser('start_node', + help='Start a YugaByte Cluster Node') + start_node_subparser.add_argument("node", type=str, help='Node ID to start') + start_node_subparser.add_argument("--master", action="store_true", + help='Type of server start') + start_node_subparser.set_defaults(command='start_node') + + stop_subparser = subparsers.add_parser('stop', + help='Stop YugaByte Cluster') + stop_subparser.set_defaults(command='stop') + + start_subparser = subparsers.add_parser('start', + help='Start YugaByte Cluster') + start_subparser.set_defaults(command='start') + + remove_node_subparser = subparsers.add_parser('remove_node', + help='Stop a YugaByte Cluster Node') + remove_node_subparser.add_argument("node", type=str, help='Node ID to remove') + remove_node_subparser.set_defaults(command='remove_node') + + def start_cluster(self, server_type, node_id, placement): + base_cmd = ['--net', YB_NETWORK_NAME, '--detach'] + server_cmds = [YB_DOCKER_IMAGE] + server_cmds.append(os.path.join(self.working_dir, 'bin', server_type)) + + exposed_ports = [] + if server_type == 'yb-master': + container_name = self.YB_MASTER_FORMAT.format(node_id) + container_with_port = self.YB_MASTER_FORMAT_WITH_PORT.format(node_id) + server_cmds.extend([ + '--replication_factor={}'.format(self.args.rf), + '--master_addresses={}'.format(",".join(self.master_addrs)), + '--rpc_bind_addresses={}'.format(container_with_port)]) + # we expose master ui port for first node alone, exposing other + # nodes to host would create a port conflict. + if self.args.master_flags is not None: + master_parsed_flag_args = self.parse_flag_args(self.args.master_flags) + server_cmds.extend(self.get_parsed_flag_args(master_parsed_flag_args)) + if node_id == 1: + exposed_ports.append(self.YB_MASTER_UI_PORT) + + elif server_type == 'yb-tserver': + container_name = self.YB_TSERVER_FORMAT.format(node_id) + container_with_port = self.YB_TSERVER_FORMAT_WITH_PORT.format(node_id) + server_cmds.extend([ + '--tserver_master_addrs={}'.format(",".join(self.master_addrs)), + '--rpc_bind_addresses={}'.format(container_with_port), + '--memory_limit_hard_bytes={}'.format(self.MEM_LIMIT_BYTES)]) + + server_cmds.extend([ + '--use_cassandra_authentication={}'.format( + str(self.args.use_cassandra_authentication).lower())]) + + if self.args.tserver_flags is not None: + tserver_parsed_flag_args = self.parse_flag_args(self.args.tserver_flags) + server_cmds.extend(self.get_parsed_flag_args(tserver_parsed_flag_args)) + # we expose tserver ui, cassandra and redis port for first node alone, + # exposing other nodes to host would create a port conflict. + if not self.args.disable_ysql: + server_cmds.extend([ + "--start_pgsql_proxy", + "--pgsql_proxy_bind_address", + self.YB_POSTGRES_FORMAT_WITH_PORT.format(node_id)]) + if node_id == 1: + exposed_ports.append(self.YB_TSERVER_UI_PORT) + exposed_ports.extend(self.CLIENT_PORTS) + if not self.args.disable_ysql: + exposed_ports.append(self.POSTGRES_PORT) + # Shared flags. + server_cmds.extend(['--fs_data_dirs={}'.format(",".join(self.volumes))]) + server_cmds.extend(['--yb_num_shards_per_tserver={}'.format( + self.args.num_shards_per_tserver)]) + server_cmds.extend(['--v={}'.format(self.args.v)]) + if bool(os.getenv('YB_DISABLE_CALLHOME')): + server_cmds.extend(['--callhome_enabled=false']) + + if placement is not None: + server_cmds.extend(self.get_placement_flags(placement)) + docker_cmd = ['docker', 'run', '--name', container_name, '--hostname', container_name, + '--privileged'] + for port in exposed_ports: + docker_cmd.extend(['-p', '{}:{}'.format(port, port)]) + docker_cmd.extend(base_cmd) + docker_cmd.extend(server_cmds) + print (' '.join(docker_cmd)) + print ("Adding node {}".format(container_name)) + subprocess.check_output(docker_cmd) + + def create_clusters(self): + if self.container_count > 0: + sys.exit("{} YugaByte nodes found. Please use --destroy if you want to destroy " + "existing cluster.".format(self.container_count)) + + placement_list = self.get_placement_list() + + if len(placement_list) > self.args.rf: + raise RuntimeError("Number of placement info fields is larger than" + "the replication factor and hence the number of servers.") + + for server_type in ['yb-master', 'yb-tserver']: + for node_id in range(1, self.args.rf + 1): + if len(placement_list) > 0: + self.start_cluster(server_type, + node_id, placement_list[(node_id - 1) % len(placement_list)]) + else: + self.start_cluster(server_type, node_id, None) + + self.modify_placement_info() + + def cluster_status(self): + if len(self.containers) == 0: + sys.exit("YugaByte containers are not running") + + print ("{:<15}{:<10} {:<10} {:<20} {:<25} {:<15} {:<20}".format( + 'ID', 'PID', 'Type', 'Node', 'URL', 'Status', 'Started At')) + + for info_hash in self.containers: + cid, name, ip, status, pid, started_at = info_hash.split("|") + node_types = filter(None, re.split('.*-(.*)-.*', name)) + node_type_first = node_types[0] if isinstance(node_types, list) else next(node_types) + ui_port = self.YB_MASTER_UI_PORT if \ + node_type_first == "master" else self.YB_TSERVER_UI_PORT + node_ui_host = "http://{}:{}".format(ip, ui_port) if status == 'running' else None + state = 'Running' if 'running' in status else 'Stopped' + print ("{:<15}{:<10} {:<10} {:<20} {:<25} {:<15} {:<20}".format( + cid[:12], pid, node_type_first, name, node_ui_host, state, started_at)) + + def destroy_cluster(self): + if len(self.containers) == 0: + sys.exit("No YugaByte nodes to destroy") + + for info_hash in self.containers: + id, name = info_hash.split("|")[:2] + print ("Destroying {}".format(name)) + subprocess.check_output(['docker', 'rm', id, '--force']) + self.destroy_yb_network() + + def modify_placement_info(self): + if self.args.placement_info is None: + return + + docker_cmd = ['docker', 'exec', '-it', + self.YB_MASTER_FORMAT.format(1), + '{}/bin/yb-admin'.format(self.working_dir), + '--master_addresses', ",".join(self.master_addrs), + 'modify_placement_info', self.args.placement_info, str(self.args.rf)] + start_time = time.time() + while (time.time() - start_time <= self.args.timeout_in_sec): + try: + subprocess.check_call(docker_cmd) + print ("Successfully modified placement info.") + return + except subprocess.CalledProcessError: + time.sleep(1) + + sys.exit("Could not modify placement info for the cluster.") + + def wait_for_cluster(self): + start_time = time.time() + while (time.time() - start_time <= self.args.timeout_in_sec): + try: + docker_cmd = ['docker', 'exec', '-it', self.YB_MASTER_FORMAT.format(1), + '{}/bin/yb-admin'.format(self.working_dir), '--master_addresses', + ",".join(self.master_addrs), 'list_all_tablet_servers'] + result = get_subprocess_result_as_str(docker_cmd) + + if result.count("yb-tserver-") == self.args.rf: + docker_cmd = ['docker', 'exec', '-it', self.YB_MASTER_FORMAT.format(1), + '{}/bin/yb-admin'.format(self.working_dir), '--master_addresses', + ",".join(self.master_addrs), + '--yb_num_shards_per_tserver={}'.format( + self.args.num_shards_per_tserver), + 'setup_redis_table'] + result = get_subprocess_result_as_str(docker_cmd) + if "created." not in result: + continue + if not self.args.disable_ysql: + pg_bin_dir = os.path.join(self.working_dir, "postgres", 'bin') + # TODO(bogdan): Since docker is one time create and cannot re-run this + # setup step, we do not need the randomness that we put into yb-ctl. + tmp_data_dir = "/tmp/yb_pg_initdb_tmp_data_dir" + init_db_envs = "YB_ENABLED_IN_POSTGRES=1" + init_db_envs += " FLAGS_pggate_master_addresses={}".format( + ",".join(self.master_addrs)) + init_db_cmd = "{} {}/initdb -U postgres -D {}".format( + init_db_envs, pg_bin_dir, tmp_data_dir) + # Setup the list based cmdline. + print("Running initdb to initialize PostgreSQL metadata " + "in the YugaByte cluster. This may take up to a minute.") + docker_cmd = ['docker', 'exec', '-it', self.YB_TSERVER_FORMAT.format(1)] + docker_cmd.extend(["bash", "-c", init_db_cmd]) + result = get_subprocess_result_as_str(docker_cmd) + return + except subprocess.CalledProcessError: + pass + time.sleep(1) + + sys.exit("Timed out waiting for YugaByte clusters to come up. " + "Please use --timeout_in_sec to increase timeout. " + "Please also ensure that you have at least YugaByte version 1.1.2.0-b10.") + + def add_node(self): + new_node_id = self.num_tservers + 1 + placement_list = self.get_placement_list() + if len(placement_list) > 1: + raise RuntimeError("Can only specify one placement info for add node.") + elif len(placement_list) == 1: + self.start_cluster("yb-tserver", new_node_id, placement_list[0]) + else: + self.start_cluster("yb-tserver", new_node_id, None) + + def remove_node(self): + node_name = self.YB_TSERVER_FORMAT.format(self.args.node) + node = [node_info for node_info in self.tserver_nodes + if node_name in node_info] + if len(node) == 0: + sys.exit("Provided node {} not found".format(node_name)) + + node_id = node[0].split("|")[0] + print ("Stopping node : {}".format(node_name)) + subprocess.check_output(['docker', 'stop', node_id]) + + def start_cluster_nodes(self): + for node_num in range(self.num_master): + self.start_node(node_num + 1, True) + for node_num in range(self.num_tservers): + self.start_node(node_num + 1, False) + + def stop_cluster_nodes(self): + for node_num in range(self.num_master): + self.stop_node(node_num + 1, True) + for node_num in range(self.num_tservers): + self.stop_node(node_num + 1, False) + + def start_node(self, node, is_master): + node_name = (self.YB_MASTER_FORMAT.format(node) + if is_master else self.YB_TSERVER_FORMAT.format(node)) + nodes = self.master_nodes if is_master else self.tserver_nodes + node = [node_info for node_info in nodes if node_name in node_info] + if len(node) == 0: + sys.exit("Provided node {} not found".format(node_name)) + + # Node info is the following format: + # '63a847999b6694629648a821bce6818b6b412d9bbcc232e2c38673f2fdb74b75|yb-tserver-n2| + # 172.25.0.4|running|12758|2019-03-21T21:59:45.6391072Z' + node_id = node[0].split("|")[0] + print ("Starting node : {}".format(node_name)) + subprocess.check_output(['docker', 'start', node_id]) + + def stop_node(self, node, is_master): + node_name = (self.YB_MASTER_FORMAT.format(node) + if is_master else self.YB_TSERVER_FORMAT.format(node)) + nodes = self.master_nodes if is_master else self.tserver_nodes + node = [node_info for node_info in nodes if node_name in node_info] + if len(node) == 0: + sys.exit("Provided node {} not found".format(node_name)) + + node_id = node[0].split("|")[0] + print ("Stopping node : {}".format(node_name)) + subprocess.check_output(['docker', 'stop', node_id]) + + def run(self): + self.args = self.parser.parse_args() + self.fetch_containers() + + if self.args.command == 'create': + self.create_clusters() + self.wait_for_cluster() + self.fetch_containers() + self.cluster_status() + if not os.getenv("YB_DISABLE_CALLHOME"): + print ("Congratulations on installing YugaByte DB Community Edition. " + + "We'd like to welcome you to the community with a free t-shirt " + + "and pack of stickers! " + + "Please claim your reward here: https://www.yugabyte.com/community-rewards/") + elif self.args.command == 'status': + self.cluster_status() + elif self.args.command == 'destroy': + self.destroy_cluster() + elif self.args.command == 'add_node': + self.add_node() + elif self.args.command == 'remove_node': + self.remove_node() + elif self.args.command == 'start': + self.start_cluster_nodes() + elif self.args.command == 'start_node': + self.start_node(self.args.node, self.args.master) + elif self.args.command == 'stop': + self.stop_cluster_nodes() + elif self.args.command == 'stop_node': + self.stop_node(self.args.node, self.args.master) + +if __name__ == "__main__": + YBDockerControl().run()