Skip to content

Commit

Permalink
Use ThreadPoolExecutor to parallelize fetching of multiple pages of items
Browse files Browse the repository at this point in the history
  • Loading branch information
renoyjohnm committed Nov 27, 2024
1 parent d6ad6bd commit e029e22
Showing 1 changed file with 53 additions and 41 deletions.
94 changes: 53 additions & 41 deletions tabcmd/commands/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import concurrent.futures
from typing import List, Optional

import tableauserverclient as TSC
Expand Down Expand Up @@ -51,54 +52,65 @@ def get_items_by_name(logger, item_endpoint, item_name: str, container: Optional
item_log_name = "{0}/{1}".format(container_name, item_log_name)
logger.debug(_("export.status").format(item_log_name))

result = []
total_available_items = None
page_number = 1
total_retrieved_items = 0

while True:
req_option = TSC.RequestOptions(pagenumber=page_number)
req_option.filter.add(
TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, item_name)
)
all_items, pagination_item = item_endpoint.get(req_option)

if all_items is None or all_items == []:
raise TSC.ServerResponseError(
code="404",
summary=_("errors.xmlapi.not_found"),
detail=_("errors.xmlapi.not_found") + ": " + item_log_name,
)

if total_available_items is None:
total_available_items = pagination_item.total_available

total_retrieved_items += len(all_items)

logger.debug(
"{} items of name: {} were found for query page number: {}, page size: {} & total available: {}".format(
len(all_items),
item_name,
pagination_item.page_number,
pagination_item.page_size,
pagination_item.total_available,
)
# Fetch the first page to determine the total number of pages
all_items, pagination_item = Server._fetch_items(logger, item_endpoint, item_name, 1)
if not all_items:
raise TSC.ServerResponseError(
code="404",
summary=_("errors.xmlapi.not_found"),
detail=_("errors.xmlapi.not_found") + ": " + item_log_name,
)

if container:
container_id = container.id
logger.debug("Filtering to items in project {}".format(container.id))
result.extend(list(filter(lambda item: item.project_id == container_id, all_items)))
else:
result.extend(all_items)
total_available_items = pagination_item.total_available
page_size = pagination_item.page_size
total_pages = (total_available_items + page_size - 1) // page_size

if total_retrieved_items >= total_available_items:
break
result = Server._filter_items_by_container(logger, all_items, container)

page_number = pagination_item.page_number + 1
# Use ThreadPoolExecutor to parallelize the fetching of remaining pages
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(Server._fetch_and_filter_items, logger, item_endpoint, item_name, page, container)
for page in range(2, total_pages + 1)
]
for future in concurrent.futures.as_completed(futures):
result.extend(future.result())

return result

@staticmethod
def _fetch_and_filter_items(
    logger, item_endpoint, item_name: str, page_number: int, container: Optional[TSC.ProjectItem]
):
    """Fetch one page of items named ``item_name`` and narrow it to ``container``.

    Convenience wrapper that chains ``Server._fetch_items`` and
    ``Server._filter_items_by_container`` so a single page can be handled
    in one call (e.g. when submitted to an executor).

    Args:
        logger: Logger forwarded to both helper calls.
        item_endpoint: Endpoint object forwarded to ``Server._fetch_items``.
        item_name: Name the items are filtered on.
        page_number: Page to request.
        container: Optional project used to narrow the page; when falsy the
            page is returned unfiltered.

    Returns:
        Whatever ``Server._filter_items_by_container`` returns for the page.
    """
    # Only the item list is needed here; the pagination info is discarded.
    page_items = Server._fetch_items(logger, item_endpoint, item_name, page_number)[0]
    return Server._filter_items_by_container(logger, page_items, container)

@staticmethod
def _fetch_items(logger, item_endpoint, item_name: str, page_number: int):
    """Request a single page of items whose name equals ``item_name``.

    Args:
        logger: Logger that receives a debug line describing the page.
        item_endpoint: Object whose ``get(req_option)`` returns an
            ``(items, pagination_item)`` pair.
        item_name: Exact name used in the ``Name Equals`` filter.
        page_number: Page number passed to ``TSC.RequestOptions``.

    Returns:
        The ``(items, pagination_item)`` pair exactly as returned by
        ``item_endpoint.get``; ``items`` is passed through unchanged, even
        when it is ``None`` or empty, so the caller decides how to react.
    """
    req_option = TSC.RequestOptions(pagenumber=page_number)
    req_option.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name, TSC.RequestOptions.Operator.Equals, item_name))
    all_items, pagination_item = item_endpoint.get(req_option)

    # Guard the count: a None item list must not blow up the debug log with a
    # TypeError before the caller has had a chance to treat it as "not found".
    item_count = len(all_items) if all_items is not None else 0
    logger.debug(
        "{} items of name: {} were found for query page number: {}, page size: {} & total available: {}".format(
            item_count,
            item_name,
            pagination_item.page_number,
            pagination_item.page_size,
            pagination_item.total_available,
        )
    )

    return all_items, pagination_item

@staticmethod
def _filter_items_by_container(logger, all_items, container: Optional[TSC.ProjectItem]) -> List:
if container:
container_id = container.id
logger.debug("Filtering to items in project {}".format(container.id))
return list(filter(lambda item: item.project_id == container_id, all_items))

return all_items

# Get site by name or get currently logged in site
@staticmethod
def get_site_for_command_or_throw(logger, server, site_name):
Expand Down

0 comments on commit e029e22

Please sign in to comment.