runner.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import asyncio
import contextvars
import json
import logging
import random
import re
import sys
import time
from collections import Counter, OrderedDict
from copy import deepcopy
from enum import Enum
from functools import total_ordering
from io import BytesIO
from os.path import commonprefix
from types import FunctionType
from typing import List, Optional
import ijson
from esrally import exceptions, track, types
from esrally.utils import convert
from esrally.utils.versions import Version
# Mapping from operation type to specific runner
__RUNNERS = {}
def register_default_runners(config: Optional[types.Config] = None):
register_runner(track.OperationType.Bulk, BulkIndex(), async_runner=True)
register_runner(track.OperationType.ForceMerge, ForceMerge(), async_runner=True)
register_runner(track.OperationType.IndexStats, Retry(IndicesStats()), async_runner=True)
register_runner(track.OperationType.NodeStats, NodeStats(), async_runner=True)
register_runner(track.OperationType.Search, Query(config=config), async_runner=True)
register_runner(track.OperationType.PaginatedSearch, Query(config=config), async_runner=True)
register_runner(track.OperationType.CompositeAgg, Query(config=config), async_runner=True)
register_runner(track.OperationType.ScrollSearch, Query(config=config), async_runner=True)
register_runner(track.OperationType.RawRequest, RawRequest(), async_runner=True)
register_runner(track.OperationType.Composite, Composite(), async_runner=True)
register_runner(track.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
register_runner(track.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True)
register_runner(track.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True)
register_runner(track.OperationType.OpenPointInTime, OpenPointInTime(), async_runner=True)
register_runner(track.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
register_runner(track.OperationType.Sql, Sql(), async_runner=True)
register_runner(track.OperationType.FieldCaps, FieldCaps(), async_runner=True)
register_runner(track.OperationType.Esql, Esql(), async_runner=True)
# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(track.OperationType.Sleep, Sleep(), async_runner=True)
# these requests should not be retried as they are not idempotent
register_runner(track.OperationType.CreateSnapshot, CreateSnapshot(), async_runner=True)
register_runner(track.OperationType.RestoreSnapshot, RestoreSnapshot(), async_runner=True)
register_runner(track.OperationType.Downsample, Downsample(), async_runner=True)
# We treat the following as administrative commands and thus wrap them in a retry.
register_runner(track.OperationType.ClusterHealth, Retry(ClusterHealth()), async_runner=True)
register_runner(track.OperationType.PutPipeline, Retry(PutPipeline()), async_runner=True)
register_runner(track.OperationType.Refresh, Retry(Refresh()), async_runner=True)
register_runner(track.OperationType.CreateIndex, Retry(CreateIndex()), async_runner=True)
register_runner(track.OperationType.DeleteIndex, Retry(DeleteIndex(config=config)), async_runner=True)
register_runner(track.OperationType.CreateComponentTemplate, Retry(CreateComponentTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteComponentTemplate, Retry(DeleteComponentTemplate()), async_runner=True)
register_runner(track.OperationType.CreateComposableTemplate, Retry(CreateComposableTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteComposableTemplate, Retry(DeleteComposableTemplate(config=config)), async_runner=True)
register_runner(track.OperationType.CreateDataStream, Retry(CreateDataStream()), async_runner=True)
register_runner(track.OperationType.DeleteDataStream, Retry(DeleteDataStream()), async_runner=True)
register_runner(track.OperationType.CreateIndexTemplate, Retry(CreateIndexTemplate()), async_runner=True)
register_runner(track.OperationType.DeleteIndexTemplate, Retry(DeleteIndexTemplate()), async_runner=True)
register_runner(track.OperationType.ShrinkIndex, Retry(ShrinkIndex()), async_runner=True)
register_runner(track.OperationType.CreateMlDatafeed, Retry(CreateMlDatafeed()), async_runner=True)
register_runner(track.OperationType.DeleteMlDatafeed, Retry(DeleteMlDatafeed()), async_runner=True)
register_runner(track.OperationType.StartMlDatafeed, Retry(StartMlDatafeed()), async_runner=True)
register_runner(track.OperationType.StopMlDatafeed, Retry(StopMlDatafeed()), async_runner=True)
register_runner(track.OperationType.CreateMlJob, Retry(CreateMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteMlJob, Retry(DeleteMlJob()), async_runner=True)
register_runner(track.OperationType.OpenMlJob, Retry(OpenMlJob()), async_runner=True)
register_runner(track.OperationType.CloseMlJob, Retry(CloseMlJob()), async_runner=True)
register_runner(track.OperationType.DeleteSnapshotRepository, Retry(DeleteSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.CreateSnapshotRepository, Retry(CreateSnapshotRepository()), async_runner=True)
register_runner(track.OperationType.WaitForSnapshotCreate, Retry(WaitForSnapshotCreate()), async_runner=True)
register_runner(track.OperationType.WaitForCurrentSnapshotsCreate, Retry(WaitForCurrentSnapshotsCreate()), async_runner=True)
register_runner(track.OperationType.WaitForRecovery, Retry(IndicesRecovery()), async_runner=True)
register_runner(track.OperationType.PutSettings, Retry(PutSettings()), async_runner=True)
register_runner(track.OperationType.CreateTransform, Retry(CreateTransform()), async_runner=True)
register_runner(track.OperationType.StartTransform, Retry(StartTransform()), async_runner=True)
register_runner(track.OperationType.WaitForTransform, Retry(WaitForTransform()), async_runner=True)
register_runner(track.OperationType.DeleteTransform, Retry(DeleteTransform()), async_runner=True)
register_runner(track.OperationType.TransformStats, Retry(TransformStats()), async_runner=True)
register_runner(track.OperationType.CreateIlmPolicy, Retry(CreateIlmPolicy()), async_runner=True)
register_runner(track.OperationType.DeleteIlmPolicy, Retry(DeleteIlmPolicy()), async_runner=True)
def runner_for(operation_type):
try:
return __RUNNERS[operation_type]
except KeyError:
raise exceptions.RallyError(f"No runner available for operation-type: [{operation_type}]")
def enable_assertions(enabled):
"""
Changes whether assertions are enabled. The status changes for all tasks that are executed after this call.
:param enabled: ``True`` to enable assertions, ``False`` to disable them.
"""
AssertingRunner.assertions_enabled = enabled
def register_runner(operation_type, runner, **kwargs):
logger = logging.getLogger(__name__)
async_runner = kwargs.get("async_runner", False)
if isinstance(operation_type, track.OperationType):
operation_type = operation_type.to_hyphenated_string()
if not async_runner:
raise exceptions.RallyAssertionError(
f"Runner [{str(runner)}] must be implemented as async runner and registered with async_runner=True."
)
if hasattr(unwrap(runner), "multi_cluster"):
if "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
cluster_aware_runner = _multi_cluster_runner(runner, str(runner), context_manager_enabled=True)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
cluster_aware_runner = _multi_cluster_runner(runner, str(runner))
# we'd rather use callable() but this will erroneously also classify a class as callable...
elif isinstance(runner, FunctionType):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner function [%s] for [%s].", str(runner), str(operation_type))
cluster_aware_runner = _single_cluster_runner(runner, runner.__name__)
elif "__aenter__" in dir(runner) and "__aexit__" in dir(runner):
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering context-manager capable runner object [%s] for [%s].", str(runner), str(operation_type))
cluster_aware_runner = _single_cluster_runner(runner, str(runner), context_manager_enabled=True)
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Registering runner object [%s] for [%s].", str(runner), str(operation_type))
cluster_aware_runner = _single_cluster_runner(runner, str(runner))
__RUNNERS[operation_type] = _with_completion(_with_assertions(cluster_aware_runner))
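# For example, a track plugin could register its own async runner via register_runner() (hypothetical
# operation type and runner class):
#   register_runner("my-custom-operation", MyCustomRunner(), async_runner=True)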
# Only intended for unit-testing!
def remove_runner(operation_type):
del __RUNNERS[operation_type]
class Runner:
"""
Base class for all operations against Elasticsearch.
"""
def __init__(self, *args, config=None, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger(__name__)
self.serverless_mode = False
self.serverless_operator = False
if config:
self.serverless_mode = convert.to_bool(config.opts("driver", "serverless.mode", mandatory=False, default_value=False))
self.serverless_operator = convert.to_bool(config.opts("driver", "serverless.operator", mandatory=False, default_value=False))
async def __aenter__(self):
return self
async def __call__(self, es, params):
"""
Runs the actual method that should be benchmarked.
:param es: The Elasticsearch client.
:param params: A hash with all parameters that are needed to run this operation.
:return: A pair of (int, String). The first component indicates the "weight" of this call. It is typically 1, but for bulk operations
it should be the actual bulk size. The second component is the "unit" of weight, which should be "ops" (short for
"operations") by default. If applicable, the unit should always be in plural form. It is used in metrics records
for throughput and reports. A value will then be shown as e.g. "111 ops/s".
"""
raise NotImplementedError("abstract operation")
async def __aexit__(self, exc_type, exc_val, exc_tb):
return False
def _default_kw_params(self, params):
# map of API kwargs to Rally config parameters
kw_dict = {
"body": "body",
"headers": "headers",
"index": "index",
"opaque_id": "opaque-id",
"params": "request-params",
"request_timeout": "request-timeout",
}
full_result = {k: params.get(v) for (k, v) in kw_dict.items()}
# filter Nones
return dict(filter(lambda kv: kv[1] is not None, full_result.items()))
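# For example, _default_kw_params({"body": {"query": {"match_all": {}}}, "request-timeout": 60, "bulk-size": 500})
# returns {"body": {"query": {"match_all": {}}}, "request_timeout": 60}; only the mapped keys are picked up and
# unset ones are filtered out.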
@staticmethod
def _transport_request_params(params):
"""
Takes all of a runner's params and splits out request parameters, transport
level parameters, and headers into their own respective dicts.
:param params: A hash with all the respective runner's parameters.
:return: A tuple of the specific runner's params, request level parameters, transport level parameters, and headers, respectively.
"""
transport_params = {}
request_params = params.get("request-params", {})
if request_timeout := params.pop("request-timeout", None):
transport_params["request_timeout"] = request_timeout
if (ignore_status := request_params.pop("ignore", None)) or (ignore_status := params.pop("ignore", None)):
transport_params["ignore_status"] = ignore_status
headers = params.pop("headers", None) or {}
if opaque_id := params.pop("opaque-id", None):
headers.update({"x-opaque-id": opaque_id})
return params, request_params, transport_params, headers
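# For illustration, _transport_request_params({"request-timeout": 7.5, "opaque-id": "abc", "request-params": {"ignore": 404}})
# splits the input into transport_params {"request_timeout": 7.5, "ignore_status": 404}, headers {"x-opaque-id": "abc"},
# and an empty request-params dict; the consumed keys are popped from the remaining runner params.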
class Delegator:
"""
Mixin to unify delegate handling
"""
def __init__(self, delegate, *args, **kwargs):
super().__init__(*args, **kwargs)
self.delegate = delegate
def unwrap(runner):
"""
Unwraps all delegators until the actual runner.
:param runner: An arbitrarily nested chain of delegators around a runner.
:return: The innermost runner.
"""
delegate = getattr(runner, "delegate", None)
if delegate:
return unwrap(delegate)
else:
return runner
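# For example, after register_runner() wraps a runner in a completion wrapper, an AssertingRunner and a
# MultiClientRunner, unwrap() on the registered object still returns the original user-supplied runner instance.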
def _single_cluster_runner(runnable, name, context_manager_enabled=False):
# only pass the default ES client
return MultiClientRunner(runnable, name, lambda es: es["default"], context_manager_enabled)
def _multi_cluster_runner(runnable, name, context_manager_enabled=False):
# pass all ES clients
return MultiClientRunner(runnable, name, lambda es: es, context_manager_enabled)
def _with_assertions(delegate):
return AssertingRunner(delegate)
def _with_completion(delegate):
unwrapped_runner = unwrap(delegate)
if hasattr(unwrapped_runner, "completed") and hasattr(unwrapped_runner, "percent_completed"):
return WithCompletion(delegate, unwrapped_runner)
else:
return NoCompletion(delegate)
class NoCompletion(Runner, Delegator):
def __init__(self, delegate):
super().__init__(delegate=delegate)
@property
def completed(self):
return None
@property
def percent_completed(self):
return None
async def __call__(self, *args):
return await self.delegate(*args)
def __repr__(self, *args, **kwargs):
return repr(self.delegate)
async def __aenter__(self):
await self.delegate.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)
class WithCompletion(Runner, Delegator):
def __init__(self, delegate, progressable):
super().__init__(delegate=delegate)
self.progressable = progressable
@property
def completed(self):
return self.progressable.completed
@property
def percent_completed(self):
return self.progressable.percent_completed
async def __call__(self, *args):
return await self.delegate(*args)
def __repr__(self, *args, **kwargs):
return repr(self.delegate)
async def __aenter__(self):
await self.delegate.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)
class MultiClientRunner(Runner, Delegator):
def __init__(self, runnable, name, client_extractor, context_manager_enabled=False):
super().__init__(delegate=runnable)
self.name = name
self.client_extractor = client_extractor
self.context_manager_enabled = context_manager_enabled
async def __call__(self, *args):
return await self.delegate(self.client_extractor(args[0]), *args[1:])
def __repr__(self, *args, **kwargs):
if self.context_manager_enabled:
return "user-defined context-manager enabled runner for [%s]" % self.name
else:
return "user-defined runner for [%s]" % self.name
async def __aenter__(self):
if self.context_manager_enabled:
await self.delegate.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.context_manager_enabled:
return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)
else:
return False
class AssertingRunner(Runner, Delegator):
assertions_enabled = False
def __init__(self, delegate):
super().__init__(delegate=delegate)
self.predicates = {
">": self.greater_than,
">=": self.greater_than_or_equal,
"<": self.smaller_than,
"<=": self.smaller_than_or_equal,
"==": self.equal,
}
def greater_than(self, expected, actual):
return actual > expected
def greater_than_or_equal(self, expected, actual):
return actual >= expected
def smaller_than(self, expected, actual):
return actual < expected
def smaller_than_or_equal(self, expected, actual):
return actual <= expected
def equal(self, expected, actual):
return actual == expected
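# For illustration, an entry under the "assertions" task parameter could look like (hypothetical values):
#   {"property": "hits.total.value", "condition": ">=", "value": 10000}
# check_assertion() walks the dotted "property" path into the returned dict and applies the predicate.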
def check_assertion(self, op_name, assertion, properties):
path = assertion["property"]
predicate_name = assertion["condition"]
expected_value = assertion["value"]
actual_value = properties
for k in path.split("."):
actual_value = actual_value[k]
predicate = self.predicates[predicate_name]
success = predicate(expected_value, actual_value)
if not success:
if op_name:
msg = f"Expected [{path}] in [{op_name}] to be {predicate_name} [{expected_value}] but was [{actual_value}]."
else:
msg = f"Expected [{path}] to be {predicate_name} [{expected_value}] but was [{actual_value}]."
raise exceptions.RallyTaskAssertionError(msg)
async def __call__(self, *args):
params = args[1]
return_value = await self.delegate(*args)
if AssertingRunner.assertions_enabled and "assertions" in params:
op_name = params.get("name")
if isinstance(return_value, dict):
for assertion in params["assertions"]:
self.check_assertion(op_name, assertion, return_value)
else:
raise exceptions.DataError(f"Cannot check assertion in [{op_name}] as [{repr(self.delegate)}] did not return a dict.")
return return_value
def __repr__(self, *args, **kwargs):
return repr(self.delegate)
async def __aenter__(self):
await self.delegate.__aenter__()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
return await self.delegate.__aexit__(exc_type, exc_val, exc_tb)
def mandatory(params, key, op):
try:
return params[key]
except KeyError:
raise exceptions.DataError(
f"Parameter source for operation '{str(op)}' did not provide the mandatory parameter '{key}'. "
f"Add it to your parameter source and try again."
)
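# For example, mandatory({"index": "logs"}, "index", "create-index") returns "logs"; a missing key raises
# exceptions.DataError pointing the user at their parameter source.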
# TODO: remove and use https://docs.python.org/3/library/stdtypes.html#str.removeprefix
# once Python 3.9 becomes the minimum version
def remove_prefix(string, prefix):
if string.startswith(prefix):
return string[len(prefix) :]
return string
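# For example, remove_prefix("indices:admin/forcemerge", "indices:") returns "admin/forcemerge";
# if the string does not start with the prefix, it is returned unchanged.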
def escape(v):
"""
Escapes values so they can be used as query parameters
:param v: The raw value. May be None.
:return: The escaped value.
"""
if v is None:
return None
elif isinstance(v, bool):
return str(v).lower()
else:
return str(v)
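# For example, escape(True) returns "true", escape(None) returns None, and escape(42) returns "42".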
class BulkIndex(Runner):
"""
Bulk indexes the given documents.
"""
async def __call__(self, es, params):
"""
Runs one bulk indexing operation.
:param es: The Elasticsearch client.
:param params: A hash with all parameters. See below for details.
:return: A hash with meta data for this bulk operation. See below for details.
It expects a parameter dict with the following mandatory keys:
* ``body``: containing all documents for the current bulk request.
* ``bulk-size``: An indication of the bulk size denoted in ``unit``.
* ``unit``: The name of the unit in which the bulk size is provided.
* ``action-metadata-present``: if ``True``, assume that an action and metadata line is present (meaning only half of the lines
contain actual documents to index)
* ``index``: The name of the affected index in case ``action-metadata-present`` is ``False``.
* ``type``: The name of the affected type in case ``action-metadata-present`` is ``False``.
The following keys are optional:
* ``pipeline``: If present, runs the specified ingest pipeline for this bulk.
* ``detailed-results``: If ``True``, the runner will analyze the response and add detailed meta-data. Defaults to ``False``. Note
that this has a very significant impact on performance and will very likely cause a bottleneck in the benchmark driver, so please
be very cautious when enabling this feature. Our own measurements have shown a median overhead of several thousand times (execution time
is in the single digit microsecond range when this feature is disabled and in the single digit millisecond range when this feature
is enabled; numbers based on a bulk size of 500 elements and no errors). For details please refer to the respective benchmarks
in ``benchmarks/driver``.
* ``timeout``: a time unit value indicating the server-side timeout for the operation
* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present, defaults to
``None`` and potentially falls back to the global timeout setting.
* ``refresh``: If ``"true"``, Elasticsearch will issue an async refresh to the index; i.e., ``?refresh=true``.
If ``"wait_for"``, Elasticsearch issues a synchronous refresh to the index; i.e., ``?refresh=wait_for``.
If ``"false""``, Elasticsearch will use refresh defaults; i.e., ``?refresh=false``.
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)
bulk_params = {}
if "timeout" in params:
bulk_params["timeout"] = params["timeout"]
if "pipeline" in params:
bulk_params["pipeline"] = params["pipeline"]
if "refresh" in params:
valid_refresh_values = ("wait_for", "true", "false")
if params["refresh"] not in valid_refresh_values:
raise exceptions.RallyAssertionError(
f"Unsupported bulk refresh value: {params['refresh']}. Use one of [{', '.join(valid_refresh_values)}]."
)
bulk_params["refresh"] = params["refresh"]
with_action_metadata = mandatory(params, "action-metadata-present", self)
bulk_size = mandatory(params, "bulk-size", self)
unit = mandatory(params, "unit", self)
# parse responses lazily in the standard case - responses might be large thus parsing skews results and if no
# errors have occurred we only need a small amount of information from the potentially large response.
if not detailed_results:
es.return_raw_response()
if with_action_metadata:
api_kwargs.pop("index", None)
# only half of the lines are documents
response = await es.bulk(params=bulk_params, **api_kwargs)
else:
response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response)
meta_data = {
"index": params.get("index"),
"weight": bulk_size,
"unit": unit,
}
meta_data.update(stats)
if not stats["success"]:
meta_data["error-type"] = "bulk"
return meta_data
def detailed_stats(self, params, response):
def _utf8len(line):
if isinstance(line, bytes):
return len(line)
else:
return len(line.encode("utf-8"))
ops = {}
shards_histogram = OrderedDict()
bulk_error_count = 0
bulk_success_count = 0
error_details = set()
bulk_request_size_bytes = 0
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)
if isinstance(params["body"], bytes):
bulk_lines = params["body"].split(b"\n")
elif isinstance(params["body"], str):
bulk_lines = params["body"].split("\n")
elif isinstance(params["body"], list):
bulk_lines = params["body"]
else:
raise exceptions.DataError("bulk body is not of type bytes, string, or list")
for line_number, data in enumerate(bulk_lines):
line_size = _utf8len(data)
if with_action_metadata:
if line_number % 2 == 1:
total_document_size_bytes += line_size
else:
total_document_size_bytes += line_size
bulk_request_size_bytes += line_size
for item in response["items"]:
# there is only one (top-level) item
op, data = next(iter(item.items()))
if op not in ops:
ops[op] = Counter()
ops[op]["item-count"] += 1
if "result" in data:
ops[op][data["result"]] += 1
if "_shards" in data:
s = data["_shards"]
sk = "%d-%d-%d" % (s["total"], s["successful"], s["failed"])
if sk not in shards_histogram:
shards_histogram[sk] = {"item-count": 0, "shards": s}
shards_histogram[sk]["item-count"] += 1
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
self.extract_error_details(error_details, data)
else:
bulk_success_count += 1
stats = {
"took": response.get("took"),
"success": bulk_error_count == 0,
"success-count": bulk_success_count,
"error-count": bulk_error_count,
"ops": ops,
"shards_histogram": list(shards_histogram.values()),
"bulk-request-size-bytes": bulk_request_size_bytes,
"total-document-size-bytes": total_document_size_bytes,
}
if bulk_error_count > 0:
stats["error-type"] = "bulk"
stats["error-description"] = self.error_description(error_details)
self.logger.warning("Bulk request failed: [%s]", stats["error-description"])
if "ingest_took" in response:
stats["ingest_took"] = response["ingest_took"]
return stats
def simple_stats(self, bulk_size, unit, response):
bulk_success_count = bulk_size if unit == "docs" else None
bulk_error_count = 0
error_details = set()
# parse lazily on the fast path
props = parse(response, ["errors", "took"])
if props.get("errors", False):
# determine success count regardless of unit because we need to iterate through all items anyway
bulk_success_count = 0
# Reparse fully in case of errors - this will be slower
parsed_response = json.loads(response.getvalue())
for item in parsed_response["items"]:
data = next(iter(item.values()))
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
self.extract_error_details(error_details, data)
else:
bulk_success_count += 1
stats = {
"took": props.get("took"),
"success": bulk_error_count == 0,
"success-count": bulk_success_count,
"error-count": bulk_error_count,
}
if bulk_error_count > 0:
stats["error-type"] = "bulk"
stats["error-description"] = self.error_description(error_details)
return stats
def extract_error_details(self, error_details, data):
error_data = data.get("error", {})
error_reason = error_data.get("reason") if isinstance(error_data, dict) else str(error_data)
if error_data:
error_details.add((data["status"], error_reason))
else:
error_details.add((data["status"], None))
def _error_status_summary(self, error_details):
"""
Generates error status code summary.
:param error_details: accumulated error details
:return: error status summary
"""
status_counts = {}
for status, _ in error_details:
status_counts[status] = status_counts.get(status, 0) + 1
status_summaries = []
for status in sorted(status_counts.keys()):
status_summaries.append(f"{status_counts[status]}x{status}")
return ", ".join(status_summaries)
def error_description(self, error_details):
"""
Generates error description with an arbitrary limit of 5 errors.
:param error_details: accumulated error details
:return: error description
"""
error_descriptions = []
is_truncated = False
for count, error_detail in enumerate(sorted(error_details)):
status, reason = error_detail
if count < 5:
if reason:
error_descriptions.append(f"HTTP status: {status}, message: {reason}")
else:
error_descriptions.append(f"HTTP status: {status}")
else:
is_truncated = True
break
description = " | ".join(error_descriptions)
if is_truncated:
description = description + " | TRUNCATED " + self._error_status_summary(error_details)
return description
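# For example, error_details of {(429, "rejected execution"), (500, None)} yields
# "HTTP status: 429, message: rejected execution | HTTP status: 500". With more than five distinct errors,
# the description is truncated and suffixed with a per-status summary such as "2x429, 1x500".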
def __repr__(self, *args, **kwargs):
return "bulk-index"
class ForceMerge(Runner):
"""
Runs a force merge operation against Elasticsearch.
"""
async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
max_num_segments = params.get("max-num-segments")
mode = params.get("mode")
merge_params = self._default_kw_params(params)
if max_num_segments:
merge_params["max_num_segments"] = max_num_segments
if mode == "polling":
complete = False
try:
await es.indices.forcemerge(**merge_params)
complete = True
except elasticsearch.ConnectionTimeout:
pass
while not complete:
await asyncio.sleep(params.get("poll-period"))
tasks = await es.tasks.list(params={"actions": "indices:admin/forcemerge"})
if len(tasks["nodes"]) == 0:
# empty nodes response indicates no tasks
complete = True
else:
await es.indices.forcemerge(**merge_params)
def __repr__(self, *args, **kwargs):
return "force-merge"
class IndicesStats(Runner):
"""
Gather index stats for all indices.
"""
def _get(self, v, path):
if v is None:
return None
elif len(path) == 1:
return v.get(path[0])
else:
return self._get(v.get(path[0]), path[1:])
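# For example, self._get(response, ["_all", "primaries", "docs", "count"]) walks the nested stats response
# and returns response["_all"]["primaries"]["docs"]["count"], or None if any level along the path is missing.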
def _safe_string(self, v):
return str(v) if v is not None else None
async def __call__(self, es, params):
api_kwargs = self._default_kw_params(params)
index = api_kwargs.pop("index", "_all")
condition = params.get("condition")
response = await es.indices.stats(index=index, metric="_all", **api_kwargs)
if condition:
path = mandatory(condition, "path", repr(self))
expected_value = mandatory(condition, "expected-value", repr(self))
actual_value = self._get(response, path.split("."))
return {
"weight": 1,
"unit": "ops",
"condition": {
"path": path,
# avoid mapping issues in the ES metrics store by always rendering values as strings
"actual-value": self._safe_string(actual_value),
"expected-value": self._safe_string(expected_value),
},
# currently we only support "==" as a predicate but that might change in the future
"success": actual_value == expected_value,
}
else:
return {
"weight": 1,
"unit": "ops",
"success": True,
}
def __repr__(self, *args, **kwargs):
return "indices-stats"
class NodeStats(Runner):
"""
Gather node stats for all nodes.
"""
async def __call__(self, es, params):
request_timeout = params.get("request-timeout")
await es.options(request_timeout=request_timeout).nodes.stats(metric="_all")
def __repr__(self, *args, **kwargs):
return "node-stats"
def parse(text: BytesIO, props: List[str], lists: List[str] = None, objects: List[str] = None) -> dict:
"""
Selectively parse the provided text as JSON extracting only the properties provided in ``props``. If ``lists`` is
specified, this function determines whether the provided lists are empty (respective value will be ``True``) or
contain elements (respective key will be ``False``). If ``objects`` is specified, it will in addition extract
the JSON objects under the given keys. These JSON objects must be flat dicts, only containing primitive types
within.
:param text: A text to parse.
:param props: A mandatory list of property paths (separated by a dot character) for which to extract values.
:param lists: An optional list of property paths to JSON lists in the provided text.
:param objects: An optional list of property paths to flat JSON objects in the provided text.
:return: A dict containing all properties, lists, and flat objects that have been found in the provided text.
"""
text.seek(0)
parser = ijson.parse(text)
parsed = {}
parsed_lists = {}
current_object = {}
current_list = None
expect_end_array = False
parsed_objects = {}
in_object = None
try:
for prefix, event, value in parser:
if expect_end_array:
# True if the list is empty, False otherwise
parsed_lists[current_list] = event == "end_array"
expect_end_array = False
if prefix in props:
parsed[prefix] = value
elif lists is not None and prefix in lists and event == "start_array":
current_list = prefix
expect_end_array = True
elif objects is not None and event == "end_map" and prefix in objects:
parsed_objects[in_object] = current_object
in_object = None
elif objects is not None and event == "start_map" and prefix in objects:
in_object = prefix
current_object = {}
elif in_object and event in ["boolean", "integer", "double", "number", "string"]:
current_object[prefix[len(in_object) + 1 :]] = value
# found all necessary properties
if (
len(parsed) == len(props)
and (lists is None or len(parsed_lists) == len(lists))
and (objects is None or len(parsed_objects) == len(objects))
):
break
except ijson.IncompleteJSONError:
# did not find all properties
pass
parsed.update(parsed_lists)
parsed.update(parsed_objects)
return parsed
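# For example (illustrative response body):
#   parse(BytesIO(b'{"took": 13, "errors": false, "items": []}'), props=["took", "errors"], lists=["items"])
# returns {"took": 13, "errors": False, "items": True}; "items" is True because the list is empty.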
class Query(Runner):
"""
Runs a request body search against Elasticsearch.
It expects at least the following keys in the `params` hash:
* `operation-type`: One of `search`, `paginated-search`, `scroll-search`, or `composite-agg`
* `index`: The index or indices against which to issue the query.
* `type`: See `index`
* `cache`: True iff the request cache should be used.
* `body`: Query body
The following parameters are optional:
* `detailed-results` (default: ``False``): Records more detailed meta-data about queries. As it analyzes the
corresponding response in more detail, this might incur additional
overhead which can skew measurement results. This flag is ineffective
for scroll queries or composite aggs (detailed meta-data are always returned).
* ``request-timeout``: a non-negative float indicating the client-side timeout for the operation. If not present,
defaults to ``None`` and potentially falls back to the global timeout setting.
* `results-per-page`: Number of results to retrieve per page. This maps to the Search API's ``size`` parameter, and
can be used for paginated and non-paginated searches. Defaults to ``10``
If the following parameters are present in addition, a paginated query will be issued:
* `pages`: Number of pages to retrieve at most for this search. If a query yields fewer results than the specified
number of pages we will terminate earlier.
Returned meta data
The following meta data are always returned:
* ``weight``: operation-agnostic representation of the "weight" of an operation (used internally by Rally for throughput calculation).
Always 1 for normal queries and the number of retrieved pages for scroll queries or composite aggs.
* ``unit``: The unit in which to interpret ``weight``. Always "ops".
* ``hits``: Total number of hits for this operation.
* ``hits_relation``: whether ``hits`` is accurate (``eq``) or a lower bound of the actual hit count (``gte``).
* ``timed_out``: Whether the search has timed out. For scroll queries, this flag is ``True`` if the flag was ``True`` for any of the
queries issued.
For paginated queries we also return:
* ``pages``: Total number of pages that have been retrieved.
"""
def __init__(self, config=None):
super().__init__(config=config)
self._search_after_extractor = SearchAfterExtractor()
self._composite_agg_extractor = CompositeAggExtractor()
async def __call__(self, es, params):
params, request_params, transport_params, headers = self._transport_request_params(params)
# we don't set headers at the options level because the Query runner sets them via the client's '_perform_request' method
es = es.options(**transport_params)
# Mandatory to ensure it is always provided. This is especially important when this runner is used in a
# composite context where there is no actual parameter source and the entire request structure must be provided
# by the composite's parameter source.
index = mandatory(params, "index", self)
body = mandatory(params, "body", self)
operation_type = params.get("operation-type")
size = params.get("results-per-page")
if size and operation_type != "composite-agg":
body["size"] = size
detailed_results = params.get("detailed-results", False)
encoding_header = self._query_headers(params)
if encoding_header is not None:
headers.update(encoding_header)
cache = params.get("cache")
if cache is not None:
request_params["request_cache"] = str(cache).lower()
elif self.serverless_mode and not self.serverless_operator:
request_params["request_cache"] = "false"
if not bool(headers):
# counter-intuitive but preserves prior behavior
headers = None
# disable eager response parsing - responses might be huge thus skewing results
es.return_raw_response()
async def _search_after_query(es, params):
index = params.get("index", "_all")
pit_op = params.get("with-point-in-time-from")
results = {
"unit": "pages",
"success": True,
"timed_out": False,
"took": 0,
}
if pit_op:
# these are disallowed as they are encoded in the pit_id
for item in ["index", "routing", "preference"]:
body.pop(item, None)
index = None
# explicitly convert to int to provoke an error otherwise
total_pages = sys.maxsize if params.get("pages") == "all" else int(mandatory(params, "pages", self))
for page in range(1, total_pages + 1):
if pit_op:
pit_id = CompositeContext.get(pit_op)
body["pit"] = {"id": pit_id, "keep_alive": "1m"}
response = await self._raw_search(es, doc_type=None, index=index, body=body.copy(), params=request_params, headers=headers)
parsed, last_sort = self._search_after_extractor(
response,
bool(pit_op),
results.get("hits"), # type: ignore[arg-type] # TODO remove the below ignore when introducing type hints
)
results["pages"] = page
results["weight"] = page
if results.get("hits") is None:
results["hits"] = parsed.get("hits.total.value")
results["hits_relation"] = parsed.get("hits.total.relation")
results["took"] += parsed.get("took")
# when this evaluates to True, keep it for the final result
if not results["timed_out"]:
results["timed_out"] = parsed.get("timed_out")
if pit_op:
# per the documentation the response pit id is most up-to-date
CompositeContext.put(pit_op, parsed.get("pit_id"))
if results.get("hits") / size > page:
body["search_after"] = last_sort
else:
# body needs to be un-mutated for the next iteration (preferring to do this over a deepcopy at the start)
for item in ["pit", "search_after"]:
body.pop(item, None)
break
return results
async def _composite_agg(es, params):
index = params.get("index", "_all")
pit_op = params.get("with-point-in-time-from")
results = {
"unit": "pages",
"success": True,
"timed_out": False,
"took": 0,
}
if pit_op:
# these are disallowed as they are encoded in the pit_id
for item in ["index", "routing", "preference"]:
body.pop(item, None)
index = None