From 26e2e7b5cc4bec2a38052a391dbdcb828a623f5f Mon Sep 17 00:00:00 2001
From: Alex Kiura
Date: Sun, 1 Sep 2024 22:32:03 +0300
Subject: [PATCH 1/2] add example demonstrating batching many requests without
 hitting rate limits

---
 examples/batch_rate_limit.py | 55 ++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)
 create mode 100755 examples/batch_rate_limit.py

diff --git a/examples/batch_rate_limit.py b/examples/batch_rate_limit.py
new file mode 100755
index 00000000..bea8fa29
--- /dev/null
+++ b/examples/batch_rate_limit.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env -S rye run python
+
+import pprint
+import asyncio
+import itertools
+
+from anthropic import AsyncAnthropic
+
+
+async def make_batch(sequence, batch_size):
+    iterator = iter(sequence)
+    while batch := tuple(itertools.islice(iterator, batch_size)):
+        yield batch
+
+
+async def process_prompt(client, prompt):
+    try:
+        response = await client.messages.create(
+            model="claude-3-opus-20240229",
+            max_tokens=512,
+            messages=[{"role": "user", "content": prompt}],
+        )
+        return {"input": prompt, "repsonse": response.content[0].text}
+    except Exception as e:
+        print(f"Error processing prompt {prompt}: {e}")
+        return None
+
+
+async def process_batch(client, batch):
+    tasks = [process_prompt(client, prompt) for prompt in batch]
+    return await asyncio.gather(*tasks)
+
+
+async def main(prompts, batch_size=5, delay_between_batches=60):
+    client = AsyncAnthropic()
+
+    results = []
+    async for batch in make_batch(prompts, batch_size):
+        batch_results = await process_batch(client, batch)
+        results.extend(batch_results)
+        await asyncio.sleep(delay_between_batches)
+
+    return results
+
+
+prompts = [
+    "What is the capital of Mali?",
+    "What is the difference between SQL and NoSQL?",
+    "What is the difference between JavaScript's bind and Python's functools.partial?",
+    "Explain rate limiting in no more than two sentences.",
+    "Which country won the most gold medals in the 2024 Olympics?",
+    "What is a higher-order function?",
+]
+
+pprint.pprint(asyncio.run(main(prompts)))

From 5cfc8d006a49e1db9671d5a92f338e6c83f678f7 Mon Sep 17 00:00:00 2001
From: Alex Kiura
Date: Tue, 3 Sep 2024 14:08:45 +0300
Subject: [PATCH 2/2] add type hints

---
 examples/batch_rate_limit.py | 28 +++++++++++++++++++---------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/examples/batch_rate_limit.py b/examples/batch_rate_limit.py
index bea8fa29..5b779557 100755
--- a/examples/batch_rate_limit.py
+++ b/examples/batch_rate_limit.py
@@ -3,41 +3,51 @@
 import pprint
 import asyncio
 import itertools
+from typing import Dict, List, Tuple, Union
+from collections.abc import AsyncGenerator
 
 from anthropic import AsyncAnthropic
+from anthropic.types import TextBlock
 
 
-async def make_batch(sequence, batch_size):
+async def make_batch(sequence: List[str], batch_size: int) -> AsyncGenerator[Tuple[str, ...], None]:
     iterator = iter(sequence)
-    while batch := tuple(itertools.islice(iterator, batch_size)):
+    while True:
+        batch = tuple(itertools.islice(iterator, batch_size))
+        if not batch:
+            break
         yield batch
 
 
-async def process_prompt(client, prompt):
+async def process_prompt(client: AsyncAnthropic, prompt: str) -> Union[Dict[str, str], None]:
     try:
         response = await client.messages.create(
-            model="claude-3-opus-20240229",
+            model="claude-3-haiku-20240307",
             max_tokens=512,
             messages=[{"role": "user", "content": prompt}],
         )
-        return {"input": prompt, "repsonse": response.content[0].text}
+        if response.content:
+            content = response.content[0]
+            if isinstance(content, TextBlock):
+                return {"input": prompt, "response": content.text}
+        return None
     except Exception as e:
         print(f"Error processing prompt {prompt}: {e}")
         return None
 
 
-async def process_batch(client, batch):
+async def process_batch(client: AsyncAnthropic, batch: Tuple[str, ...]) -> List[Union[Dict[str, str], None]]:
     tasks = [process_prompt(client, prompt) for prompt in batch]
     return await asyncio.gather(*tasks)
 
 
-async def main(prompts, batch_size=5, delay_between_batches=60):
+async def main(prompts: List[str], batch_size: int = 5, delay_between_batches: int = 60) -> List[Dict[str, str]]:
     client = AsyncAnthropic()
 
-    results = []
+    results: List[Dict[str, str]] = []
     async for batch in make_batch(prompts, batch_size):
         batch_results = await process_batch(client, batch)
-        results.extend(batch_results)
+        results.extend(filter(None, batch_results))
         await asyncio.sleep(delay_between_batches)
 
     return results
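
Usage note, outside the patch itself: with the defaults above, main() issues at most batch_size requests every delay_between_batches seconds, i.e. 5 requests per minute. The same coroutine can be tuned for a different requests-per-minute budget. A minimal sketch, assuming the definitions and the prompts list from the example are already in scope; the ~50 requests/minute target is illustrative, not a documented limit:

    # Hypothetical tuning: up to 25 requests every 30 seconds (~50 requests/minute),
    # assuming the account's rate limit allows that volume.
    results = asyncio.run(main(prompts, batch_size=25, delay_between_batches=30))
    pprint.pprint(results)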