Manage Elasticsearch nodes with dedicated subcommands #830

Merged (22 commits) on Dec 4, 2019
Changes from 15 commits
2 changes: 1 addition & 1 deletion esrally/mechanic/__init__.py
@@ -17,4 +17,4 @@

# expose only the minimum API
from .mechanic import StartEngine, EngineStarted, StopEngine, EngineStopped, ResetRelativeTime, MechanicActor, \
cluster_distribution_version, download
cluster_distribution_version, download, install, start, stop
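
For orientation, the mechanic package now exposes `install`, `start` and `stop` alongside the existing `download` entry point, so a CLI layer can drive each lifecycle step separately. Below is a minimal dispatch sketch; the handler signatures `install(cfg)`, `start(cfg)`, `stop(cfg)` and the `dispatch` helper are assumptions for illustration, not the actual wiring in this PR.

```python
# Hypothetical sketch: route dedicated subcommands to the functions that
# esrally/mechanic/__init__.py now exposes. The handler signatures are assumed.
from esrally import mechanic


def dispatch(sub_command, cfg):
    handlers = {
        "install": mechanic.install,  # provision an Elasticsearch node on this host
        "start": mechanic.start,      # launch the previously installed node
        "stop": mechanic.stop,        # shut the node down again
    }
    try:
        handler = handlers[sub_command]
    except KeyError:
        raise ValueError("unknown subcommand: {}".format(sub_command))
    return handler(cfg)
```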
12 changes: 3 additions & 9 deletions esrally/mechanic/cluster.py
@@ -21,27 +21,21 @@ class Node:
Represents an Elasticsearch cluster node.
"""

def __init__(self, pid, host_name, node_name, telemetry):
def __init__(self, pid, binary_path, host_name, node_name, telemetry):
"""
Creates a new node.

:param pid: PID for this node.
:param binary_path: The local path to the binaries for this node.
:param host_name: The name of the host where this node is running.
:param node_name: The name of this node.
:param telemetry: The attached telemetry.
"""
self.pid = pid
self.binary_path = binary_path
self.host_name = host_name
self.node_name = node_name
self.ip = None
self.telemetry = telemetry
# populated by telemetry
self.os = {}
self.jvm = {}
self.cpu = {}
self.memory = {}
self.fs = []
self.plugins = []

def __repr__(self):
return self.node_name
11 changes: 5 additions & 6 deletions esrally/mechanic/java_resolver.py
@@ -21,22 +21,21 @@
from esrally.utils import jvm


def java_home(car, cfg):
def determine_runtime_jdks(car):
def java_home(car_runtime_jdks, cfg):
def determine_runtime_jdks():
override_runtime_jdk = cfg.opts("mechanic", "runtime.jdk")
if override_runtime_jdk:
return [override_runtime_jdk]
else:
runtime_jdks = car.mandatory_var("runtime.jdk")
try:
return [int(v) for v in runtime_jdks.split(",")]
return [int(v) for v in car_runtime_jdks.split(",")]
except ValueError:
raise exceptions.SystemSetupError(
"Car config key \"runtime.jdk\" is invalid: \"{}\" (must be int)".format(runtime_jdks))
"Car config key \"runtime.jdk\" is invalid: \"{}\" (must be int)".format(car_runtime_jdks))

logger = logging.getLogger(__name__)

runtime_jdk_versions = determine_runtime_jdks(car)
runtime_jdk_versions = determine_runtime_jdks()
logger.info("Allowed JDK versions are %s.", runtime_jdk_versions)
major, java_home = jvm.resolve_path(runtime_jdk_versions)
logger.info("Detected JDK with major version [%s] in [%s].", major, java_home)
181 changes: 83 additions & 98 deletions esrally/mechanic/launcher.py
@@ -16,52 +16,22 @@
# under the License.
import logging
import os
import shlex
import signal
import subprocess
from time import monotonic as _time

import psutil

from esrally import time, exceptions, telemetry
from esrally.mechanic import cluster, java_resolver
from esrally.utils import process


def _get_container_id(compose_config):
compose_ps_cmd = _get_docker_compose_cmd(compose_config, "ps -q")

output = subprocess.check_output(args=shlex.split(compose_ps_cmd))
return output.decode("utf-8").rstrip()


def _wait_for_healthy_running_container(container_id, timeout=60):
cmd = 'docker ps -a --filter "id={}" --filter "status=running" --filter "health=healthy" -q'.format(container_id)
endtime = _time() + timeout
while _time() < endtime:
output = subprocess.check_output(shlex.split(cmd))
containers = output.decode("utf-8").rstrip()
if len(containers) > 0:
return
time.sleep(0.5)
msg = "No healthy running container after {} seconds!".format(timeout)
logging.error(msg)
raise exceptions.LaunchError(msg)


def _get_docker_compose_cmd(compose_config, cmd):
return "docker-compose -f {} {}".format(compose_config, cmd)
from esrally.utils import io, process


class DockerLauncher:
# May download a Docker image and that can take some time
PROCESS_WAIT_TIMEOUT_SECONDS = 10 * 60

def __init__(self, cfg, metrics_store):
def __init__(self, cfg, clock=time.Clock):
self.cfg = cfg
self.metrics_store = metrics_store
self.binary_paths = {}
self.keep_running = self.cfg.opts("mechanic", "keep.running")
self.clock = clock
self.logger = logging.getLogger(__name__)

def start(self, node_configurations):
@@ -70,44 +40,67 @@ def start(self, node_configurations):
node_name = node_configuration.node_name
host_name = node_configuration.ip
binary_path = node_configuration.binary_path
self.binary_paths[node_name] = binary_path
self.logger.info("Starting node [%s] in Docker.", node_name)
self._start_process(binary_path)
node_telemetry = [
# Don't attach any telemetry devices for now but keep the infrastructure in place
]
t = telemetry.Telemetry(devices=node_telemetry)
telemetry.add_metadata_for_node(self.metrics_store, node_name, host_name)
node = cluster.Node(0, host_name, node_name, t)
node = cluster.Node(0, binary_path, host_name, node_name, t)
t.attach_to_node(node)
nodes.append(node)
return nodes

def _start_process(self, binary_path):
compose_cmd = _get_docker_compose_cmd(binary_path, "up -d")
compose_cmd = self._docker_compose(binary_path, "up -d")

ret = process.run_subprocess_with_logging(compose_cmd)
if ret != 0:
msg = "Docker daemon startup failed with exit code[{}]".format(ret)
msg = "Docker daemon startup failed with exit code [{}]".format(ret)
logging.error(msg)
raise exceptions.LaunchError(msg)

container_id = _get_container_id(binary_path)
_wait_for_healthy_running_container(container_id)
container_id = self._get_container_id(binary_path)
self._wait_for_healthy_running_container(container_id, DockerLauncher.PROCESS_WAIT_TIMEOUT_SECONDS)

def stop(self, nodes):
if self.keep_running:
self.logger.info("Keeping Docker container running.")
else:
self.logger.info("Stopping Docker container")
for node in nodes:
node.telemetry.detach_from_node(node, running=True)
process.run_subprocess_with_logging(_get_docker_compose_cmd(self.binary_paths[node.node_name], "down"))
node.telemetry.detach_from_node(node, running=False)
def _docker_compose(self, compose_config, cmd):
return "docker-compose -f {} {}".format(os.path.join(compose_config, "docker-compose.yml"), cmd)

def _get_container_id(self, compose_config):
compose_ps_cmd = self._docker_compose(compose_config, "ps -q")
return process.run_subprocess_with_output(compose_ps_cmd)[0]

def wait_for_pidfile(pidfilename, timeout=60):
endtime = _time() + timeout
while _time() < endtime:
def _wait_for_healthy_running_container(self, container_id, timeout):
cmd = 'docker ps -a --filter "id={}" --filter "status=running" --filter "health=healthy" -q'.format(container_id)
stop_watch = self.clock.stop_watch()
stop_watch.start()
while stop_watch.split_time() < timeout:
containers = process.run_subprocess_with_output(cmd)
if len(containers) > 0:
return
time.sleep(0.5)
msg = "No healthy running container after {} seconds!".format(timeout)
logging.error(msg)
raise exceptions.LaunchError(msg)

def stop(self, nodes, metrics_store):
self.logger.info("Shutting down [%d] nodes running in Docker on this host.", len(nodes))
for node in nodes:
# readd meta-data - we already did this on startup but in case dedicated subcommands are used for
Contributor: s/readd/read

Member Author: I really meant this as: "add the meta-data again". But this was only valid in an earlier commit so I'll remove the comment entirely.

# handling the node lifecycle, we are handed a different metrics store instance and thus need to add
# metadata again.
self.logger.info("Stopping node [%s].", node.node_name)
telemetry.add_metadata_for_node(metrics_store, node.node_name, node.host_name)
node.telemetry.detach_from_node(node, running=True)
process.run_subprocess_with_logging(self._docker_compose(node.binary_path, "down"))
node.telemetry.detach_from_node(node, running=False)
node.telemetry.store_system_metrics(node, metrics_store)


def wait_for_pidfile(pidfilename, timeout=60, clock=time.Clock):
stop_watch = clock.stop_watch()
stop_watch.start()
while stop_watch.split_time() < timeout:
try:
with open(pidfilename, "rb") as f:
return int(f.read())
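
Both launchers and `wait_for_pidfile` now accept an injectable `clock` (defaulting to `time.Clock`) and poll a stop watch rather than computing a deadline from `time.monotonic`, which makes the timeout paths unit-testable. Below is a sketch of a fake clock a test might inject; `FakeClock` and `FakeStopWatch` are hypothetical helpers, not part of this PR.

```python
# Hypothetical test double: a stop watch whose split_time() advances by a fixed
# step per call, so timeout branches can be exercised without real waiting.
class FakeStopWatch:
    def __init__(self, step):
        self.step = step
        self.elapsed = 0.0

    def start(self):
        self.elapsed = 0.0

    def split_time(self):
        self.elapsed += self.step
        return self.elapsed


class FakeClock:
    @staticmethod
    def stop_watch():
        return FakeStopWatch(step=30.0)


# e.g. wait_for_pidfile("does-not-exist.pid", timeout=60, clock=FakeClock)
# exhausts the 60-second budget after only two polls of the fake stop watch.
```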
@@ -125,37 +118,25 @@ class ProcessLauncher:
"""
PROCESS_WAIT_TIMEOUT_SECONDS = 90.0

def __init__(self, cfg, metrics_store, races_root_dir, clock=time.Clock):
def __init__(self, cfg, clock=time.Clock):
self.cfg = cfg
self.metrics_store = metrics_store
self._clock = clock
self.races_root_dir = races_root_dir
self.keep_running = self.cfg.opts("mechanic", "keep.running")
self.logger = logging.getLogger(__name__)

def start(self, node_configurations):
# we're very specific which nodes we kill as there is potentially also an Elasticsearch based metrics store
# running on this machine
# The only specific trait of a Rally-related process is that is started "somewhere" in the races root directory.
#
# We also do this only once per host otherwise we would kill instances that we've just launched.
process.kill_running_es_instances(self.races_root_dir)
node_count_on_host = len(node_configurations)
return [self._start_node(node_configuration, node_count_on_host) for node_configuration in node_configurations]

def _start_node(self, node_configuration, node_count_on_host):
host_name = node_configuration.ip
node_name = node_configuration.node_name
car = node_configuration.car
binary_path = node_configuration.binary_path
data_paths = node_configuration.data_paths
node_telemetry_dir = os.path.join(node_configuration.node_root_path, "telemetry")

java_major_version, java_home = java_resolver.java_home(car, self.cfg)
java_major_version, java_home = java_resolver.java_home(node_configuration.car_runtime_jdks, self.cfg)

telemetry.add_metadata_for_node(self.metrics_store, node_name, host_name)

self.logger.info("Starting node [%s] based on car [%s].", node_name, car)
self.logger.info("Starting node [%s].", node_name)

enabled_devices = self.cfg.opts("telemetry", "devices")
telemetry_params = self.cfg.opts("telemetry", "params")
@@ -164,33 +145,35 @@ def _start_node(self, node_configuration, node_count_on_host):
telemetry.JitCompiler(node_telemetry_dir),
telemetry.Gc(node_telemetry_dir, java_major_version),
telemetry.Heapdump(node_telemetry_dir),
telemetry.DiskIo(self.metrics_store, node_count_on_host, node_telemetry_dir, node_name),
telemetry.IndexSize(data_paths, self.metrics_store),
telemetry.StartupTime(self.metrics_store),
telemetry.DiskIo(node_count_on_host),
telemetry.IndexSize(data_paths),
telemetry.StartupTime(),
]

t = telemetry.Telemetry(enabled_devices, devices=node_telemetry)
env = self._prepare_env(car, node_name, java_home, t)
# TODO #822: Remove reference to car's environment
env = self._prepare_env(node_configuration.car_env, node_name, java_home, t)
t.on_pre_node_start(node_name)
node_pid = self._start_process(binary_path, env)
node = cluster.Node(node_pid, host_name, node_name, t)
self.logger.info("Successfully started node [%s] with PID [%s].", node_name, node_pid)
node = cluster.Node(node_pid, binary_path, host_name, node_name, t)

self.logger.info("Attaching telemetry devices to node [%s].", node_name)
t.attach_to_node(node)

return node

def _prepare_env(self, car, node_name, java_home, t):
def _prepare_env(self, car_env, node_name, java_home, t):
env = {}
env.update(os.environ)
env.update(car.env)
env.update(car_env)
self._set_env(env, "PATH", os.path.join(java_home, "bin"), separator=os.pathsep, prepend=True)
# Don't merge here!
env["JAVA_HOME"] = java_home
env["ES_JAVA_OPTS"] = "-XX:+ExitOnOutOfMemoryError"

# we just blindly trust telemetry here...
for v in t.instrument_candidate_java_opts(car, node_name):
for v in t.instrument_candidate_java_opts():
self._set_env(env, "ES_JAVA_OPTS", v)

self.logger.debug("env for [%s]: %s", node_name, str(env))
@@ -207,42 +190,44 @@ def _set_env(self, env, k, v, separator=' ', prepend=False):

@staticmethod
def _start_process(binary_path, env):
if os.geteuid() == 0:
if os.name == "posix" and os.geteuid() == 0:
raise exceptions.LaunchError("Cannot launch Elasticsearch as root. Please run Rally as a non-root user.")
os.chdir(binary_path)
cmd = ["bin/elasticsearch"]
cmd = [io.escape_path(os.path.join(".", "bin", "elasticsearch"))]
cmd.extend(["-d", "-p", "pid"])
ret = process.run_subprocess_with_logging(command_line=" ".join(cmd), env=env)
if ret != 0:
msg = "Daemon startup failed with exit code[{}]".format(ret)
msg = "Daemon startup failed with exit code [{}]".format(ret)
logging.error(msg)
raise exceptions.LaunchError(msg)

return wait_for_pidfile("./pid")
return wait_for_pidfile(io.escape_path(os.path.join(".", "pid")))

def stop(self, nodes):
if self.keep_running:
self.logger.info("Keeping [%d] nodes on this host running.", len(nodes))
else:
self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
def stop(self, nodes, metrics_store):
self.logger.info("Shutting down [%d] nodes on this host.", len(nodes))
for node in nodes:
proc = psutil.Process(pid=node.pid)
node_name = node.node_name
# readd meta-data - we already did this on startup but in case dedicated subcommands are used for
# handling the node lifecycle, we are handed a different metrics store instance and thus need to add
# metadata again.
telemetry.add_metadata_for_node(metrics_store, node_name, node.host_name)

node.telemetry.detach_from_node(node, running=True)
if not self.keep_running:
stop_watch = self._clock.stop_watch()
stop_watch.start()
stop_watch = self._clock.stop_watch()
stop_watch.start()
try:
os.kill(proc.pid, signal.SIGTERM)
proc.wait(10.0)
except ProcessLookupError:
self.logger.warning("No process found with PID [%s] for node [%s]", proc.pid, node_name)
except psutil.TimeoutExpired:
self.logger.info("kill -KILL node [%s]", node_name)
try:
os.kill(proc.pid, signal.SIGTERM)
proc.wait(10.0)
# kill -9
proc.kill()
except ProcessLookupError:
self.logger.warning("No process found with PID [%s] for node [%s]", proc.pid, node_name)
except psutil.TimeoutExpired:
self.logger.info("kill -KILL node [%s]", node_name)
try:
# kill -9
proc.kill()
except ProcessLookupError:
self.logger.warning("No process found with PID [%s] for node [%s]", proc.pid, node_name)
node.telemetry.detach_from_node(node, running=False)
self.logger.info("Done shutdown node [%s] in [%.1f] s.", node_name, stop_watch.split_time())
node.telemetry.detach_from_node(node, running=False)
node.telemetry.store_system_metrics(node, metrics_store)
self.logger.info("Done shutdown node [%s] in [%.1f] s.", node_name, stop_watch.split_time())