From e029e22305fd1f0cea1d1f8c00cd06dec3500d57 Mon Sep 17 00:00:00 2001
From: Renoy John
Date: Wed, 27 Nov 2024 11:26:43 -0800
Subject: [PATCH] Use ThreadPoolExecutor to parallelize fetching of multi pages of items

---
Review notes (free text; not part of the commit message or the diff):

  What the patch does: get_items_by_name now fetches page 1 on its own,
  raises the 404 ServerResponseError if that page is empty, derives the
  page count from total_available and page_size, and fetches pages
  2..total_pages on a ThreadPoolExecutor. Fetching and project filtering
  are split into _fetch_items, _filter_items_by_container and
  _fetch_and_filter_items.

  Changes made in this revision (all same-line replacements, so the hunk
  headers and diffstat below are unchanged):
  * Line breaks and indentation of the patch were restored.
  * Futures are consumed in submission order instead of via
    as_completed(), so the result keeps page order as the removed
    sequential loop did; the requests still run concurrently.
  * _fetch_items logs len(all_items or []) so that a None item list
    reaches the caller's "if not all_items" 404 check instead of raising
    TypeError (the removed code tested "all_items is None" explicitly).
  * result is a copy of the first page, because
    _filter_items_by_container returns the endpoint's own list when no
    container is given and result.extend() would mutate it.
  * The unused pagination value is no longer bound to "_", which this
    file calls as _("...") for message lookup.
  * The index line keeps the original blob ids; the post-image id does
    not reflect the edits listed above.

 tabcmd/commands/server.py | 94 ++++++++++++++++++++++-----------------
 1 file changed, 53 insertions(+), 41 deletions(-)

diff --git a/tabcmd/commands/server.py b/tabcmd/commands/server.py
index 46e77d01..744835ec 100644
--- a/tabcmd/commands/server.py
+++ b/tabcmd/commands/server.py
@@ -1,4 +1,5 @@
 import os
+import concurrent.futures
 from typing import List, Optional
 
 import tableauserverclient as TSC
@@ -51,54 +52,65 @@ def get_items_by_name(logger, item_endpoint, item_name: str, container: Optional
             item_log_name = "{0}/{1}".format(container_name, item_log_name)
         logger.debug(_("export.status").format(item_log_name))
 
-        result = []
-        total_available_items = None
-        page_number = 1
-        total_retrieved_items = 0
-
-        while True:
-            req_option = TSC.RequestOptions(pagenumber=page_number)
-            req_option.filter.add(
-                TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, item_name)
-            )
-            all_items, pagination_item = item_endpoint.get(req_option)
-
-            if all_items is None or all_items == []:
-                raise TSC.ServerResponseError(
-                    code="404",
-                    summary=_("errors.xmlapi.not_found"),
-                    detail=_("errors.xmlapi.not_found") + ": " + item_log_name,
-                )
-
-            if total_available_items is None:
-                total_available_items = pagination_item.total_available
-
-            total_retrieved_items += len(all_items)
-
-            logger.debug(
-                "{} items of name: {} were found for query page number: {}, page size: {} & total available: {}".format(
-                    len(all_items),
-                    item_name,
-                    pagination_item.page_number,
-                    pagination_item.page_size,
-                    pagination_item.total_available,
-                )
+        # Fetch the first page to determine the total number of pages
+        all_items, pagination_item = Server._fetch_items(logger, item_endpoint, item_name, 1)
+        if not all_items:
+            raise TSC.ServerResponseError(
+                code="404",
+                summary=_("errors.xmlapi.not_found"),
+                detail=_("errors.xmlapi.not_found") + ": " + item_log_name,
             )
 
-            if container:
-                container_id = container.id
-                logger.debug("Filtering to items in project {}".format(container.id))
-                result.extend(list(filter(lambda item: item.project_id == container_id, all_items)))
-            else:
-                result.extend(all_items)
+        total_available_items = pagination_item.total_available
+        page_size = pagination_item.page_size
+        total_pages = (total_available_items + page_size - 1) // page_size
 
-            if total_retrieved_items >= total_available_items:
-                break
+        result = list(Server._filter_items_by_container(logger, all_items, container))
 
-            page_number = pagination_item.page_number + 1
+        # Use ThreadPoolExecutor to parallelize the fetching of remaining pages
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            futures = [
+                executor.submit(Server._fetch_and_filter_items, logger, item_endpoint, item_name, page, container)
+                for page in range(2, total_pages + 1)
+            ]
+            for future in futures:  # submission order == page order, so the result is deterministic
+                result.extend(future.result())
 
         return result
 
+    @staticmethod
+    def _fetch_and_filter_items(
+        logger, item_endpoint, item_name: str, page_number: int, container: Optional[TSC.ProjectItem]
+    ):
+        all_items, _pagination_item = Server._fetch_items(logger, item_endpoint, item_name, page_number)
+        return Server._filter_items_by_container(logger, all_items, container)
+
+    @staticmethod
+    def _fetch_items(logger, item_endpoint, item_name: str, page_number: int):
+        req_option = TSC.RequestOptions(pagenumber=page_number)
+        req_option.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, item_name))
+        all_items, pagination_item = item_endpoint.get(req_option)
+        logger.debug(
+            "{} items of name: {} were found for query page number: {}, page size: {} & total available: {}".format(
+                len(all_items or []),
+                item_name,
+                pagination_item.page_number,
+                pagination_item.page_size,
+                pagination_item.total_available,
+            )
+        )
+
+        return all_items, pagination_item
+
+    @staticmethod
+    def _filter_items_by_container(logger, all_items, container: Optional[TSC.ProjectItem]) -> List:
+        if container:
+            container_id = container.id
+            logger.debug("Filtering to items in project {}".format(container.id))
+            return list(filter(lambda item: item.project_id == container_id, all_items))
+
+        return all_items
+
     # Get site by name or get currently logged in site
     @staticmethod
     def get_site_for_command_or_throw(logger, server, site_name):