Add support for multiple clusters in custom runners #488

Merged (42 commits, May 3, 2018)
Changes from 38 commits

Commits:
b79a7a5  WiP support for multiple clusters in --target-hosts (dliappis, Apr 24, 2018)
91d261b  Initial working example (dliappis, Apr 25, 2018)
6964a06  Support client-options multi params (dliappis, Apr 26, 2018)
ed785be  Allow parsing of client-options and target-hosts from json files too (dliappis, Apr 27, 2018)
751b61e  Docstring and debugging print cleanups (dliappis, Apr 27, 2018)
ef3ec49  Bug fix (dliappis, Apr 27, 2018)
2de6a98  Rename things for better readability (dliappis, Apr 27, 2018)
23d037a  More cleanups and fix tests after renaming DelegatinRunner (dliappis, Apr 27, 2018)
b93a624  Allow no --client-options even with multi cluster target-hosts (dliappis, Apr 28, 2018)
bed9491  Fix wrapping of context-manager capable Runners (dliappis, Apr 28, 2018)
d6a9f00  Fix ExternalLauncherTests (dliappis, Apr 28, 2018)
3963a4b  Add test for context-manager preservation (dliappis, Apr 28, 2018)
2009f8a  Bug fixes for cluster aware launcher (dliappis, Apr 28, 2018)
b7658fe  Fix launcher tests after rebase with master (dliappis, Apr 28, 2018)
d29c5c2  Fix cluster telemetry stop (dliappis, Apr 28, 2018)
67e67c3  Bug fix when no target-host is used (dliappis, Apr 28, 2018)
ded44a6  Remove debug print message (dliappis, Apr 28, 2018)
9f60438  Check for the boolean value of multi_cluster in custom runner (dliappis, Apr 30, 2018)
ab8af4a  Revert concept of Clusters from launcher (dliappis, Apr 30, 2018)
5979419  Do not commit yet any CCR telemetry device (dliappis, Apr 30, 2018)
88fe218  WiP documentation for --target-hosts (dliappis, Apr 30, 2018)
393a320  Fix tests after ab8af4a1d565028ddfe724a27ea4bd47a9749b41 (dliappis, Apr 30, 2018)
703a9c9  Remove debugging code (dliappis, May 1, 2018)
185a7c2  Correct test case for inline json cli parameter of target-hosts (dliappis, May 1, 2018)
0bca981  Add documentation for multi cluster support (dliappis, May 1, 2018)
7112b1d  Remove left over code for stopping clusters (dliappis, May 1, 2018)
286ad76  Remove unintentionally inserted newline (dliappis, May 1, 2018)
8605d10  Re-introduce newline, as found in current master (dliappis, May 1, 2018)
d0d5071  Remove debug prints (dliappis, May 1, 2018)
bdc3f0e  Remove some more debugging prints (dliappis, May 1, 2018)
da1a168  Addressing PR comments (dliappis, May 2, 2018)
9dde567  Addressing PR comment (dliappis, May 2, 2018)
f9fc471  Addressing PR comment (dliappis, May 2, 2018)
484900b  Addressing PR comment (dliappis, May 2, 2018)
916b278  Addressing PR comment (dliappis, May 2, 2018)
89ca81e  Addressing PR comments (dliappis, May 2, 2018)
db17a6b  Addressing PR comments (dliappis, May 2, 2018)
28af9c0  Addressing PR comments (dliappis, May 2, 2018)
ba5e8b3  Addressing PR docs comments. (dliappis, May 2, 2018)
284e3fa  Addressing PR comment. (dliappis, May 2, 2018)
0a17876  Addressing PR comments (dliappis, May 2, 2018)
e04df54  Addressing PR comments (dliappis, May 2, 2018)
32 changes: 26 additions & 6 deletions docs/adding_tracks.rst
@@ -195,15 +195,15 @@ A few things to note:
When you invoke ``esrally list tracks --track-path=~/rally-tracks/tutorial``, the new track should now appear::

dm@io:~ $ esrally list tracks --track-path=~/rally-tracks/tutorial

____ ____
/ __ \____ _/ / /_ __
/ /_/ / __ `/ / / / / /
/ _, _/ /_/ / / / /_/ /
/_/ |_|\__,_/_/_/\__, /
/____/
Available tracks:

Name Description Documents Compressed Size Uncompressed Size Default Challenge All Challenges
---------- ----------------------------- ----------- --------------- ----------------- ----------------- ---------------
tutorial Tutorial benchmark for Rally 11658903 N/A 1.4 GB index-and-query index-and-query
@@ -802,9 +802,6 @@ Similar to a parameter source you also need to bind the name of your operation t
If you need more control, you can also implement a runner class. The example above, implemented as a class looks as follows::

class PercolateRunner:
    def __enter__(self):
        return self

    def __call__(self, es, params):
        es.percolate(
            index="queries",
@@ -819,7 +816,30 @@ If you need more control, you can also implement a runner class. The example abo
registry.register_runner("percolate", PercolateRunner())


The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function. Runners also support Python's `context manager <https://docs.python.org/3/library/stdtypes.html#typecontextmanager>`_ interface. Rally uses a new context for each request. Implementing the context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it for example to clear open scrolls.
The actual runner is implemented in the method ``__call__`` and the same return value conventions apply as for functions. For debugging purposes you should also implement ``__repr__`` and provide a human-readable name for your runner. Finally, you need to register your runner in the ``register`` function. Runners also support Python's `context manager <https://docs.python.org/3/library/stdtypes.html#typecontextmanager>`_ interface. Rally uses a new context for each request. Implementing the context manager interface can be handy for cleanup of resources after executing an operation. Rally uses it, for example, to clear open scrolls.
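
As an illustration of that last point (an editor's sketch, not part of this PR), a runner that relies on the context manager interface to clean up a scroll after each request could look roughly as follows; the operation itself and the scroll handling are simplified::

class ScrollingQueryRunner:
    def __enter__(self):
        # Rally creates a new context for each request
        self.es = None
        self.scroll_id = None
        return self

    def __call__(self, es, params):
        self.es = es
        response = es.search(index=params["index"], scroll="10s", size=1000)
        self.scroll_id = response.get("_scroll_id")
        return 1, "ops"

    def __exit__(self, exc_type, exc_val, exc_tb):
        # runs after the request, even if it failed, so the scroll is always cleared
        if self.es is not None and self.scroll_id is not None:
            self.es.clear_scroll(body={"scroll_id": [self.scroll_id]})
        return False

    def __repr__(self, *args, **kwargs):
        return "scrolling-query"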

If you have specified multiple Elasticsearch clusters using :ref:`target-hosts <command_line_reference_advanced_topics>` you can make Rally pass a dictionary of client connections instead of one for the ``default`` cluster in the ``es`` parameter.

To achieve this you need to:

* Use a runner class
* Specify ``multi_cluster = True`` as a class attribute
* Use any of the cluster names specified in :ref:`target-hosts <command_line_reference_advanced_topics>` as a key for the ``es`` dict

Example (assuming Rally has been invoked specifying ``default`` and ``remote`` in `target-hosts`)::

class CreateIndexInRemoteCluster:
    multi_cluster = True

    def __call__(self, es, params):
        es['remote'].indices.create(index='remote-index')

    def __repr__(self, *args, **kwargs):
        return "create-index-in-remote-cluster"

def register(registry):
    registry.register_runner("create-index-in-remote-cluster", CreateIndexInRemoteCluster())


.. note::

69 changes: 67 additions & 2 deletions docs/command_line_reference.rst
@@ -423,7 +423,6 @@ By default, Rally will run its load driver on the same machine where you start t

In the example, above Rally will generate load from the hosts ``10.17.20.5`` and ``10.17.20.6``. For this to work, you need to start a Rally daemon on these machines, see :ref:`distributing the load test driver <recipe_distributed_load_driver>` for a complete example.


``target-hosts``
~~~~~~~~~~~~~~~~

@@ -435,7 +434,9 @@ If you run the ``benchmark-only`` :doc:`pipeline </pipelines>` or you want Rally

esrally --pipeline=benchmark-only --target-hosts=10.17.0.5:9200,10.17.0.6:9200

This will run the benchmark against the hosts 10.17.0.5 and 10.17.0.6 on port 9200. See ``client-options`` if you use X-Pack Security and need to authenticate or Rally should use https.
** This will run the benchmark against the hosts 10.17.0.5 and 10.17.0.6 on port 9200. See ``client-options`` if you use X-Pack Security and need to authenticate or Rally should use https.
Member: Any reason for adding ** here?

Contributor Author: Typo. Thanks for catching.

You can also target multiple clusters with ``--target-hosts`` for specific use cases. This is described in the Advanced topics section.
Member: maybe add an in-page link?

``quiet``
~~~~~~~~~
@@ -509,3 +510,67 @@ When you run ``esrally list races``, this will show up again::
20160518T112341Z pmc append-no-conflicts defaults disk:SSD,data_node_count:4

This will help you recognize a specific race when running ``esrally compare``.

.. _command_line_reference_advanced_topics:

Advanced topics
---------------

``target-hosts``
~~~~~~~~~~~~~~~~

Rally can also create client connections for multiple Elasticsearch clusters.
This is only useful if you want to create :ref:`custom runners <adding_tracks_custom_runners>` that execute operations against remote clusters, for example for Cross Cluster Search or Cross Cluster Replication.
Member: replace "against remote clusters" with "against multiple clusters"? Also maybe add a link for cross cluster search? I also think that both names need to be lower case.

Contributor Author: 👍

To define the host:port pairs for additional clusters with ``target-hosts`` you can specify either a JSON filename (ending in ``.json``) or an inline JSON string. The JSON object should be a collection of name:value pairs. ``name`` is string for the cluster name and there **must be** one ``default``.
Member: replace "for additional clusters" with "for multiple clusters"?

Contributor Author: 👍

Examples:

* json file: ``--target-hosts="target_hosts1.json"``::

{ "default": ["127.0.0.1:9200","10.127.0.3:19200"] }

* json file defining two clusters: ``--target-hosts="target_hosts2.json"``::

{
  "default": [
    {"host": "127.0.0.1", "port": 9200},
    {"host": "127.0.0.1", "port": 19200}
  ],
  "remote": [
    {"host": "10.127.0.3", "port": 9200},
    {"host": "10.127.0.8", "port": 9201}
  ]
}

* json inline string defining two clusters::

--target-hosts="{\"default\":[\"127.0.0.1:9200\"],\"remote\":[\"127.0.0.1:19200\",\"127.0.0.1:19201\"]}"

.. NOTE::
**All** internal :ref:`track operations <track_operations>` will use the connection to the ``default`` cluster. However, you can utilize the client connections to the additional clusters in your :ref:`custom runners <adding_tracks_custom_runners>`.
Member: Nit: It took me a while to understand the term "internal track operations". Maybe use "built-in operations" instead?

Contributor Author: 👍


``client-options``
~~~~~~~~~~~~~~~~~~

``client-options`` can optionally specify options for the Elasticsearch clients when multiple clusters have been defined with ``target-hosts``. If omitted, the default is ``timeout:60`` for all cluster connections.

The format is similar to ``target-hosts``, supporting both filenames ending in ``.json`` or inline JSON, however, the parameters are a collection of name:value pairs, as opposed to arrays.

Examples, assuming that two clusters have been specified with ``--target-hosts``:

* json file: ``--client-options="client_options1.json"``::

{
"default":
{"timeout": 60},
"remote":
{"use_ssl":true,"verify_certs": false,"ca_certs":"/path/to/cacert.pem"}
}
Member: Nit: Maybe reformat so it's easier to read?

{
  "default": {
    "timeout": 60
  },
  "remote": {
    "use_ssl": true,
    "verify_certs": false,
    "ca_certs": "/path/to/cacert.pem"
  }
}

Contributor Author: 👍


* json inline string defining two clusters::

--client-options="{\"default\":{\"timeout\": 60}, \"remote\": {\"use_ssl\":true,\"verify_certs\":false,\"ca_certs\":\"/path/to/cacert.pem\"}}"

.. WARNING::
If you use ``client-options`` you must specify options for **every** cluster name defined with ``target-hosts``. An error message will be displayed if there is a mismatch.
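
The review thread below questions whether such a check is implemented at this point in the PR; purely as an illustration of what the warning implies (not code from this PR), it could look like::

def check_cluster_names(target_hosts, client_options):
    # hypothetical validation: both options must name exactly the same clusters
    if set(target_hosts) != set(client_options):
        raise ValueError("client-options defines clusters %s but target-hosts defines %s" %
                         (sorted(client_options), sorted(target_hosts)))
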
Member: Nit: Use active voice? "An error message will be displayed if there is a mismatch." -> (something along the lines of): "Rally raises an error on mismatch."?

Member: After checking the code I am also not sure where it would actually raise an error on mismatch (I spotted this because I think such a test is missing).

2 changes: 2 additions & 0 deletions docs/track.rst
@@ -288,6 +288,8 @@ We can also define default values on document corpus level but override some of
]


.. _track_operations:

operations
..........

10 changes: 8 additions & 2 deletions esrally/driver/driver.py
@@ -550,7 +550,7 @@ def merge(self, *args):

class LoadGenerator(actor.RallyActor):
"""
The actual driver that applies load against the cluster.
The actual driver that applies load against the cluster(s).

It will also regularly send measurements to the master node so it can consolidate them.
"""
@@ -582,12 +582,18 @@ def __init__(self):

@actor.no_retry("load generator")
def receiveMsg_StartLoadGenerator(self, msg, sender):
def EsClients(all_hosts, all_client_options):
Member: rename to es_clients? (I immediately thought that this is a class when I read the code below that calls this function)

Contributor Author: Addressed in 0a17876

es = {}
for cluster_name, cluster_hosts in all_hosts.items():
es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create()
return es

logger.info("LoadGenerator[%d] is about to start." % msg.client_id)
self.master = sender
self.client_id = msg.client_id
self.config = load_local_config(msg.config)
self.abort_on_error = self.config.opts("driver", "on.error") == "abort"
self.es = client.EsClientFactory(self.config.opts("client", "hosts"), self.config.opts("client", "options")).create()
self.es = EsClients(self.config.opts("client", "hosts").all_hosts, self.config.opts("client", "options").all_client_options)
self.track = msg.track
track.set_absolute_data_path(self.config, self.track)
self.tasks = msg.tasks
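
For context, an editor's sketch (not from the PR) of the shapes the new ``EsClients`` helper works with, assuming the host and client-options formats from the documentation changes above::

all_hosts = {
    "default": [{"host": "127.0.0.1", "port": 9200}],
    "remote": [{"host": "10.127.0.3", "port": 9200}]
}
all_client_options = {
    "default": {"timeout": 60},
    "remote": {"timeout": 60}
}
# EsClients(all_hosts, all_client_options) then returns a dict such as
# {"default": <Elasticsearch client>, "remote": <Elasticsearch client>},
# which is what self.es holds from this commit onwards.
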
68 changes: 61 additions & 7 deletions esrally/driver/runner.py
@@ -20,16 +20,23 @@ def runner_for(operation_type):


def register_runner(operation_type, runner):
if getattr(runner, "multi_cluster", False) == True:
if "__enter__" in dir(runner) and "__exit__" in dir(runner):
logger.info("Registering runner object [%s] for [%s]." % (str(runner), str(operation_type)))
__RUNNERS[operation_type] = MultiClusterDelegatingRunner(runner, str(runner), context_manager_enabled=True)
else:
logger.info("Registering context-manager capable runner object [%s] for [%s]." % (str(runner), str(operation_type)))
__RUNNERS[operation_type] = MultiClusterDelegatingRunner(runner, str(runner))
# we'd rather use callable() but this will erroneously also classify a class as callable...
if isinstance(runner, types.FunctionType):
elif isinstance(runner, types.FunctionType):
logger.info("Registering runner function [%s] for [%s]." % (str(runner), str(operation_type)))
__RUNNERS[operation_type] = DelegatingRunner(runner, runner.__name__)
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, runner.__name__)
elif "__enter__" in dir(runner) and "__exit__" in dir(runner):
logger.info("Registering context-manager capable runner object [%s] for [%s]." % (str(runner), str(operation_type)))
__RUNNERS[operation_type] = runner
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, str(runner), context_manager_enabled=True)
else:
logger.info("Registering runner object [%s] for [%s]." % (str(runner), str(operation_type)))
__RUNNERS[operation_type] = DelegatingRunner(runner, str(runner))
__RUNNERS[operation_type] = SingleClusterDelegatingRunner(runner, str(runner))


# Only intended for unit-testing!
@@ -61,16 +68,63 @@ def __exit__(self, exc_type, exc_val, exc_tb):
return False


class DelegatingRunner(Runner):
def __init__(self, runnable, name):
class SingleClusterDelegatingRunner(Runner):
def __init__(self, runnable, name, context_manager_enabled=False):
self.runnable = runnable
self.name = name
self.context_manager_enabled = context_manager_enabled

def __call__(self, *args):
# Single cluster mode: es parameter passed in runner is a client object for the "default" cluster
es = args[0]
return self.runnable(es['default'], *args[1:])

def __repr__(self, *args, **kwargs):
if self.context_manager_enabled:
return "user-defined context-manager enabled runner for [%s]" % self.name
else:
return "user-defined runner for [%s]" % self.name

def __enter__(self):
if self.context_manager_enabled:
self.runnable.__enter__()
return self
else:
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.context_manager_enabled:
return self.runnable.__exit__(exc_type, exc_val, exc_tb)
else:
return False


class MultiClusterDelegatingRunner(Runner):
Member: It seems MultiClusterDelegatingRunner does not support the context manager protocol.

Contributor Author: Right; I thought we don't really need it as it is a special case after all. Then again, if it doesn't support the context manager protocol, I should clarify this in the docs, which sounds to me like more effort than adding the support :)

def __init__(self, runnable, name, context_manager_enabled=False):
self.runnable = runnable
self.name = name
self.context_manager_enabled = context_manager_enabled

def __call__(self, *args):
# Multi cluster mode: pass the entire es dict and let runner code handle connections to different clusters
return self.runnable(*args)

def __repr__(self, *args, **kwargs):
return "user-defined runner for [%s]" % self.name
if self.context_manager_enabled:
return "user-defined multi-cluster context-manager enabled runner for [%s]" % self.name
else:
return "user-defined multi-cluster enabled runner for [%s]" % self.name

def __enter__(self):
if self.context_manager_enabled:
self.runnable.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.context_manager_enabled:
return self.runnable.__exit__(exc_type, exc_val, exc_tb)
else:
return False


def mandatory(params, key, op):
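
To illustrate how the two delegating wrappers above route the ``es`` argument (an editor's sketch based on the behavior shown in this diff, not code from the PR)::

def match_count(es, params):
    # function runner: wrapped by SingleClusterDelegatingRunner, so it only
    # ever sees the client for the "default" cluster
    es.count(index=params["index"])
    return 1, "ops"


class CrossClusterCount:
    multi_cluster = True

    def __call__(self, es, params):
        # multi-cluster runner: wrapped by MultiClusterDelegatingRunner, so it
        # receives the whole dict of cluster name -> client
        for name, cluster_client in es.items():
            cluster_client.count(index=params["index"])
        return len(es), "ops"

# Rally invokes both wrappers with the same es dict; the single-cluster wrapper
# calls match_count(es["default"], params), while the multi-cluster wrapper
# passes the dict through to CrossClusterCount unchanged.
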
36 changes: 22 additions & 14 deletions esrally/mechanic/launcher.py
@@ -62,23 +62,31 @@ def start(self):
"""
enabled_devices = self.cfg.opts("mechanic", "telemetry.devices")
telemetry_params = self.cfg.opts("mechanic", "telemetry.params")
hosts = self.cfg.opts("client", "hosts")
client_options = self.cfg.opts("client", "options")
es = self.client_factory(hosts, client_options).create()
all_hosts = self.cfg.opts("client", "hosts").all_hosts
default_hosts = self.cfg.opts("client", "hosts").default

es = {}
for cluster_name, cluster_hosts in all_hosts.items():
all_client_options = self.cfg.opts("client", "options").all_client_options
cluster_client_options = all_client_options[cluster_name]
es[cluster_name] = self.client_factory(cluster_hosts, cluster_client_options).create()

es_default = es["default"]
Member: I am unsure about using the "magic name" default here because it actually breaks encapsulation.


t = telemetry.Telemetry(enabled_devices, devices=[
telemetry.NodeStats(telemetry_params, es, self.metrics_store),
telemetry.ClusterMetaDataInfo(es),
telemetry.ClusterEnvironmentInfo(es, self.metrics_store),
telemetry.GcTimesSummary(es, self.metrics_store),
telemetry.IndexStats(es, self.metrics_store),
telemetry.MlBucketProcessingTime(es, self.metrics_store)
telemetry.NodeStats(telemetry_params, es_default, self.metrics_store),
telemetry.ClusterMetaDataInfo(es_default),
telemetry.ClusterEnvironmentInfo(es_default, self.metrics_store),
telemetry.GcTimesSummary(es_default, self.metrics_store),
telemetry.IndexStats(es_default, self.metrics_store),
telemetry.MlBucketProcessingTime(es_default, self.metrics_store)
])

# The list of nodes will be populated by ClusterMetaDataInfo, so no need to do it here
c = cluster.Cluster(hosts, [], t)
c = cluster.Cluster(default_hosts, [], t)

logger.info("All cluster nodes have successfully started. Checking if REST API is available.")
if wait_for_rest_layer(es, max_attempts=40):
if wait_for_rest_layer(es_default, max_attempts=40):
logger.info("REST API is available. Attaching telemetry devices to cluster.")
t.attach_to_cluster(c)
logger.info("Telemetry devices are now attached to the cluster.")
@@ -88,7 +96,7 @@ def start(self):
self.stop(c)
raise exceptions.LaunchError("Elasticsearch REST API layer is not available. Forcefully terminated cluster.")
if self.on_post_launch:
self.on_post_launch(es)
self.on_post_launch(es["default"])
Member: Can't you use es_default here?

return c

def stop(self, c):
Expand Down Expand Up @@ -184,8 +192,8 @@ def __init__(self, cfg, metrics_store, client_factory_class=client.EsClientFacto
self.client_factory = client_factory_class

def start(self, node_configurations=None):
hosts = self.cfg.opts("client", "hosts")
client_options = self.cfg.opts("client", "options")
hosts = self.cfg.opts("client", "hosts").default
client_options = self.cfg.opts("client", "options").default
es = self.client_factory(hosts, client_options).create()

# cannot enable custom telemetry devices here
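
The launcher and mechanic code in this PR reads ``cfg.opts("client", "hosts").all_hosts`` and ``.default``; a hypothetical container with that shape (the actual class in Rally may be named and structured differently) could look like::

class TargetHosts:
    # hypothetical holder matching the accessors used in this diff
    def __init__(self, all_hosts):
        # e.g. {"default": [{"host": "127.0.0.1", "port": 9200}], "remote": [...]}
        self.all_hosts = all_hosts

    @property
    def default(self):
        # hosts of the "default" cluster, used wherever a single cluster is expected
        return self.all_hosts["default"]
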
6 changes: 3 additions & 3 deletions esrally/mechanic/mechanic.py
@@ -176,8 +176,8 @@ def cluster_distribution_version(cfg, client_factory=client.EsClientFactory):
:param client_factory: Factory class that creates the Elasticsearch client.
:return: The distribution version.
"""
hosts = cfg.opts("client", "hosts")
client_options = cfg.opts("client", "options")
hosts = cfg.opts("client", "hosts").default
client_options = cfg.opts("client", "options").default
es = client_factory(hosts, client_options).create()
return es.info()["version"]["number"]

@@ -266,7 +266,7 @@ def receiveMsg_StartEngine(self, msg, sender):
self.car, _ = load_team(self.cfg, msg.external)

# In our startup procedure we first create all mechanics. Only if this succeeds we'll continue.
hosts = self.cfg.opts("client", "hosts")
hosts = self.cfg.opts("client", "hosts").default
if len(hosts) == 0:
raise exceptions.LaunchError("No target hosts are configured.")

2 changes: 1 addition & 1 deletion esrally/metrics.py
@@ -1235,7 +1235,7 @@ def _max_results(self):
class CompositeRaceStore:
"""
Internal helper class to store races as file and to Elasticsearch in case users want Elasticsearch as a race store.

It provides the same API as RaceStore. It delegates writes to all stores and all read operations only the Elasticsearch race store.
"""
def __init__(self, es_store, es_results_store, file_store):