Fix fd leak (#6950)
tofarr authored Feb 26, 2025
1 parent db1f5a8 commit b38039e
Showing 4 changed files with 182 additions and 4 deletions.
13 changes: 9 additions & 4 deletions openhands/llm/llm.py
@@ -8,6 +8,7 @@
 import requests
 
 from openhands.core.config import LLMConfig
+from openhands.utils.ensure_httpx_close import ensure_httpx_close
 
 with warnings.catch_warnings():
     warnings.simplefilter('ignore')
@@ -232,9 +233,9 @@ def wrapper(*args, **kwargs):
 
     # Record start time for latency measurement
     start_time = time.time()
 
-    # we don't support streaming here, thus we get a ModelResponse
-    resp: ModelResponse = self._completion_unwrapped(*args, **kwargs)
+    with ensure_httpx_close():
+        # we don't support streaming here, thus we get a ModelResponse
+        resp: ModelResponse = self._completion_unwrapped(*args, **kwargs)
 
     # Calculate and record latency
     latency = time.time() - start_time
@@ -289,7 +290,11 @@ def wrapper(*args, **kwargs):
         'messages': messages,
         'response': resp,
         'args': args,
-        'kwargs': {k: v for k, v in kwargs.items() if k != 'messages'},
+        'kwargs': {
+            k: v
+            for k, v in kwargs.items()
+            if k not in ('messages', 'client')
+        },
         'timestamp': time.time(),
         'cost': cost,
     }
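The telemetry hunk above now filters the logged kwargs with `if k not in ('messages', 'client')` instead of only dropping 'messages'. A plausible reason (an assumption, not stated in the commit) is that an httpx client object can be passed through to LiteLLM as a 'client' kwarg, and such an object is not JSON-serializable, so it would break the completion log. A minimal sketch of the filter with hypothetical kwargs:

import json

# Hypothetical kwargs as they might reach the wrapper; 'client' stands in for a
# non-serializable httpx.Client / ClientProxy instance (an assumption for illustration).
kwargs = {
    'messages': [{'role': 'user', 'content': 'Hello!'}],
    'client': object(),
    'temperature': 0.0,
}

# Same comprehension as in the hunk above: keep everything except the bulky
# 'messages' list (logged separately) and the unserializable 'client'.
loggable = {k: v for k, v in kwargs.items() if k not in ('messages', 'client')}

json.dumps(loggable)   # fine
# json.dumps(kwargs)   # would raise TypeError because of the client object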
78 changes: 78 additions & 0 deletions openhands/utils/ensure_httpx_close.py
@@ -0,0 +1,78 @@
"""
LiteLLM currently have an issue where HttpHandlers are being created but not
closed. We have submitted a PR to them, (https://github.com/BerriAI/litellm/pull/8711)
and their dev team say they are in the process of a refactor that will fix this, but
in the meantime, we need to manage the lifecycle of the httpx.Client manually.
We can't simply pass in our own client object, because all the different implementations use
different types of client object.
So we monkey patch the httpx.Client class to track newly created instances and close these
when the operations complete. (Since some paths create a single shared client and reuse these,
we actually need to create a proxy object that allows these clients to be reusable.)
Hopefully, this will be fixed soon and we can remove this abomination.
"""

import contextlib
from typing import Callable

import httpx


@contextlib.contextmanager
def ensure_httpx_close():
    wrapped_class = httpx.Client
    proxys = []

    class ClientProxy:
        """
        Sometimes LiteLLM opens a new httpx client for each connection, and does not close them.
        Sometimes it does close them. Sometimes, it reuses a client between connections. For cases
        where a client is reused, we need to be able to reuse the client even after closing it.
        """

        client_constructor: Callable
        args: tuple
        kwargs: dict
        client: httpx.Client

        def __init__(self, *args, **kwargs):
            self.args = args
            self.kwargs = kwargs
            self.client = wrapped_class(*self.args, **self.kwargs)
            proxys.append(self)

        def __getattr__(self, name):
            # Invoke a method on the proxied client - create one if required
            if self.client is None:
                self.client = wrapped_class(*self.args, **self.kwargs)
            return getattr(self.client, name)

        def close(self):
            # Close the client if it is open
            if self.client:
                self.client.close()
                self.client = None

        def __iter__(self, *args, **kwargs):
            # We have to override this as debuggers invoke it causing the client to reopen
            if self.client:
                return self.client.iter(*args, **kwargs)
            return object.__getattribute__(self, 'iter')(*args, **kwargs)

        @property
        def is_closed(self):
            # Check if closed
            if self.client is None:
                return True
            return self.client.is_closed

    httpx.Client = ClientProxy
    try:
        yield
    finally:
        httpx.Client = wrapped_class
        while proxys:
            proxy = proxys.pop()
            proxy.close()
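Below is a short usage sketch of the module above (not part of the commit): inside the context manager, httpx.Client actually constructs a ClientProxy, and attribute access after close() lazily rebuilds the underlying client, which is the reuse behaviour the ClientProxy docstring describes. No network calls are made.

import httpx

from openhands.utils.ensure_httpx_close import ensure_httpx_close

with ensure_httpx_close():
    client = httpx.Client()  # actually a ClientProxy wrapping a real httpx.Client
    client.close()           # closes the underlying client but keeps the constructor args
    assert client.is_closed

    # Plain attribute access goes through __getattr__, which rebuilds the underlying
    # client on demand, so code that cached this instance keeps working after a close.
    _ = client.headers
    assert not client.is_closed

# On exit, httpx.Client is restored and every tracked proxy is closed again.
assert client.is_closed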
69 changes: 69 additions & 0 deletions tests/unit/test_ensure_httpx_close.py
@@ -0,0 +1,69 @@
import httpx

from openhands.utils.ensure_httpx_close import ensure_httpx_close


def test_ensure_httpx_close_basic():
    """Test basic functionality of ensure_httpx_close."""
    ctx = ensure_httpx_close()
    with ctx:
        # Create a client - should be tracked
        client = httpx.Client()

    # After context exit, client should be closed
    assert client.is_closed


def test_ensure_httpx_close_multiple_clients():
    """Test ensure_httpx_close with multiple clients."""
    ctx = ensure_httpx_close()
    with ctx:
        client1 = httpx.Client()
        client2 = httpx.Client()

    assert client1.is_closed
    assert client2.is_closed


def test_ensure_httpx_close_nested():
    """Test nested usage of ensure_httpx_close."""
    with ensure_httpx_close():
        client1 = httpx.Client()

        with ensure_httpx_close():
            client2 = httpx.Client()
            assert not client2.is_closed

        # After inner context, client2 should be closed
        assert client2.is_closed
        # client1 should still be open since outer context is still active
        assert not client1.is_closed

    # After outer context, both clients should be closed
    assert client1.is_closed
    assert client2.is_closed


def test_ensure_httpx_close_exception():
    """Test ensure_httpx_close when an exception occurs."""
    client = None
    try:
        with ensure_httpx_close():
            client = httpx.Client()
            raise ValueError('Test exception')
    except ValueError:
        pass

    # Client should be closed even if an exception occurred
    assert client is not None
    assert client.is_closed


def test_ensure_httpx_close_restore_client():
    """Test that the original client class is restored after context exit."""
    original_client = httpx.Client
    with ensure_httpx_close():
        assert httpx.Client != original_client

    # Original httpx.Client class should be restored
    assert httpx.Client == original_client
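For context on the commit title, the sketch below (not part of the commit's tests) shows one way the leak could be observed. It assumes a Linux host, where /proc/self/fd lists the process's open file descriptors, outbound network access, and a placeholder URL: an unclosed httpx.Client that has served a request keeps its pooled keep-alive socket open for as long as the client object stays alive.

import os

import httpx

from openhands.utils.ensure_httpx_close import ensure_httpx_close


def open_fds() -> int:
    # Linux-specific: one entry per open file descriptor of this process.
    return len(os.listdir('/proc/self/fd'))


baseline = open_fds()

# Leaky pattern: clients are cached (as a library might do) but never closed,
# so each one pins the socket(s) in its connection pool.
cached = []
for _ in range(5):
    c = httpx.Client()
    c.get('https://example.com')  # placeholder endpoint, not from the commit
    cached.append(c)
print(open_fds() - baseline)  # typically grows with the number of clients

# Managed pattern: the same calls inside ensure_httpx_close leave no extra
# descriptors behind, because every client created in the block is closed on exit.
with ensure_httpx_close():
    for _ in range(5):
        httpx.Client().get('https://example.com')
print(open_fds() - baseline)  # typically no further growth beyond the cached clients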
26 changes: 26 additions & 0 deletions tests/unit/test_llm.py
@@ -1,4 +1,6 @@
 import copy
+import tempfile
+from pathlib import Path
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -489,3 +491,27 @@ def test_llm_token_usage(mock_litellm_completion, default_config):
     assert usage_entry_2['cache_read_tokens'] == 1
     assert usage_entry_2['cache_write_tokens'] == 3
     assert usage_entry_2['response_id'] == 'test-response-usage-2'
+
+
+@patch('openhands.llm.llm.litellm_completion')
+def test_completion_with_log_completions(mock_litellm_completion, default_config):
+    with tempfile.TemporaryDirectory() as temp_dir:
+        default_config.log_completions = True
+        default_config.log_completions_folder = temp_dir
+        mock_response = {
+            'choices': [{'message': {'content': 'This is a mocked response.'}}]
+        }
+        mock_litellm_completion.return_value = mock_response
+
+        test_llm = LLM(config=default_config)
+        response = test_llm.completion(
+            messages=[{'role': 'user', 'content': 'Hello!'}],
+            stream=False,
+            drop_params=True,
+        )
+        assert (
+            response['choices'][0]['message']['content'] == 'This is a mocked response.'
+        )
+        files = list(Path(temp_dir).iterdir())
+        # Expect a log to be generated
+        assert len(files) == 1
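A hypothetical follow-up assertion (not in the commit) that would tie this test to the kwargs filter earlier in the diff: assuming the telemetry dict from the llm.py hunk is what gets serialized to the log file as JSON, the logged kwargs should contain neither 'messages' nor 'client'.

import json

# Continues from the test above: 'files' holds the single generated log file.
entry = json.loads(files[0].read_text())
assert 'messages' not in entry['kwargs']
assert 'client' not in entry['kwargs']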
