Speed up client-side bulk-handling #890
The first file is new: a pytest-benchmark microbenchmark suite for the client-side bulk parameter handling that this PR speeds up.

@@ -0,0 +1,130 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import sys

import pytest

from esrally.track import params


class StaticSource:
    def __init__(self, contents, mode, encoding="utf-8"):
        # always serves the same static JSON document, regardless of the requested contents
        self.contents = '{"geonameid": 2986043, "name": "Pic de Font Blanca", "asciiname": "Pic de Font Blanca"}'
        self.current_index = 0
        self.opened = False

    def open(self):
        self.opened = True
        return self

    def seek(self, offset):
        pass

    def read(self):
        return "\n".join(self.contents)

    def readline(self):
        return self.contents

    def readlines(self, num_lines):
        return [self.contents] * num_lines

    def close(self):
        self._assert_opened()
        self.contents = None
        self.opened = False

    def _assert_opened(self):
        assert self.opened

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def __str__(self, *args, **kwargs):
        return "StaticSource"


def create_reader(bulk_size):
    metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)

    source = params.Slice(StaticSource, 0, sys.maxsize)
    reader = params.MetadataIndexDataReader(data_file="bogus",
                                            batch_size=bulk_size,
                                            bulk_size=bulk_size,
                                            file_source=source,
                                            action_metadata=metadata,
                                            index_name="test-idx",
                                            type_name=None)
    return reader


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100(benchmark):
    reader = create_reader(bulk_size=100)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_1000(benchmark):
    reader = create_reader(bulk_size=1000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_10000(benchmark):
    reader = create_reader(bulk_size=10000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100000(benchmark):
    reader = create_reader(bulk_size=100000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)
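For orientation, each timed call above is one `__next__` on the reader, which yields a whole batch. A rough sketch of what a single iteration returns under these settings (reusing `create_reader` from this file; the assertions reflect my reading of the reader contract, so treat them as illustrative rather than authoritative):

    reader = create_reader(bulk_size=100)
    with reader:
        index_name, type_name, batch = next(reader)
        assert index_name == "test-idx" and type_name is None
        # each batch entry is (documents in bulk, bulk body pre-joined into one string)
        docs_in_bulk, bulk_body = batch[0]
        assert docs_in_bulk == 100
        # the static document appears once per action/document line pair
        assert bulk_body.count('"geonameid"') == 100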
The remaining hunks modify `esrally/track/params.py` (the module imported above as `esrally.track.params`).
@@ -571,7 +571,7 @@ def params(self):
         # self.internal_params always reads all files. This is necessary to ensure we terminate early in case
         # the user has specified ingest percentage.
         if self.current_bulk == self.total_bulks:
-            raise StopIteration
+            raise StopIteration()
         self.current_bulk += 1
         return next(self.internal_params)
@@ -647,13 +647,12 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si
     source = Slice(io.FileSource, offset, num_lines)

     if docs.includes_action_and_meta_data:
-        am_handler = SourceActionMetaData(source)
+        return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, docs.target_index, docs.target_type)
     else:
         am_handler = GenerateActionMetaData(docs.target_index, docs.target_type,
                                             build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability,
                                             on_conflict, recency)
-
-    return IndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)
+        return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)


def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency,
@@ -758,15 +757,15 @@ class GenerateActionMetaData:
     def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None,
                  recency=None, rand=random.random, randint=random.randint, randexp=random.expovariate):
         if type_name:
-            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                                            (index_name, type_name, "%s")
-            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                                             (index_name, type_name, "%s")
-            self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}' % (index_name, type_name)
+            self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}\n' % (index_name, type_name)
         else:
-            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-            self.meta_data_index_no_id = '{"index": {"_index": "%s"}}' % index_name
+            self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+            self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+            self.meta_data_index_no_id = '{"index": {"_index": "%s"}}\n' % index_name

         self.conflicting_ids = conflicting_ids
         self.on_conflict = on_conflict
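The only functional change in this hunk is the trailing `\n` baked into each action/meta-data template. It pairs with the reader change further down, which assembles a bulk body with a single `"".join(...)`: if every line already carries its newline, no per-line concatenation is needed. A minimal standalone sketch of the idea (hypothetical values, not Rally code):

    # templates now end with "\n", mirroring the change above
    meta_data_index_no_id = '{"index": {"_index": "test-idx"}}\n'
    docs = ['{"f": 1}\n', '{"f": 2}\n']

    lines = []
    for doc in docs:
        lines.append(meta_data_index_no_id)
        lines.append(doc)

    # a single join then produces a well-formed, newline-terminated bulk body
    body = "".join(lines)
    assert body.count("\n") == 4 and body.endswith("\n")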
@@ -779,6 +778,13 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi
         self.randexp = randexp
         self.id_up_to = 0

+    @property
+    def is_constant(self):
+        """
+        :return: True iff the iterator will always return the same value.
+        """
+        return self.conflicting_ids is None
+
     def __iter__(self):
         return self
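`is_constant` is the hook that lets the reader pick its fast path: without conflicting ids, `GenerateActionMetaData` yields the identical `("index", line)` tuple forever, so the line can be fetched once and reused. A quick illustration of that contract (assuming the constructor arguments shown above):

    from esrally.track import params

    metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)
    assert metadata.is_constant
    # every item is the same action/meta-data line, so reading one ahead is safe
    assert next(metadata) == next(metadata) == ("index", '{"index": {"_index": "test-idx"}}\n')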
@@ -818,34 +824,25 @@ def __next__(self):
         return "index", self.meta_data_index_no_id


-class SourceActionMetaData:
-    def __init__(self, source):
-        self.source = source
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return "source", next(self.source)
-
-
 class Slice:
     def __init__(self, source_class, offset, number_of_lines):
         self.source_class = source_class
         self.source = None
         self.offset = offset
         self.number_of_lines = number_of_lines
         self.current_line = 0
+        self.bulk_size = None
+        self.logger = logging.getLogger(__name__)

-    def open(self, file_name, mode):
-        logger = logging.getLogger(__name__)
+    def open(self, file_name, mode, bulk_size):
+        self.bulk_size = bulk_size
         self.source = self.source_class(file_name, mode).open()
-        # skip offset number of lines
-        logger.info("Skipping %d lines in [%s].", self.offset, file_name)
+        self.logger.info("Will read [%d] lines from [%s] starting from line [%d] with bulk size [%d].",
+                         self.number_of_lines, file_name, self.offset, self.bulk_size)
         start = time.perf_counter()
         io.skip_lines(file_name, self.source, self.offset)
         end = time.perf_counter()
-        logger.info("Skipping %d lines took %f s.", self.offset, end - start)
+        self.logger.debug("Skipping [%d] lines took [%f] s.", self.offset, end - start)
         return self

     def close(self):
@@ -859,11 +856,12 @@ def __next__(self):
         if self.current_line >= self.number_of_lines:
             raise StopIteration()
         else:
-            self.current_line += 1
-            line = self.source.readline()
-            if len(line) == 0:
+            # ensure we don't read past the allowed number of lines.
+            lines = self.source.readlines(min(self.bulk_size, self.number_of_lines - self.current_line))
+            self.current_line += len(lines)
+            if len(lines) == 0:
                 raise StopIteration()
-            return line.strip()
+            return lines

     def __str__(self):
         return "%s[%d;%d]" % (self.source, self.offset, self.offset + self.number_of_lines)
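This is the heart of the read-side speedup: `Slice.__next__` now returns a whole list of up to `bulk_size` lines per call instead of one stripped line per call. A rough demo, reusing the `StaticSource` stub from the benchmark file above (the file name and sizes are arbitrary):

    import sys
    from esrally.track import params

    source = params.Slice(StaticSource, 0, sys.maxsize)
    source.open("bogus", "rt", bulk_size=100)
    # one iterator step now yields a bulk's worth of lines
    lines = next(source)
    assert len(lines) == 100
    source.close()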
@@ -873,21 +871,20 @@ class IndexDataReader:
     """
     Reads a file in bulks into an array and also adds a meta-data line before each document if necessary.
-    This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N is any natural
-    number >= 1. This makes file reading more efficient for small bulk sizes.
+    This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N
+    is any natural number >= 1. This makes file reading more efficient for small bulk sizes.
     """

-    def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
+    def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
         self.data_file = data_file
         self.batch_size = batch_size
         self.bulk_size = bulk_size
         self.file_source = file_source
-        self.action_metadata = action_metadata
         self.index_name = index_name
         self.type_name = type_name

     def __enter__(self):
-        self.file_source.open(self.data_file, 'rt')
+        self.file_source.open(self.data_file, "rt", self.bulk_size)
         return self

     def __iter__(self):
@@ -901,38 +898,86 @@ def __next__(self):
         try:
             docs_in_batch = 0
             while docs_in_batch < self.batch_size:
-                docs_in_bulk, bulk = self.read_bulk()
+                try:
+                    docs_in_bulk, bulk = self.read_bulk()
+                except StopIteration:
+                    break
                 if docs_in_bulk == 0:
                     break
                 docs_in_batch += docs_in_bulk
-                batch.append((docs_in_bulk, bulk))
+                batch.append((docs_in_bulk, "".join(bulk)))
             if docs_in_batch == 0:
                 raise StopIteration()
             return self.index_name, self.type_name, batch
         except IOError:
             logging.getLogger(__name__).exception("Could not read [%s]", self.data_file)

-    def read_bulk(self):
-        docs_in_bulk = 0
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.file_source.close()
+        return False
+
+
+class MetadataIndexDataReader(IndexDataReader):
+    def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
+        super().__init__(data_file, batch_size, bulk_size, file_source, index_name, type_name)
+        self.action_metadata = action_metadata
+        self.action_metadata_line = None
+
+    def __enter__(self):
+        super().__enter__()
+        if self.action_metadata.is_constant:
+            _, self.action_metadata_line = next(self.action_metadata)
+            self.read_bulk = self._read_bulk_fast
+        else:
+            self.read_bulk = self._read_bulk_regular
+        return self
+
+    def _read_bulk_fast(self):
+        """
+        Special-case implementation for bulk data files where the action and meta-data line is always identical.
+        """
         current_bulk = []
-        for action_metadata_item, document in zip(self.action_metadata, self.file_source):
+        # hoist
+        action_metadata_line = self.action_metadata_line
+        docs = next(self.file_source)
+
+        for doc in docs:
+            current_bulk.append(action_metadata_line)
+            current_bulk.append(doc)
+        return len(docs), current_bulk
+
+    def _read_bulk_regular(self):
+        """
+        General case implementation for bulk files. This implementation can cover all cases but is slower when the
+        action and meta-data line is always identical.
+        """
+        current_bulk = []
+        docs = next(self.file_source)
+        for doc in docs:
+            action_metadata_item = next(self.action_metadata)
             if action_metadata_item:
                 action_type, action_metadata_line = action_metadata_item
                 current_bulk.append(action_metadata_line)
                 if action_type == "update":
-                    current_bulk.append("{\"doc\":%s}" % document)
+                    # remove the trailing "\n" as the doc needs to fit on one line
+                    doc = doc.strip()
+                    current_bulk.append("{\"doc\":%s}\n" % doc)
                 else:
-                    current_bulk.append(document)
+                    current_bulk.append(doc)
             else:
-                current_bulk.append(document)
-            docs_in_bulk += 1
-            if docs_in_bulk == self.bulk_size:
-                break
-        return docs_in_bulk, current_bulk
+                current_bulk.append(doc)
+        return len(docs), current_bulk

-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.file_source.close()
-        return False
+
+class SourceOnlyIndexDataReader(IndexDataReader):
+    def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
+        # keep batch size as it only considers documents read, not lines read but increase the bulk size as
+        # documents are only on every other line.
+        super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name)
+
+    def read_bulk(self):
+        bulk_items = next(self.file_source)
+        return len(bulk_items) // 2, bulk_items


 register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource)

Review discussion attached to the `# hoist` comment in `_read_bulk_fast`:

Reviewer: Firstly, I had to look this up; my understanding is that this is mostly JS terminology, whereas in Python we talk about block scoping. But what does this comment refer to here, for example? Which variable, in which block, is getting hoisted?

Author: I know that term from JVM optimizations. In any case, we ensure that the field access is turned into a local variable access, because it is used in the loop on the hot code path; that is what I was referring to here.

Reviewer: I see. For a person like me it doesn't add any additional info (it seems obvious from the implementation), but if it's valuable to someone else, that's fine.
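For readers unfamiliar with the idea the thread above debates: in CPython, `self.attr` performs an attribute lookup on every access, while a local variable is a fast slot access, so copying the attribute into a local before a tight loop saves work. A tiny, self-contained illustration (hypothetical names; absolute numbers will vary by machine):

    import timeit

    class Holder:
        def __init__(self):
            self.value = "meta"

    h = Holder()

    def via_attribute():
        out = []
        for _ in range(1000):
            out.append(h.value)   # attribute lookup on every iteration
        return out

    def via_local():
        value = h.value           # "hoisted" into a local once
        out = []
        for _ in range(1000):
            out.append(value)     # plain local variable access
        return out

    # the local-variable version is typically measurably faster
    print(timeit.timeit(via_attribute, number=1000))
    print(timeit.timeit(via_local, number=1000))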
In a separate thread, a reviewer left a nit about string quoting:

Reviewer: nit: this might become something to fix when we enable C4001 in pylintrc; should we add `# pylint: disable=invalid-string-quote` right after `__init__`? (I found a ref here.)

Author: How about we deal with this when we enable C4001?

Reviewer: That's fine.
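One last implementation detail worth a note: `MetadataIndexDataReader.__enter__` does not branch on `is_constant` for every bulk; it rebinds `self.read_bulk` once to the chosen implementation, so the per-bulk call is just a bound-method invocation. A generic sketch of that dispatch trick (hypothetical class, not Rally code):

    class Reader:
        def __init__(self, constant_metadata):
            # decide the strategy once, up front; hot-path callers simply call self.read_bulk()
            self.read_bulk = self._read_bulk_fast if constant_metadata else self._read_bulk_regular

        def _read_bulk_fast(self):
            return "fast path"

        def _read_bulk_regular(self):
            return "regular path"

    assert Reader(constant_metadata=True).read_bulk() == "fast path"
    assert Reader(constant_metadata=False).read_bulk() == "regular path"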