Skip to content

Commit

Permalink
Http helper method to execute HTTP requests in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
csande committed Jan 29, 2025
1 parent 518c62c commit 6f2b5ce
Showing 1 changed file with 58 additions and 2 deletions.
60 changes: 58 additions & 2 deletions canvas_sdk/utils/http.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,49 @@
import concurrent
import functools
import time
from collections.abc import Callable, Mapping
from collections.abc import Callable, Iterable, Mapping
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Any, TypeVar, cast
from typing import Any, Literal, TypeVar, cast

import requests
import statsd

F = TypeVar("F", bound=Callable)


class BatchedRequest:
"""Representation of a request that will be executed in parallel with other requests."""

def __init__(
self, method: Literal["GET", "POST", "PUT", "PATCH"], url: str, *args: Any, **kwargs: Any
) -> None:
self._method = method
self._url = url
self._args = args
self._kwargs = kwargs

def fn(self, session: requests.Session) -> Callable:
"""
Return a callable constructed from the session object, method, URL, args, and kwargs.
This callable is passed to the ThreadPoolExecutor.
"""
match self._method:
case "GET":
instance_method = session.get
case "POST":
instance_method = session.post
case "PUT":
instance_method = session.put
case "PATCH":
instance_method = session.patch
case _:
raise RuntimeError(f"HTTP method {self._method} is not supported")

return functools.partial(instance_method, self._url, *self._args, **self._kwargs)


class Http:
"""A helper class for completing HTTP calls with metrics tracking."""

Expand Down Expand Up @@ -72,3 +107,24 @@ def patch(
) -> requests.Response:
"""Sends a PATCH request."""
return self.session.patch(url, json=json, data=data, headers=headers)

def batch_requests(
self,
batched_requests: Iterable[BatchedRequest],
max_workers: int | None = None,
timeout: int | None = None,
) -> list[requests.Response]:
"""
Execute requests in parallel.
Wait for the responses to complete, and then return a list of the responses.
"""
futures = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for request in batched_requests:
futures.append(executor.submit(request.fn(self.session)))

# TODO: Is there a need to expose return_when or specify a different default value? https://docs.python.org/3.12/library/concurrent.futures.html#concurrent.futures.wait
concurrent.futures.wait(futures, timeout=timeout)

return [future.result() for future in futures]

0 comments on commit 6f2b5ce

Please sign in to comment.