From 6f2b5ce6d8d8e51e3b85ded59ad7ff7ae5f235c3 Mon Sep 17 00:00:00 2001 From: Christopher Sande Date: Wed, 29 Jan 2025 11:36:51 -0600 Subject: [PATCH] Http helper method to execute HTTP requests in parallel --- canvas_sdk/utils/http.py | 60 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/canvas_sdk/utils/http.py b/canvas_sdk/utils/http.py index 7e374a7b..d4cb148d 100644 --- a/canvas_sdk/utils/http.py +++ b/canvas_sdk/utils/http.py @@ -1,7 +1,10 @@ +import concurrent +import functools import time -from collections.abc import Callable, Mapping +from collections.abc import Callable, Iterable, Mapping +from concurrent.futures import ThreadPoolExecutor from functools import wraps -from typing import Any, TypeVar, cast +from typing import Any, Literal, TypeVar, cast import requests import statsd @@ -9,6 +12,38 @@ F = TypeVar("F", bound=Callable) +class BatchedRequest: + """Representation of a request that will be executed in parallel with other requests.""" + + def __init__( + self, method: Literal["GET", "POST", "PUT", "PATCH"], url: str, *args: Any, **kwargs: Any + ) -> None: + self._method = method + self._url = url + self._args = args + self._kwargs = kwargs + + def fn(self, session: requests.Session) -> Callable: + """ + Return a callable constructed from the session object, method, URL, args, and kwargs. + + This callable is passed to the ThreadPoolExecutor. + """ + match self._method: + case "GET": + instance_method = session.get + case "POST": + instance_method = session.post + case "PUT": + instance_method = session.put + case "PATCH": + instance_method = session.patch + case _: + raise RuntimeError(f"HTTP method {self._method} is not supported") + + return functools.partial(instance_method, self._url, *self._args, **self._kwargs) + + class Http: """A helper class for completing HTTP calls with metrics tracking.""" @@ -72,3 +107,24 @@ def patch( ) -> requests.Response: """Sends a PATCH request.""" return self.session.patch(url, json=json, data=data, headers=headers) + + def batch_requests( + self, + batched_requests: Iterable[BatchedRequest], + max_workers: int | None = None, + timeout: int | None = None, + ) -> list[requests.Response]: + """ + Execute requests in parallel. + + Wait for the responses to complete, and then return a list of the responses. + """ + futures = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for request in batched_requests: + futures.append(executor.submit(request.fn(self.session))) + + # TODO: Is there a need to expose return_when or specify a different default value? https://docs.python.org/3.12/library/concurrent.futures.html#concurrent.futures.wait + concurrent.futures.wait(futures, timeout=timeout) + + return [future.result() for future in futures]