Speed up client-side bulk-handling #890

Merged
merged 2 commits into from Feb 12, 2020
130 changes: 130 additions & 0 deletions benchmarks/track/bulk_params_test.py
@@ -0,0 +1,130 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import sys

import pytest

from esrally.track import params


class StaticSource:
    def __init__(self, contents, mode, encoding="utf-8"):
        # mirrors the signature of Rally's file sources; the supplied contents,
        # mode and encoding are accepted but ignored, and a fixed
        # geonames-style document is served instead.
        self.contents = '{"geonameid": 2986043, "name": "Pic de Font Blanca", "asciiname": "Pic de Font Blanca"}'
        self.current_index = 0
        self.opened = False

    def open(self):
        self.opened = True
        return self

    def seek(self, offset):
        pass

    def read(self):
        return "\n".join(self.contents)

    def readline(self):
        return self.contents

    def readlines(self, num_lines):
        return [self.contents] * num_lines

    def close(self):
        self._assert_opened()
        self.contents = None
        self.opened = False

    def _assert_opened(self):
        assert self.opened

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def __str__(self, *args, **kwargs):
        return "StaticSource"


def create_reader(bulk_size):
    metadata = params.GenerateActionMetaData(index_name="test-idx", type_name=None)

    source = params.Slice(StaticSource, 0, sys.maxsize)
    reader = params.MetadataIndexDataReader(data_file="bogus",
                                            batch_size=bulk_size,
                                            bulk_size=bulk_size,
                                            file_source=source,
                                            action_metadata=metadata,
                                            index_name="test-idx",
                                            type_name=None)
    return reader


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100(benchmark):
    reader = create_reader(bulk_size=100)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_1000(benchmark):
    reader = create_reader(bulk_size=1000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_10000(benchmark):
    reader = create_reader(bulk_size=10000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)


@pytest.mark.benchmark(
    group="bulk-params",
    warmup=True,
    warmup_iterations=10000,
    disable_gc=True,
)
def test_index_data_reader_100000(benchmark):
    reader = create_reader(bulk_size=100000)
    reader.__enter__()
    benchmark(reader.__next__)
    reader.__exit__(None, None, None)
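These tests rely on the pytest-benchmark plugin: the benchmark fixture invokes the callable it is given (here reader.__next__) many times and reports per-call statistics, so each test measures the cost of producing a single batch at a given bulk size. Conceptually, the measurement loop looks roughly like the following sketch (this is only an approximation for readers unfamiliar with the plugin, not its actual implementation):

import time

def benchmark_sketch(fn, warmup_iterations=10000, iterations=100000):
    # warm up first so one-time setup costs do not skew the measurement
    for _ in range(warmup_iterations):
        fn()
    start = time.perf_counter()
    for _ in range(iterations):
        fn()
    # mean time per call, in seconds
    return (time.perf_counter() - start) / iterations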
9 changes: 8 additions & 1 deletion esrally/driver/runner.py
@@ -470,7 +470,14 @@ def detailed_stats(self, params, bulk_size, response):
        total_document_size_bytes = 0
        with_action_metadata = mandatory(params, "action-metadata-present", self)

-       for line_number, data in enumerate(params["body"]):
+       if isinstance(params["body"], str):
+           bulk_lines = params["body"].split("\n")
+       elif isinstance(params["body"], list):
+           bulk_lines = params["body"]
+       else:
+           raise exceptions.DataError("bulk body is neither string nor list")
+
+       for line_number, data in enumerate(bulk_lines):
            line_size = len(data.encode('utf-8'))
            if with_action_metadata:
                if line_number % 2 == 1:
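The dispatch above is needed because, after this PR, the bulk parameter source hands the runner a single pre-joined string (see the "".join(bulk) change in params.py below), while custom parameter sources may still supply the body as a list of lines. A small illustrative sketch of the two accepted shapes (the sample data is made up):

# Two equivalent bulk bodies: a list of lines, or one newline-joined string.
body_as_list = [
    '{"index": {"_index": "test-idx"}}',
    '{"name": "Pic de Font Blanca"}',
]
body_as_string = "\n".join(body_as_list)

def to_lines(body):
    # mirrors the isinstance() dispatch in detailed_stats above
    if isinstance(body, str):
        return body.split("\n")
    elif isinstance(body, list):
        return body
    raise TypeError("bulk body is neither string nor list")

assert to_lines(body_as_list) == to_lines(body_as_string)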
145 changes: 95 additions & 50 deletions esrally/track/params.py
@@ -571,7 +571,7 @@ def params(self):
        # self.internal_params always reads all files. This is necessary to ensure we terminate early in case
        # the user has specified ingest percentage.
        if self.current_bulk == self.total_bulks:
-           raise StopIteration
+           raise StopIteration()
        self.current_bulk += 1
        return next(self.internal_params)

@@ -647,13 +647,12 @@ def create_default_reader(docs, offset, num_lines, num_docs, batch_size, bulk_si
    source = Slice(io.FileSource, offset, num_lines)

    if docs.includes_action_and_meta_data:
-       am_handler = SourceActionMetaData(source)
+       return SourceOnlyIndexDataReader(docs.document_file, batch_size, bulk_size, source, docs.target_index, docs.target_type)
    else:
        am_handler = GenerateActionMetaData(docs.target_index, docs.target_type,
                                            build_conflicting_ids(id_conflicts, num_docs, offset), conflict_probability,
                                            on_conflict, recency)
-
-   return IndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)
+       return MetadataIndexDataReader(docs.document_file, batch_size, bulk_size, source, am_handler, docs.target_index, docs.target_type)


def create_readers(num_clients, client_index, corpora, batch_size, bulk_size, id_conflicts, conflict_probability, on_conflict, recency,
@@ -758,15 +757,15 @@ class GenerateActionMetaData:
    def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probability=None, on_conflict=None,
                 recency=None, rand=random.random, randint=random.randint, randexp=random.expovariate):
        if type_name:
-           self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+           self.meta_data_index_with_id = '{"index": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                (index_name, type_name, "%s")
-           self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}' % \
+           self.meta_data_update_with_id = '{"update": {"_index": "%s", "_type": "%s", "_id": "%s"}}\n' % \
                (index_name, type_name, "%s")
-           self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}' % (index_name, type_name)
+           self.meta_data_index_no_id = '{"index": {"_index": "%s", "_type": "%s"}}\n' % (index_name, type_name)
        else:
-           self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-           self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}' % (index_name, "%s")
-           self.meta_data_index_no_id = '{"index": {"_index": "%s"}}' % index_name
+           self.meta_data_index_with_id = '{"index": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+           self.meta_data_update_with_id = '{"update": {"_index": "%s", "_id": "%s"}}\n' % (index_name, "%s")
+           self.meta_data_index_no_id = '{"index": {"_index": "%s"}}\n' % index_name
Review comment (Contributor):
nit: this might become something to fix when we enable C4001 in pylintrc; should we add # pylint: disable=invalid-string-quote right after __init__? (I found a ref here.)

Reply (Member, Author):
How about we deal with this when we enable C4001?

Reply (Contributor):
That's fine.
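Appending the newline to each action-and-meta-data template up front is what enables the cheaper assembly later in this PR: bulk lines can be concatenated with a separator-free "".join(...) (see IndexDataReader.__next__ below) instead of being joined with "\n" at request time. A minimal, self-contained illustration:

# Illustrative only: with "\n" baked into every line, building the request
# body is a single separator-free join.
action = '{"index": {"_index": "test-idx"}}\n'
docs = ['{"name": "doc-%d"}\n' % i for i in range(3)]

lines = []
for doc in docs:
    lines.append(action)
    lines.append(doc)

body = "".join(lines)
assert body.count("\n") == 6  # three action lines plus three documents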


        self.conflicting_ids = conflicting_ids
        self.on_conflict = on_conflict
@@ -779,6 +778,13 @@ def __init__(self, index_name, type_name, conflicting_ids=None, conflict_probabi
        self.randexp = randexp
        self.id_up_to = 0

+   @property
+   def is_constant(self):
+       """
+       :return: True iff the iterator will always return the same value.
+       """
+       return self.conflicting_ids is None

    def __iter__(self):
        return self

@@ -818,34 +824,25 @@ def __next__(self):
return "index", self.meta_data_index_no_id


-class SourceActionMetaData:
-   def __init__(self, source):
-       self.source = source
-
-   def __iter__(self):
-       return self
-
-   def __next__(self):
-       return "source", next(self.source)


class Slice:
    def __init__(self, source_class, offset, number_of_lines):
        self.source_class = source_class
        self.source = None
        self.offset = offset
        self.number_of_lines = number_of_lines
        self.current_line = 0
+       self.bulk_size = None
+       self.logger = logging.getLogger(__name__)

-   def open(self, file_name, mode):
-       logger = logging.getLogger(__name__)
+   def open(self, file_name, mode, bulk_size):
+       self.bulk_size = bulk_size
        self.source = self.source_class(file_name, mode).open()
-       # skip offset number of lines
-       logger.info("Skipping %d lines in [%s].", self.offset, file_name)
+       self.logger.info("Will read [%d] lines from [%s] starting from line [%d] with bulk size [%d].",
+                        self.number_of_lines, file_name, self.offset, self.bulk_size)
        start = time.perf_counter()
        io.skip_lines(file_name, self.source, self.offset)
        end = time.perf_counter()
-       logger.info("Skipping %d lines took %f s.", self.offset, end - start)
+       self.logger.debug("Skipping [%d] lines took [%f] s.", self.offset, end - start)
        return self

    def close(self):
@@ -859,11 +856,12 @@ def __next__(self):
        if self.current_line >= self.number_of_lines:
            raise StopIteration()
        else:
-           self.current_line += 1
-           line = self.source.readline()
-           if len(line) == 0:
+           # ensure we don't read past the allowed number of lines.
+           lines = self.source.readlines(min(self.bulk_size, self.number_of_lines - self.current_line))
+           self.current_line += len(lines)
+           if len(lines) == 0:
                raise StopIteration()
-           return line.strip()
+           return lines

    def __str__(self):
        return "%s[%d;%d]" % (self.source, self.offset, self.offset + self.number_of_lines)
@@ -873,21 +871,20 @@ class IndexDataReader:
"""
Reads a file in bulks into an array and also adds a meta-data line before each document if necessary.
This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N is any natural
number >= 1. This makes file reading more efficient for small bulk sizes.
This implementation also supports batching. This means that you can specify batch_size = N * bulk_size, where N
is any natural number >= 1. This makes file reading more efficient for small bulk sizes.
"""

-   def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
+   def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
        self.data_file = data_file
        self.batch_size = batch_size
        self.bulk_size = bulk_size
        self.file_source = file_source
-       self.action_metadata = action_metadata
        self.index_name = index_name
        self.type_name = type_name

    def __enter__(self):
-       self.file_source.open(self.data_file, 'rt')
+       self.file_source.open(self.data_file, "rt", self.bulk_size)
        return self

    def __iter__(self):
@@ -901,38 +898,86 @@ def __next__(self):
        try:
            docs_in_batch = 0
            while docs_in_batch < self.batch_size:
-               docs_in_bulk, bulk = self.read_bulk()
+               try:
+                   docs_in_bulk, bulk = self.read_bulk()
+               except StopIteration:
+                   break
                if docs_in_bulk == 0:
                    break
                docs_in_batch += docs_in_bulk
-               batch.append((docs_in_bulk, bulk))
+               batch.append((docs_in_bulk, "".join(bulk)))
            if docs_in_batch == 0:
                raise StopIteration()
            return self.index_name, self.type_name, batch
        except IOError:
            logging.getLogger(__name__).exception("Could not read [%s]", self.data_file)
-   def read_bulk(self):
-       docs_in_bulk = 0
+   def __exit__(self, exc_type, exc_val, exc_tb):
+       self.file_source.close()
+       return False


+class MetadataIndexDataReader(IndexDataReader):
+   def __init__(self, data_file, batch_size, bulk_size, file_source, action_metadata, index_name, type_name):
+       super().__init__(data_file, batch_size, bulk_size, file_source, index_name, type_name)
+       self.action_metadata = action_metadata
+       self.action_metadata_line = None
+
+   def __enter__(self):
+       super().__enter__()
+       if self.action_metadata.is_constant:
+           _, self.action_metadata_line = next(self.action_metadata)
+           self.read_bulk = self._read_bulk_fast
+       else:
+           self.read_bulk = self._read_bulk_regular
+       return self

+   def _read_bulk_fast(self):
+       """
+       Special-case implementation for bulk data files where the action and meta-data line is always identical.
+       """
+       current_bulk = []
-       for action_metadata_item, document in zip(self.action_metadata, self.file_source):
+       # hoist
+       action_metadata_line = self.action_metadata_line
+       docs = next(self.file_source)
+
+       for doc in docs:
+           current_bulk.append(action_metadata_line)
+           current_bulk.append(doc)
+       return len(docs), current_bulk

Review comment (Contributor), on the "# hoist" line:
I had to look this up: to my understanding, "hoisting" is mostly JS terminology, whereas in Python we talk about block scoping. But what does this comment refer to, for example? Which variable, in which block, is getting hoisted?

Reply (Member, Author):
I know that term from JVM optimizations. In any case, we ensure that the field access is turned into a local variable access, because it is used in the loop on the hot code path; that is what I was referring to here.

Reply (Contributor):
I see. For a person like me it doesn't add any additional information (it seems obvious from the implementation), but if it's valuable to someone else, that's fine.
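For readers who, like the reviewer, have not met the term: "hoisting" here simply means binding an instance attribute to a local variable once, before the hot loop, so that each iteration performs a cheap local lookup instead of an attribute lookup on self. An illustrative, non-Rally example:

class Emitter:
    def __init__(self):
        self.template = '{"index": {"_index": "test-idx"}}\n'

    def build_slow(self, docs):
        out = []
        for doc in docs:
            out.append(self.template)  # attribute lookup on every iteration
            out.append(doc)
        return out

    def build_fast(self, docs):
        template = self.template  # "hoisted": looked up exactly once
        out = []
        for doc in docs:
            out.append(template)  # local variable lookup only
            out.append(doc)
        return out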

+   def _read_bulk_regular(self):
+       """
+       General case implementation for bulk files. This implementation can cover all cases but is slower when the
+       action and meta-data line is always identical.
+       """
        current_bulk = []
+       docs = next(self.file_source)
+       for doc in docs:
+           action_metadata_item = next(self.action_metadata)
            if action_metadata_item:
                action_type, action_metadata_line = action_metadata_item
                current_bulk.append(action_metadata_line)
                if action_type == "update":
-                   current_bulk.append("{\"doc\":%s}" % document)
+                   # remove the trailing "\n" as the doc needs to fit on one line
+                   doc = doc.strip()
+                   current_bulk.append("{\"doc\":%s}\n" % doc)
                else:
-                   current_bulk.append(document)
+                   current_bulk.append(doc)
            else:
-               current_bulk.append(document)
-           docs_in_bulk += 1
-           if docs_in_bulk == self.bulk_size:
-               break
-       return docs_in_bulk, current_bulk
+               current_bulk.append(doc)
+       return len(docs), current_bulk

-   def __exit__(self, exc_type, exc_val, exc_tb):
-       self.file_source.close()
-       return False

+class SourceOnlyIndexDataReader(IndexDataReader):
+   def __init__(self, data_file, batch_size, bulk_size, file_source, index_name, type_name):
+       # keep batch size as it only considers documents read, not lines read but increase the bulk size as
+       # documents are only on every other line.
+       super().__init__(data_file, batch_size, bulk_size * 2, file_source, index_name, type_name)
+
+   def read_bulk(self):
+       bulk_items = next(self.file_source)
+       return len(bulk_items) // 2, bulk_items


register_param_source_for_operation(track.OperationType.Bulk, BulkIndexParamSource)
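Putting the pieces together: create_default_reader now picks SourceOnlyIndexDataReader when the data file already contains action-and-meta-data lines, and MetadataIndexDataReader otherwise, which in turn switches to the fast path whenever the generated metadata is constant. A hedged sketch of how a reader is consumed (the file name is a placeholder; with no conflicting IDs, GenerateActionMetaData.is_constant holds and the fast path is used):

metadata = GenerateActionMetaData(index_name="test-idx", type_name=None)
source = Slice(io.FileSource, 0, 1000)
reader = MetadataIndexDataReader("documents.json", batch_size=1000, bulk_size=100,
                                 file_source=source, action_metadata=metadata,
                                 index_name="test-idx", type_name=None)
with reader:
    for index_name, type_name, batch in reader:
        for docs_in_bulk, bulk_body in batch:
            # bulk_body is now a single pre-joined string, ready to send as-is
            assert isinstance(bulk_body, str)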