diff --git a/openhands/llm/llm.py b/openhands/llm/llm.py
index 3ecf19c3672e..f6b2954565f9 100644
--- a/openhands/llm/llm.py
+++ b/openhands/llm/llm.py
@@ -8,6 +8,7 @@
 import requests
 
 from openhands.core.config import LLMConfig
+from openhands.utils.ensure_httpx_close import ensure_httpx_close
 
 with warnings.catch_warnings():
     warnings.simplefilter('ignore')
@@ -232,9 +233,9 @@ def wrapper(*args, **kwargs):
 
             # Record start time for latency measurement
             start_time = time.time()
-
-            # we don't support streaming here, thus we get a ModelResponse
-            resp: ModelResponse = self._completion_unwrapped(*args, **kwargs)
+            with ensure_httpx_close():
+                # we don't support streaming here, thus we get a ModelResponse
+                resp: ModelResponse = self._completion_unwrapped(*args, **kwargs)
 
             # Calculate and record latency
             latency = time.time() - start_time
@@ -289,7 +290,11 @@ def wrapper(*args, **kwargs):
                 'messages': messages,
                 'response': resp,
                 'args': args,
-                'kwargs': {k: v for k, v in kwargs.items() if k != 'messages'},
+                'kwargs': {
+                    k: v
+                    for k, v in kwargs.items()
+                    if k not in ('messages', 'client')
+                },
                 'timestamp': time.time(),
                 'cost': cost,
             }
diff --git a/openhands/utils/ensure_httpx_close.py b/openhands/utils/ensure_httpx_close.py
new file mode 100644
index 000000000000..e7177f47b104
--- /dev/null
+++ b/openhands/utils/ensure_httpx_close.py
@@ -0,0 +1,78 @@
+"""
+LiteLLM currently has an issue where HttpHandlers are created but not
+closed. We have submitted a PR to them (https://github.com/BerriAI/litellm/pull/8711),
+and their dev team says a refactor that will fix this is in progress, but
+in the meantime we need to manage the lifecycle of the httpx.Client manually.
+
+We can't simply pass in our own client object, because the various implementations
+use different types of client object.
+
+So we monkey patch the httpx.Client class to track newly created instances and close
+them when the operation completes. (Since some paths create a single shared client and
+reuse it, we actually need a proxy object that lets these clients be reused.)
+
+Hopefully this will be fixed upstream soon and we can remove this abomination.
+"""
+
+import contextlib
+from typing import Callable
+
+import httpx
+
+
+@contextlib.contextmanager
+def ensure_httpx_close():
+    wrapped_class = httpx.Client
+    proxys = []
+
+    class ClientProxy:
+        """
+        Sometimes LiteLLM opens a new httpx client for each connection and does not close it;
+        sometimes it does close it; and sometimes it reuses a client between connections. For
+        cases where a client is reused, we must be able to reuse the client even after closing it.
+ """ + + client_constructor: Callable + args: tuple + kwargs: dict + client: httpx.Client + + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + self.client = wrapped_class(*self.args, **self.kwargs) + proxys.append(self) + + def __getattr__(self, name): + # Invoke a method on the proxied client - create one if required + if self.client is None: + self.client = wrapped_class(*self.args, **self.kwargs) + return getattr(self.client, name) + + def close(self): + # Close the client if it is open + if self.client: + self.client.close() + self.client = None + + def __iter__(self, *args, **kwargs): + # We have to override this as debuggers invoke it causing the client to reopen + if self.client: + return self.client.iter(*args, **kwargs) + return object.__getattribute__(self, 'iter')(*args, **kwargs) + + @property + def is_closed(self): + # Check if closed + if self.client is None: + return True + return self.client.is_closed + + httpx.Client = ClientProxy + try: + yield + finally: + httpx.Client = wrapped_class + while proxys: + proxy = proxys.pop() + proxy.close() diff --git a/tests/unit/test_ensure_httpx_close.py b/tests/unit/test_ensure_httpx_close.py new file mode 100644 index 000000000000..eab8ff13e43f --- /dev/null +++ b/tests/unit/test_ensure_httpx_close.py @@ -0,0 +1,69 @@ +import httpx + +from openhands.utils.ensure_httpx_close import ensure_httpx_close + + +def test_ensure_httpx_close_basic(): + """Test basic functionality of ensure_httpx_close.""" + ctx = ensure_httpx_close() + with ctx: + # Create a client - should be tracked + client = httpx.Client() + + # After context exit, client should be closed + assert client.is_closed + + +def test_ensure_httpx_close_multiple_clients(): + """Test ensure_httpx_close with multiple clients.""" + ctx = ensure_httpx_close() + with ctx: + client1 = httpx.Client() + client2 = httpx.Client() + + assert client1.is_closed + assert client2.is_closed + + +def test_ensure_httpx_close_nested(): + """Test nested usage of ensure_httpx_close.""" + with ensure_httpx_close(): + client1 = httpx.Client() + + with ensure_httpx_close(): + client2 = httpx.Client() + assert not client2.is_closed + + # After inner context, client2 should be closed + assert client2.is_closed + # client1 should still be open since outer context is still active + assert not client1.is_closed + + # After outer context, both clients should be closed + assert client1.is_closed + assert client2.is_closed + + +def test_ensure_httpx_close_exception(): + """Test ensure_httpx_close when an exception occurs.""" + client = None + try: + with ensure_httpx_close(): + client = httpx.Client() + raise ValueError('Test exception') + except ValueError: + pass + + # Client should be closed even if an exception occurred + assert client is not None + assert client.is_closed + + +def test_ensure_httpx_close_restore_client(): + """Test that the original client is restored after context exit.""" + original_client = httpx.Client + with ensure_httpx_close(): + assert httpx.Client != original_client + + # Original __init__ should be restored + assert httpx.Client == original_client diff --git a/tests/unit/test_llm.py b/tests/unit/test_llm.py index 0ec7fe252192..57906c2c776c 100644 --- a/tests/unit/test_llm.py +++ b/tests/unit/test_llm.py @@ -1,4 +1,6 @@ import copy +import tempfile +from pathlib import Path from unittest.mock import MagicMock, patch import pytest @@ -489,3 +491,27 @@ def test_llm_token_usage(mock_litellm_completion, default_config): assert 
     assert usage_entry_2['cache_read_tokens'] == 1
     assert usage_entry_2['cache_write_tokens'] == 3
     assert usage_entry_2['response_id'] == 'test-response-usage-2'
+
+
+@patch('openhands.llm.llm.litellm_completion')
+def test_completion_with_log_completions(mock_litellm_completion, default_config):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        default_config.log_completions = True
+        default_config.log_completions_folder = temp_dir
+        mock_response = {
+            'choices': [{'message': {'content': 'This is a mocked response.'}}]
+        }
+        mock_litellm_completion.return_value = mock_response
+
+        test_llm = LLM(config=default_config)
+        response = test_llm.completion(
+            messages=[{'role': 'user', 'content': 'Hello!'}],
+            stream=False,
+            drop_params=True,
+        )
+        assert (
+            response['choices'][0]['message']['content'] == 'This is a mocked response.'
+        )
+        files = list(Path(temp_dir).iterdir())
+        # Expect a log to be generated
+        assert len(files) == 1
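
Usage note: a minimal sketch of how the new context manager is intended to wrap a call site. make_unclosed_client is a hypothetical stand-in for any LiteLLM code path that constructs an httpx.Client internally and never closes it; the behavior shown mirrors test_ensure_httpx_close_basic above.

import httpx

from openhands.utils.ensure_httpx_close import ensure_httpx_close


def make_unclosed_client() -> httpx.Client:
    # Hypothetical stand-in: mimics a LiteLLM path that creates a client
    # and never closes it.
    return httpx.Client()


with ensure_httpx_close():
    # Inside the block, httpx.Client is temporarily replaced by ClientProxy,
    # so this instance is tracked by the context manager.
    client = make_unclosed_client()

# On exit the patch is reverted and every tracked client has been closed.
assert client.is_closed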
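
Design note: because some callers cache a single client and reuse it across calls, ClientProxy deliberately remains usable after close(). A sketch of that behavior under the same assumptions; build_request is used only because it exercises attribute access on the proxy without performing network I/O.

import httpx

from openhands.utils.ensure_httpx_close import ensure_httpx_close

with ensure_httpx_close():
    shared = httpx.Client()  # actually a ClientProxy instance

assert shared.is_closed  # the underlying client was closed on context exit

# A caller that cached the proxy can keep using it: attribute access goes
# through __getattr__, which lazily recreates the underlying httpx.Client.
request = shared.build_request('GET', 'https://example.com')
assert not shared.is_closed
shared.close()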