From 7051bbd24890978028d677c949faa0965f75ab77 Mon Sep 17 00:00:00 2001
From: Li Wan <49334982+wanliAlex@users.noreply.github.com>
Date: Thu, 6 Jul 2023 12:22:40 +1000
Subject: [PATCH] remove server batch (#113)

---
 src/marqo/index.py                   | 33 ++++-----------
 tests/v0_tests/test_add_documents.py | 63 ++++++++++++----------------
 tests/v0_tests/test_logging.py       |  2 -
 3 files changed, 35 insertions(+), 63 deletions(-)

diff --git a/src/marqo/index.py b/src/marqo/index.py
index 77e074b4..05409442 100644
--- a/src/marqo/index.py
+++ b/src/marqo/index.py
@@ -239,8 +239,7 @@ def get_documents(self, document_ids: List[str], expose_facets=None) -> Dict[str
     def add_documents(
         self,
         documents: List[Dict[str, Any]],
-        auto_refresh=True,
-        server_batch_size: int = None,
+        auto_refresh: bool = True,
         client_batch_size: int = None,
         device: str = None,
         non_tensor_fields: List[str] = None,
@@ -257,9 +256,6 @@
             auto_refresh: Automatically refresh the index. If you are making
                 lots of requests, it is advised to set this to False to
                 increase performance.
-            server_batch_size: if it is set, documents will be indexed into batches
-                on the server as they are indexed. Otherwise documents are unbatched
-                server-side.
             client_batch_size: if it is set, documents will be indexed into batches
                 in the client, before being sent off. Otherwise documents are
                 unbatched client-side.
@@ -279,7 +275,7 @@
         if image_download_headers is None:
             image_download_headers = dict()
         return self._add_docs_organiser(
-            documents=documents, auto_refresh=auto_refresh, server_batch_size=server_batch_size,
+            documents=documents, auto_refresh=auto_refresh,
             client_batch_size=client_batch_size, device=device, non_tensor_fields=non_tensor_fields,
             use_existing_tensors=use_existing_tensors, image_download_headers=image_download_headers,
             mappings=mappings, model_auth=model_auth
@@ -289,7 +285,6 @@ def _add_docs_organiser(
         self,
         documents: List[Dict[str, Any]],
         auto_refresh=True,
-        server_batch_size: int = None,
         client_batch_size: int = None,
         device: str = None,
         non_tensor_fields: List = None,
@@ -317,7 +312,6 @@
         mappings_param = (utils.convert_dict_to_url_params(mappings) if mappings else '')
         query_str_params = (
             f"{f'&device={utils.translate_device_string_for_url(device)}' if device is not None else ''}"
-            f"{f'&batch_size={server_batch_size}' if server_batch_size is not None else ''}"
             f"{f'&use_existing_tensors={str(use_existing_tensors).lower()}' if use_existing_tensors is not None else ''}"
             f"{f'&{non_tensor_fields_query_param}' if len(non_tensor_fields) > 0 else ''}"
             f"{f'&image_download_headers={image_download_headers_param}' if image_download_headers else ''}"
@@ -354,23 +348,12 @@
         mq_logger.debug(f"add_documents roundtrip: took {(total_client_request_time):.3f}s to send {num_docs} "
                         f"docs to Marqo (roundtrip, unbatched), for an average of {(total_client_request_time / num_docs):.3f}s per doc.")
         errors_detected = False
-        if server_batch_size is not None:
-            # with Server Batching (show processing time for each batch)
-            mq_logger.debug(f"add_documents Marqo index (server-side batch reports): ")
-            for batch in range(len(res)):
-                server_batch_result_count = len(res[batch]["items"])
-                mq_logger.debug(f"    marqo server batch {batch}: "
-                                f"processed {server_batch_result_count} docs in {(res[batch]['processingTimeMs'] / 1000):.3f}s, "
-                                f"for an average of {(res[batch]['processingTimeMs'] / (1000 * server_batch_result_count)):.3f}s per doc.")
-                if 'errors' in res[batch] and res[batch]['errors']:
-                    errors_detected = True
-        else:
-            # no Server Batching
-            if 'processingTimeMs' in res:  # Only outputs log if response is non-empty
-                mq_logger.debug(f"add_documents Marqo index: took {(res['processingTimeMs'] / 1000):.3f}s for Marqo to process & index {num_docs} "
-                                f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
-            if 'errors' in res and res['errors']:
-                mq_logger.info(error_detected_message)
+
+        if 'processingTimeMs' in res:  # Only outputs log if response is non-empty
+            mq_logger.debug(f"add_documents Marqo index: took {(res['processingTimeMs'] / 1000):.3f}s for Marqo to process & index {num_docs} "
+                            f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
+        if 'errors' in res and res['errors']:
+            mq_logger.info(error_detected_message)
         if errors_detected:
             mq_logger.info(error_detected_message)
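[Reviewer note] After this change, batching is controlled purely client-side. A minimal sketch of the resulting call pattern; the client URL, index name, and documents below are illustrative assumptions, not taken from this patch:

```python
import marqo

# Illustrative setup: URL and index name are assumptions, not from this patch.
mq = marqo.Client(url="http://localhost:8882")
docs = [{"Title": f"Doc {i}", "_id": str(i)} for i in range(10)]

# Only client-side batching remains: with client_batch_size=4 the client
# splits the 10 docs into math.ceil(10 / 4) == 3 POST requests.
mq.index("my-index").add_documents(docs, client_batch_size=4)

# server_batch_size is no longer a parameter, so passing it now fails fast:
# mq.index("my-index").add_documents(docs, server_batch_size=5)  # TypeError
```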
diff --git a/tests/v0_tests/test_add_documents.py b/tests/v0_tests/test_add_documents.py
index 7ac21e38..e57240cd 100644
--- a/tests/v0_tests/test_add_documents.py
+++ b/tests/v0_tests/test_add_documents.py
@@ -158,7 +158,7 @@ def test_add_batched_documents(self):
                  "_id": doc_id} for doc_id in doc_ids]
         assert len(docs) == 100
-        ix.add_documents(docs, server_batch_size=5, client_batch_size=4)
+        ix.add_documents(docs, client_batch_size=4)
         ix.refresh()
         # takes too long to search for all...
         for _id in [0, 19, 20, 99]:
@@ -343,41 +343,32 @@ def test_batching_add_docs(self):
         batches = [None, 1, 2, 50]
         for auto_refresh in (None, True, False):
             for client_batch_size in batches:
-                for server_batch_size in batches:
-                    mock__post = mock.MagicMock()
-                    mock__post.return_value = dict()
-                    @mock.patch("marqo._httprequests.HttpRequests.post", mock__post)
-                    def run():
-                        res = self.client.index(self.index_name_1).add_documents(
-                            auto_refresh=auto_refresh, documents=docs, client_batch_size=client_batch_size,
-                            server_batch_size=server_batch_size)
-                        if client_batch_size is not None:
-                            assert isinstance(res, list)
-                            assert len(res) == math.ceil(docs_to_add/client_batch_size)
-                            # should only refresh on the last call, if auto_refresh=True
-                            assert all([f'refresh=false' in d[1]['path'] for d in
-                                        mock__post.call_args_list][:-1])
-                            if auto_refresh:
-                                assert [f"{self.index_name_1}/refresh" in d[1]['path']
-                                        for d in mock__post.call_args_list][-1]
-                        else:
-                            assert isinstance(res, dict)
-                            # One huge request is made, if there is no client_side_batching:
-                            assert all([len(d[1]['body']) == docs_to_add for d in mock__post.call_args_list])
-                        if server_batch_size is not None:
-                            if auto_refresh:
-                                assert all([f'batch_size={server_batch_size}' in d[1]['path'] for d in
-                                            mock__post.call_args_list][:-1])
-                            else:
-                                assert all([f'batch_size={server_batch_size}' in d[1]['path']
-                                            for d in mock__post.call_args_list])
-                        else:
-                            assert all(['batch' not in d[1]['path'] for d in mock__post.call_args_list])
-
-                        assert all(['processes' not in d[1]['path'] for d in mock__post.call_args_list])
-
-                        return True
-                    assert run()
+                mock__post = mock.MagicMock()
+                mock__post.return_value = dict()
+                @mock.patch("marqo._httprequests.HttpRequests.post", mock__post)
+                def run():
+                    res = self.client.index(self.index_name_1).add_documents(
+                        auto_refresh=auto_refresh, documents=docs, client_batch_size=client_batch_size,)
+                    if client_batch_size is not None:
+                        assert isinstance(res, list)
+                        assert len(res) == math.ceil(docs_to_add/client_batch_size)
+                        # should only refresh on the last call, if auto_refresh=True
+                        assert all([f'refresh=false' in d[1]['path'] for d in
+                                    mock__post.call_args_list][:-1])
+                        if auto_refresh:
+                            assert [f"{self.index_name_1}/refresh" in d[1]['path']
+                                    for d in mock__post.call_args_list][-1]
+                    else:
+                        assert isinstance(res, dict)
+                        # One huge request is made, if there is no client_side_batching:
+                        assert all([len(d[1]['body']) == docs_to_add for d in mock__post.call_args_list])
+
+                    assert all(['batch' not in d[1]['path'] for d in mock__post.call_args_list])
+
+                    assert all(['processes' not in d[1]['path'] for d in mock__post.call_args_list])
+
+                    return True
+                assert run()

     def test_add_lists_non_tensor(self):
         original_doc = {"d1": "blah", "_id": "1234", 'my list': ['tag-1', 'tag-2']}
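[Reviewer note] The surviving assertions in test_batching_add_docs hinge on one invariant: with client batching, the number of POSTs equals math.ceil(docs_to_add / client_batch_size). A standalone check of that arithmetic, reusing the test's own batches list (the docs_to_add value here is assumed for illustration; the test builds its own docs):

```python
import math

docs_to_add = 100  # assumed count, for illustration only
for client_batch_size in (1, 2, 50):
    # Expected number of POST requests the mocked client should make.
    print(client_batch_size, math.ceil(docs_to_add / client_batch_size))
# 1 -> 100 requests, 2 -> 50, 50 -> 2; with client_batch_size=None all docs
# go in a single request, so the response is one dict rather than a list.
```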
diff --git a/tests/v0_tests/test_logging.py b/tests/v0_tests/test_logging.py
index dd704786..6dcaec38 100644
--- a/tests/v0_tests/test_logging.py
+++ b/tests/v0_tests/test_logging.py
@@ -71,11 +71,9 @@ def test_add_document_warnings_client_batching(self):
         params_expected = [
             # so no client batching, that means no batch info output, and therefore only 1 warning
             ({}, {"num_log_msgs": 1, "num_errors_msgs": 1}),
-            ({'server_batch_size': 5}, {"num_log_msgs": 1, "num_errors_msgs": 1}),
             # one error message, one regular info message per client batch
             ({"client_batch_size": 5}, {"num_log_msgs": 6, "num_errors_msgs": 2}),
-            ({"client_batch_size": 10, 'server_batch_size': 5}, {"num_log_msgs": 4, "num_errors_msgs": 2}),
         ]

         for params, expected in params_expected:
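[Reviewer note] With the server-batch branch deleted, only the unbatched logging path in _add_docs_organiser remains. A self-contained sketch of that path; the response dict and error message text below are fabricated for illustration, and real values come from Marqo's add_documents response:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
mq_logger = logging.getLogger("marqo")

num_docs = 10
res = {"errors": True, "processingTimeMs": 250.0}   # fabricated response
error_detected_message = "Errors detected in add documents call"  # assumed text

# Mirrors the retained branch: one timing line per request, no per-batch loop.
if 'processingTimeMs' in res:
    mq_logger.debug(f"took {(res['processingTimeMs'] / 1000):.3f}s to index {num_docs} docs, "
                    f"for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
if 'errors' in res and res['errors']:
    mq_logger.info(error_detected_message)
```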