Skip to content

Commit

Permalink
remove server batch (#113)
Browse files: browse the repository at this point in the history
  • Loading branch information
wanliAlex authored Jul 6, 2023
1 parent b8b827b commit 7051bbd
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 63 deletions.
33 changes: 8 additions & 25 deletions src/marqo/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ def get_documents(self, document_ids: List[str], expose_facets=None) -> Dict[str
def add_documents(
self,
documents: List[Dict[str, Any]],
auto_refresh=True,
server_batch_size: int = None,
auto_refresh: bool = True,
client_batch_size: int = None,
device: str = None,
non_tensor_fields: List[str] = None,
Expand All @@ -257,9 +256,6 @@ def add_documents(
auto_refresh: Automatically refresh the index. If you are making
lots of requests, it is advised to set this to False to
increase performance.
server_batch_size: if it is set, documents will be indexed into batches
on the server as they are indexed. Otherwise documents are unbatched
server-side.
client_batch_size: if it is set, documents will be indexed into batches
in the client, before being sent off. Otherwise documents are unbatched
client-side.
Expand All @@ -279,7 +275,7 @@ def add_documents(
if image_download_headers is None:
image_download_headers = dict()
return self._add_docs_organiser(
documents=documents, auto_refresh=auto_refresh, server_batch_size=server_batch_size,
documents=documents, auto_refresh=auto_refresh,
client_batch_size=client_batch_size, device=device, non_tensor_fields=non_tensor_fields,
use_existing_tensors=use_existing_tensors, image_download_headers=image_download_headers, mappings=mappings,
model_auth=model_auth
Expand All @@ -289,7 +285,6 @@ def _add_docs_organiser(
self,
documents: List[Dict[str, Any]],
auto_refresh=True,
server_batch_size: int = None,
client_batch_size: int = None,
device: str = None,
non_tensor_fields: List = None,
Expand Down Expand Up @@ -317,7 +312,6 @@ def _add_docs_organiser(
mappings_param = (utils.convert_dict_to_url_params(mappings) if mappings else '')
query_str_params = (
f"{f'&device={utils.translate_device_string_for_url(device)}' if device is not None else ''}"
f"{f'&batch_size={server_batch_size}' if server_batch_size is not None else ''}"
f"{f'&use_existing_tensors={str(use_existing_tensors).lower()}' if use_existing_tensors is not None else ''}"
f"{f'&{non_tensor_fields_query_param}' if len(non_tensor_fields) > 0 else ''}"
f"{f'&image_download_headers={image_download_headers_param}' if image_download_headers else ''}"
Expand Down Expand Up @@ -354,23 +348,12 @@ def _add_docs_organiser(
mq_logger.debug(f"add_documents roundtrip: took {(total_client_request_time):.3f}s to send {num_docs} "
f"docs to Marqo (roundtrip, unbatched), for an average of {(total_client_request_time / num_docs):.3f}s per doc.")
errors_detected = False
if server_batch_size is not None:
# with Server Batching (show processing time for each batch)
mq_logger.debug(f"add_documents Marqo index (server-side batch reports): ")
for batch in range(len(res)):
server_batch_result_count = len(res[batch]["items"])
mq_logger.debug(f" marqo server batch {batch}: "
f"processed {server_batch_result_count} docs in {(res[batch]['processingTimeMs'] / 1000):.3f}s, "
f"for an average of {(res[batch]['processingTimeMs'] / (1000 * server_batch_result_count)):.3f}s per doc.")
if 'errors' in res[batch] and res[batch]['errors']:
errors_detected = True
else:
# no Server Batching
if 'processingTimeMs' in res: # Only outputs log if response is non-empty
mq_logger.debug(f"add_documents Marqo index: took {(res['processingTimeMs'] / 1000):.3f}s for Marqo to process & index {num_docs} "
f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
if 'errors' in res and res['errors']:
mq_logger.info(error_detected_message)

if 'processingTimeMs' in res: # Only outputs log if response is non-empty
mq_logger.debug(f"add_documents Marqo index: took {(res['processingTimeMs'] / 1000):.3f}s for Marqo to process & index {num_docs} "
f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
if 'errors' in res and res['errors']:
mq_logger.info(error_detected_message)

if errors_detected:
mq_logger.info(error_detected_message)
Expand Down
63 changes: 27 additions & 36 deletions tests/v0_tests/test_add_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def test_add_batched_documents(self):
"_id": doc_id}
for doc_id in doc_ids]
assert len(docs) == 100
ix.add_documents(docs, server_batch_size=5, client_batch_size=4)
ix.add_documents(docs, client_batch_size=4)
ix.refresh()
# takes too long to search for all...
for _id in [0, 19, 20, 99]:
Expand Down Expand Up @@ -343,41 +343,32 @@ def test_batching_add_docs(self):
batches = [None, 1, 2, 50]
for auto_refresh in (None, True, False):
for client_batch_size in batches:
for server_batch_size in batches:
mock__post = mock.MagicMock()
mock__post.return_value = dict()
@mock.patch("marqo._httprequests.HttpRequests.post", mock__post)
def run():
res = self.client.index(self.index_name_1).add_documents(
auto_refresh=auto_refresh, documents=docs, client_batch_size=client_batch_size,
server_batch_size=server_batch_size)
if client_batch_size is not None:
assert isinstance(res, list)
assert len(res) == math.ceil(docs_to_add/client_batch_size)
# should only refresh on the last call, if auto_refresh=True
assert all([f'refresh=false' in d[1]['path'] for d in
mock__post.call_args_list][:-1])
if auto_refresh:
assert [f"{self.index_name_1}/refresh" in d[1]['path']
for d in mock__post.call_args_list][-1]
else:
assert isinstance(res, dict)
# One huge request is made, if there is no client_side_batching:
assert all([len(d[1]['body']) == docs_to_add for d in mock__post.call_args_list])
if server_batch_size is not None:
if auto_refresh:
assert all([f'batch_size={server_batch_size}' in d[1]['path'] for d in
mock__post.call_args_list][:-1])
else:
assert all([f'batch_size={server_batch_size}' in d[1]['path']
for d in mock__post.call_args_list])
else:
assert all(['batch' not in d[1]['path'] for d in mock__post.call_args_list])

assert all(['processes' not in d[1]['path'] for d in mock__post.call_args_list])

return True
assert run()
mock__post = mock.MagicMock()
mock__post.return_value = dict()
@mock.patch("marqo._httprequests.HttpRequests.post", mock__post)
def run():
res = self.client.index(self.index_name_1).add_documents(
auto_refresh=auto_refresh, documents=docs, client_batch_size=client_batch_size,)
if client_batch_size is not None:
assert isinstance(res, list)
assert len(res) == math.ceil(docs_to_add/client_batch_size)
# should only refresh on the last call, if auto_refresh=True
assert all([f'refresh=false' in d[1]['path'] for d in
mock__post.call_args_list][:-1])
if auto_refresh:
assert [f"{self.index_name_1}/refresh" in d[1]['path']
for d in mock__post.call_args_list][-1]
else:
assert isinstance(res, dict)
# One huge request is made, if there is no client_side_batching:
assert all([len(d[1]['body']) == docs_to_add for d in mock__post.call_args_list])

assert all(['batch' not in d[1]['path'] for d in mock__post.call_args_list])

assert all(['processes' not in d[1]['path'] for d in mock__post.call_args_list])

return True
assert run()

def test_add_lists_non_tensor(self):
original_doc = {"d1": "blah", "_id": "1234", 'my list': ['tag-1', 'tag-2']}
Expand Down
2 changes: 0 additions & 2 deletions tests/v0_tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@ def test_add_document_warnings_client_batching(self):
params_expected = [
# so no client batching, that means no batch info output, and therefore only 1 warning
({}, {"num_log_msgs": 1, "num_errors_msgs": 1}),
({'server_batch_size': 5}, {"num_log_msgs": 1, "num_errors_msgs": 1}),

# one error message, one regular info message per client batch
({"client_batch_size": 5}, {"num_log_msgs": 6, "num_errors_msgs": 2}),
({"client_batch_size": 10, 'server_batch_size': 5}, {"num_log_msgs": 4, "num_errors_msgs": 2}),
]
for params, expected in params_expected:

Expand Down

0 comments on commit 7051bbd

Please sign in to comment.