Add support for multiple clusters in custom runners #488
Changes from 38 commits
b79a7a5
91d261b
6964a06
ed785be
751b61e
ef3ec49
2de6a98
23d037a
b93a624
bed9491
d6a9f00
3963a4b
2009f8a
b7658fe
d29c5c2
67e67c3
ded44a6
9f60438
ab8af4a
5979419
88fe218
393a320
703a9c9
185a7c2
0bca981
7112b1d
286ad76
8605d10
d0d5071
bdc3f0e
da1a168
9dde567
f9fc471
484900b
916b278
89ca81e
db17a6b
28af9c0
ba5e8b3
284e3fa
0a17876
e04df54
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -423,7 +423,6 @@ By default, Rally will run its load driver on the same machine where you start t | |
|
||
In the example above, Rally will generate load from the hosts ``10.17.20.5`` and ``10.17.20.6``. For this to work, you need to start a Rally daemon on these machines, see :ref:`distributing the load test driver <recipe_distributed_load_driver>` for a complete example. | ||
|
||
|
||
``target-hosts`` | ||
~~~~~~~~~~~~~~~~ | ||
|
||
|
@@ -435,7 +434,9 @@ If you run the ``benchmark-only`` :doc:`pipeline </pipelines>` or you want Rally | |
|
||
esrally --pipeline=benchmark-only --target-hosts=10.17.0.5:9200,10.17.0.6:9200 | ||
|
||
This will run the benchmark against the hosts 10.17.0.5 and 10.17.0.6 on port 9200. See ``client-options`` if you use X-Pack Security and need to authenticate or Rally should use https. | ||
** This will run the benchmark against the hosts 10.17.0.5 and 10.17.0.6 on port 9200. See ``client-options`` if you use X-Pack Security and need to authenticate or Rally should use https. | ||
|
||
You can also target multiple clusters with ``--target-hosts`` for specific use cases. This is described in the Advanced topics section. | ||
Review comment: maybe add an in-page link?
||
|
||
``quiet`` | ||
~~~~~~~~~ | ||
|
@@ -509,3 +510,67 @@ When you run ``esrally list races``, this will show up again:: | |
20160518T112341Z pmc append-no-conflicts defaults disk:SSD,data_node_count:4 | ||
|
||
This will help you recognize a specific race when running ``esrally compare``. | ||
|
||
.. _command_line_reference_advanced_topics: | ||
|
||
Advanced topics | ||
--------------- | ||
|
||
``target-hosts`` | ||
~~~~~~~~~~~~~~~~ | ||
|
||
Rally can also create client connections for multiple Elasticsearch clusters. | ||
This is only useful if you want to create :ref:`custom runners <adding_tracks_custom_runners>` that execute operations against remote clusters, for example for Cross Cluster Search or Cross Cluster Replication. | ||
Review comment: replace "against remote clusters" with "against multiple clusters"? Also maybe add a link for cross cluster search? I also think that both names need to be lower case.
Review comment: 👍
||
|
||
To define the host:port pairs for additional clusters with ``target-hosts`` you can specify either a JSON filename (ending in ``.json``) or an inline JSON string. The JSON object should be a collection of name:value pairs. Each ``name`` is a string naming a cluster, and one of them **must be** ``default``. | ||
Review comment: replace "for additional clusters" with "for multiple clusters"?
Review comment: 👍
||
|
||
Examples: | ||
|
||
* json file: ``--target-hosts="target_hosts1.json"``:: | ||
|
||
{ "default": ["127.0.0.1:9200","10.127.0.3:19200"] } | ||
|
||
* json file defining two clusters: ``--target-hosts="target_hosts2.json"``:: | ||
|
||
{ | ||
"default": [ | ||
{"host": "127.0.0.1", "port": 9200}, | ||
{"host": "127.0.0.1", "port": 19200} | ||
], | ||
"remote":[ | ||
{"host": "10.127.0.3", "port": 9200}, | ||
{"host": "10.127.0.8", "port": 9201} | ||
] | ||
} | ||
|
||
* json inline string defining two clusters:: | ||
|
||
--target-hosts="{\"default\":[\"127.0.0.1:9200\"],\"remote\":[\"127.0.0.1:19200\",\"127.0.0.1:19201\"]}" | ||
|
||
.. NOTE:: | ||
**All** internal :ref:`track operations <track_operations>` will use the connection to the ``default`` cluster. However, you can utilize the client connections to the additional clusters in your :ref:`custom runners <adding_tracks_custom_runners>`. | ||
Review comment: Nit: It took me a while to understand the term "internal track operations". Maybe use "built-in operations" instead?
Review comment: 👍
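The value shapes that the documentation above describes for ``--target-hosts`` (a filename ending in ``.json``, an inline JSON string, or the plain comma-separated host list) can be illustrated with a small parser. This is a minimal sketch and not Rally's implementation: the names ``parse_target_hosts`` and ``normalize_host`` and the fallback port ``9200`` are assumptions made purely for illustration.

```python
import json


def normalize_host(entry):
    """Turn "host:port" strings or {"host": ..., "port": ...} dicts into dicts."""
    if isinstance(entry, dict):
        return {"host": entry["host"], "port": int(entry["port"])}
    host, _, port = entry.partition(":")
    # Assumption for this sketch: fall back to port 9200 when none is given.
    return {"host": host, "port": int(port) if port else 9200}


def parse_target_hosts(value):
    """Hypothetical helper: returns {cluster_name: [host dicts]}."""
    if value.endswith(".json"):
        # A filename: read the JSON object from disk.
        with open(value) as f:
            clusters = json.load(f)
    elif value.lstrip().startswith("{"):
        # An inline JSON string.
        clusters = json.loads(value)
    else:
        # The plain comma-separated form always describes the default cluster.
        clusters = {"default": [h for h in value.split(",") if h]}
    if "default" not in clusters:
        raise ValueError('target-hosts must define a "default" cluster')
    return {name: [normalize_host(h) for h in hosts]
            for name, hosts in clusters.items()}
```

Both host notations from the examples (``"127.0.0.1:9200"`` strings and ``{"host": ..., "port": ...}`` objects) end up in the same normalized form, so later code only needs to handle one shape.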
||
|
||
``client-options`` | ||
~~~~~~~~~~~~~~~~~~ | ||
|
||
``client-options`` can optionally specify options for the Elasticsearch clients when multiple clusters have been defined with ``target-hosts``. If omitted, the default is ``timeout:60`` for all cluster connections. | ||
|
||
The format is similar to ``target-hosts``: it accepts either a filename ending in ``.json`` or inline JSON. However, the parameters for each cluster are a collection of name:value pairs, as opposed to arrays. | ||
|
||
Examples, assuming that two clusters have been specified with ``--target-hosts``: | ||
|
||
* json file: ``--client-options="client_options1.json"``:: | ||
|
||
{ | ||
"default": | ||
{"timeout": 60}, | ||
"remote": | ||
{"use_ssl":true,"verify_certs": false,"ca_certs":"/path/to/cacert.pem"} | ||
} | ||
Review comment: Nit: Maybe reformat so it's easier to read?
    {
      "default": {
        "timeout": 60
      },
      "remote": {
        "use_ssl": true,
        "verify_certs": false,
        "ca_certs": "/path/to/cacert.pem"
      }
    }
Review comment: 👍
||
|
||
* json inline string defining two clusters:: | ||
|
||
--client-options="{\"default\":{\"timeout\": 60}, \"remote\": {\"use_ssl\":true,\"verify_certs\":false,\"ca_certs\":\"/path/to/cacert.pem\"}}" | ||
|
||
.. WARNING:: | ||
If you use ``client-options`` you must specify options for **every** cluster name defined with ``target-hosts``. An error message will be displayed if there is a mismatch. | ||
Review comment: Nit: Use active voice? "An error message will be displayed if there is a mismatch." -> (something along the lines of): "Rally raises an error on mismatch."?
Review comment: After checking the code I am also not sure where it would actually raise an error on mismatch (I spotted this because I think such a test is missing).
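The warning above requires ``client-options`` to name exactly the clusters defined with ``target-hosts``, and the review thread questions where that is enforced. A minimal sketch of what such a check could look like follows. The function name ``check_client_options`` and the use of ``ValueError`` are assumptions for illustration; they are not taken from the PR.

```python
def check_client_options(all_hosts, all_client_options):
    """Hypothetical check: fail when the cluster names of both options differ."""
    host_names = set(all_hosts)
    option_names = set(all_client_options)
    if host_names != option_names:
        missing = sorted(host_names - option_names)
        unknown = sorted(option_names - host_names)
        raise ValueError(
            "client-options must match target-hosts: "
            "missing options for %s, unknown cluster names %s" % (missing, unknown))


# Matching names pass silently.
check_client_options(
    {"default": ["127.0.0.1:9200"], "remote": ["10.127.0.3:9200"]},
    {"default": {"timeout": 60}, "remote": {"use_ssl": True}})
```

A unit test for the mismatch case, as the reviewer suggests, would call the check with one name left out and assert that it raises.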
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -550,7 +550,7 @@ def merge(self, *args): | |
|
||
class LoadGenerator(actor.RallyActor): | ||
""" | ||
The actual driver that applies load against the cluster. | ||
The actual driver that applies load against the cluster(s). | ||
|
||
It will also regularly send measurements to the master node so it can consolidate them. | ||
""" | ||
|
@@ -582,12 +582,18 @@ def __init__(self): | |
|
||
@actor.no_retry("load generator") | ||
def receiveMsg_StartLoadGenerator(self, msg, sender): | ||
def EsClients(all_hosts, all_client_options): | ||
Review comment: rename to
Review comment: Addressed in 0a17876
||
es = {} | ||
for cluster_name, cluster_hosts in all_hosts.items(): | ||
es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create() | ||
return es | ||
|
||
logger.info("LoadGenerator[%d] is about to start." % msg.client_id) | ||
self.master = sender | ||
self.client_id = msg.client_id | ||
self.config = load_local_config(msg.config) | ||
self.abort_on_error = self.config.opts("driver", "on.error") == "abort" | ||
self.es = client.EsClientFactory(self.config.opts("client", "hosts"), self.config.opts("client", "options")).create() | ||
self.es = EsClients(self.config.opts("client", "hosts").all_hosts, self.config.opts("client", "options").all_client_options) | ||
self.track = msg.track | ||
track.set_absolute_data_path(self.config, self.track) | ||
self.tasks = msg.tasks | ||
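The change in this hunk replaces the single client with a dict of clients keyed by cluster name. The sketch below shows that pattern in isolation. ``StubClientFactory`` is a stand-in for Rally's ``client.EsClientFactory`` so that the example runs without Rally, and ``build_clients`` is a hypothetical name chosen for this illustration only.

```python
class StubClientFactory:
    """Stand-in for Rally's client.EsClientFactory (assumption for this sketch)."""

    def __init__(self, hosts, client_options):
        self.hosts = hosts
        self.client_options = client_options

    def create(self):
        # A real factory would return an Elasticsearch client object here.
        return {"hosts": self.hosts, "options": self.client_options}


def build_clients(all_hosts, all_client_options, factory=StubClientFactory):
    """Create one client per cluster name.

    Raises KeyError if a cluster has hosts but no client options.
    """
    return {
        cluster_name: factory(cluster_hosts, all_client_options[cluster_name]).create()
        for cluster_name, cluster_hosts in all_hosts.items()
    }


es = build_clients(
    {"default": ["127.0.0.1:9200"], "remote": ["10.127.0.3:9200"]},
    {"default": {"timeout": 60}, "remote": {"use_ssl": True}})
```

With this shape, code that only needs the benchmark target reads ``es["default"]``, while multi-cluster runners receive the whole dict.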
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,16 +20,23 @@ def runner_for(operation_type): | |
|
||
|
||
def register_runner(operation_type, runner): | ||
if getattr(runner, "multi_cluster", False) == True: | ||
if "__enter__" in dir(runner) and "__exit__" in dir(runner): | ||
logger.info("Registering context-manager capable runner object [%s] for [%s]." % (str(runner), str(operation_type))) | ||
__RUNNERS[operation_type] = MultiClusterDelegatingRunner(runner, str(runner), context_manager_enabled=True) | ||
else: | ||
logger.info("Registering runner object [%s] for [%s]." % (str(runner), str(operation_type))) | ||
__RUNNERS[operation_type] = MultiClusterDelegatingRunner(runner, str(runner)) | ||
# we'd rather use callable() but this will erroneously also classify a class as callable... | ||
if isinstance(runner, types.FunctionType): | ||
elif isinstance(runner, types.FunctionType): | ||
logger.info("Registering runner function [%s] for [%s]." % (str(runner), str(operation_type))) | ||
__RUNNERS[operation_type] = DelegatingRunner(runner, runner.__name__) | ||
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, runner.__name__) | ||
elif "__enter__" in dir(runner) and "__exit__" in dir(runner): | ||
logger.info("Registering context-manager capable runner object [%s] for [%s]." % (str(runner), str(operation_type))) | ||
__RUNNERS[operation_type] = runner | ||
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, str(runner), context_manager_enabled=True) | ||
else: | ||
logger.info("Registering runner object [%s] for [%s]." % (str(runner), str(operation_type))) | ||
__RUNNERS[operation_type] = DelegatingRunner(runner, str(runner)) | ||
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, str(runner)) | ||
|
||
|
||
# Only intended for unit-testing! | ||
|
@@ -61,16 +68,63 @@ def __exit__(self, exc_type, exc_val, exc_tb): | |
return False | ||
|
||
|
||
class DelegatingRunner(Runner): | ||
def __init__(self, runnable, name): | ||
class SingleClusterDelegatingRunner(Runner): | ||
def __init__(self, runnable, name, context_manager_enabled=False): | ||
self.runnable = runnable | ||
self.name = name | ||
self.context_manager_enabled = context_manager_enabled | ||
|
||
def __call__(self, *args): | ||
# Single cluster mode: es parameter passed in runner is a client object for the "default" cluster | ||
es = args[0] | ||
return self.runnable(es['default'], *args[1:]) | ||
|
||
def __repr__(self, *args, **kwargs): | ||
if self.context_manager_enabled: | ||
return "user-defined context-manager enabled runner for [%s]" % self.name | ||
else: | ||
return "user-defined runner for [%s]" % self.name | ||
|
||
def __enter__(self): | ||
if self.context_manager_enabled: | ||
self.runnable.__enter__() | ||
return self | ||
else: | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
if self.context_manager_enabled: | ||
return self.runnable.__exit__(exc_type, exc_val, exc_tb) | ||
else: | ||
return False | ||
|
||
|
||
class MultiClusterDelegatingRunner(Runner): | ||
Review comment: It seems
Review comment: Right; I thought we don't really need it as it is a special case after all. Then again, if it doesn't support the context manager protocol, I should clarify this in the docs, which sounds to me like more effort than adding the support :)
||
def __init__(self, runnable, name, context_manager_enabled=False): | ||
self.runnable = runnable | ||
self.name = name | ||
self.context_manager_enabled = context_manager_enabled | ||
|
||
def __call__(self, *args): | ||
# Multi cluster mode: pass the entire es dict and let runner code handle connections to different clusters | ||
return self.runnable(*args) | ||
|
||
def __repr__(self, *args, **kwargs): | ||
return "user-defined runner for [%s]" % self.name | ||
if self.context_manager_enabled: | ||
return "user-defined multi-cluster context-manager enabled runner for [%s]" % self.name | ||
else: | ||
return "user-defined multi-cluster enabled runner for [%s]" % self.name | ||
|
||
def __enter__(self): | ||
if self.context_manager_enabled: | ||
self.runnable.__enter__() | ||
return self | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
if self.context_manager_enabled: | ||
return self.runnable.__exit__(exc_type, exc_val, exc_tb) | ||
else: | ||
return False | ||
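The difference between the two delegating runners in this hunk is what the wrapped runner receives as its first argument: the ``default`` client only, or the whole dict of clients. The sketch below reproduces just that dispatch in simplified form. The class names ``SingleClusterDelegate`` and ``MultiClusterDelegate``, the ``wrap`` function, and the string "clients" are assumptions for illustration. Context-manager handling is left out, and the sketch tests ``multi_cluster`` for truthiness whereas the diff compares it against ``True``.

```python
class SingleClusterDelegate:
    """Simplified: hands only the "default" client to the wrapped runner."""

    def __init__(self, runnable):
        self.runnable = runnable

    def __call__(self, es, *args):
        return self.runnable(es["default"], *args)


class MultiClusterDelegate:
    """Simplified: passes the whole dict of clients through unchanged."""

    def __init__(self, runnable):
        self.runnable = runnable

    def __call__(self, *args):
        return self.runnable(*args)


def wrap(runner):
    # Runners opt in to multi-cluster mode with a "multi_cluster" attribute.
    if getattr(runner, "multi_cluster", False):
        return MultiClusterDelegate(runner)
    return SingleClusterDelegate(runner)


def single_runner(es, params):
    # Receives one client object.
    return "single got %s" % es


class CrossClusterRunner:
    multi_cluster = True

    def __call__(self, es, params):
        # Receives the dict and picks the clusters it needs by name.
        return "multi got %s" % sorted(es)


# Plain strings stand in for Elasticsearch client objects.
clients = {"default": "client-a", "remote": "client-b"}
single_result = wrap(single_runner)(clients, {})
multi_result = wrap(CrossClusterRunner())(clients, {})
```

Existing single-cluster runners keep their signature because the wrapper unpacks ``es["default"]`` for them; only runners that set ``multi_cluster`` have to deal with cluster names.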
|
||
|
||
def mandatory(params, key, op): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,23 +62,31 @@ def start(self): | |
""" | ||
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices") | ||
telemetry_params = self.cfg.opts("mechanic", "telemetry.params") | ||
hosts = self.cfg.opts("client", "hosts") | ||
client_options = self.cfg.opts("client", "options") | ||
es = self.client_factory(hosts, client_options).create() | ||
all_hosts = self.cfg.opts("client", "hosts").all_hosts | ||
default_hosts = self.cfg.opts("client", "hosts").default | ||
|
||
es = {} | ||
for cluster_name, cluster_hosts in all_hosts.items(): | ||
all_client_options = self.cfg.opts("client", "options").all_client_options | ||
cluster_client_options = all_client_options[cluster_name] | ||
es[cluster_name] = self.client_factory(cluster_hosts, cluster_client_options).create() | ||
|
||
es_default = es["default"] | ||
Review comment: I am unsure about using the "magic name"
||
|
||
t = telemetry.Telemetry(enabled_devices, devices=[ | ||
telemetry.NodeStats(telemetry_params, es, self.metrics_store), | ||
telemetry.ClusterMetaDataInfo(es), | ||
telemetry.ClusterEnvironmentInfo(es, self.metrics_store), | ||
telemetry.GcTimesSummary(es, self.metrics_store), | ||
telemetry.IndexStats(es, self.metrics_store), | ||
telemetry.MlBucketProcessingTime(es, self.metrics_store) | ||
telemetry.NodeStats(telemetry_params, es_default, self.metrics_store), | ||
telemetry.ClusterMetaDataInfo(es_default), | ||
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store), | ||
telemetry.GcTimesSummary(es_default, self.metrics_store), | ||
telemetry.IndexStats(es_default, self.metrics_store), | ||
telemetry.MlBucketProcessingTime(es_default, self.metrics_store) | ||
]) | ||
|
||
# The list of nodes will be populated by ClusterMetaDataInfo, so no need to do it here | ||
c = cluster.Cluster(hosts, [], t) | ||
c = cluster.Cluster(default_hosts, [], t) | ||
|
||
logger.info("All cluster nodes have successfully started. Checking if REST API is available.") | ||
if wait_for_rest_layer(es, max_attempts=40): | ||
if wait_for_rest_layer(es_default, max_attempts=40): | ||
logger.info("REST API is available. Attaching telemetry devices to cluster.") | ||
t.attach_to_cluster(c) | ||
logger.info("Telemetry devices are now attached to the cluster.") | ||
|
@@ -88,7 +96,7 @@ def start(self): | |
self.stop(c) | ||
raise exceptions.LaunchError("Elasticsearch REST API layer is not available. Forcefully terminated cluster.") | ||
if self.on_post_launch: | ||
self.on_post_launch(es) | ||
self.on_post_launch(es["default"]) | ||
Review comment: Can't you use
||
return c | ||
|
||
def stop(self, c): | ||
|
@@ -184,8 +192,8 @@ def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFacto | |
self.client_factory = client_factory_class | ||
|
||
def start(self, node_configurations=None): | ||
hosts = self.cfg.opts("client", "hosts") | ||
client_options = self.cfg.opts("client", "options") | ||
hosts = self.cfg.opts("client", "hosts").default | ||
client_options = self.cfg.opts("client", "options").default | ||
es = self.client_factory(hosts, client_options).create() | ||
|
||
# cannot enable custom telemetry devices here | ||
|
Review comment: Any reason for adding ** here?
Review comment: Typo. Thanks for catching.