Skip to content

Commit

Permalink
update to using the ptyprocess based interactive-process
Browse files Browse the repository at this point in the history
  • Loading branch information
yasonk committed Jan 18, 2025
1 parent da3fc53 commit d7c2d23
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 36 deletions.
12 changes: 3 additions & 9 deletions breba_docs/socket_server/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@
async def stream_output(process: InteractiveProcess, writer: StreamWriter, end_marker: str):
while True:
try:
output, err = process.read_nonblocking(timeout=1)
# TODO: handle unexpected exceptions gracefully. Currently client doesn't receive anything in case of
# unexpected error and may be stuck waiting for response indefinitely. The client could implement a timeout,
# it would save some time to know that things went badly
output = process.read_nonblocking(timeout=1)
if output:
logger.info("stdout: " + output.strip())
logger.info("Process output: " + output.strip())
writer.write(output.encode())
await writer.drain()
if err:
logger.info("stderr: " + err.strip())
writer.write(err.encode())
await writer.drain()
if end_marker in output:
logger.info("Breaking on end marker")
break
except TimeoutError:
# TODO: this will be an infinite loop if end_marker is not found due to an error in the command, should use max_timeout, or use an http client
await asyncio.sleep(0.1) # TODO: maybe add max sleep time.
except (TerminatedProcessError, ReadWriteError) as e:
logger.info("End of process output.")
Expand Down
11 changes: 7 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ langchain-core = "^0.3.21"
langchain-openai = "^0.2.10"
langgraph = "^0.2.53"
pytest-asyncio = "^0.25.0"
interactive-process = "^0.1.0"
interactive-process = "^0.2.0"


[tool.poetry.group.test.dependencies]
Expand Down
51 changes: 29 additions & 22 deletions tests/integration/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@
from breba_docs.socket_server.listener import start_server, stop_server


@pytest_asyncio.fixture
async def real_server_connection(mocker) -> (MagicMock, StreamReader, StreamWriter):
reader, writer = await asyncio.open_connection("127.0.0.1", 44440)
try:
yield reader, writer
finally:
writer.close()
await writer.wait_closed()


@pytest_asyncio.fixture
async def server():
async_server = asyncio.create_task(start_server())
Expand All @@ -32,9 +22,13 @@ async def server():
async_server.cancel()


@pytest.fixture
def client():
@pytest_asyncio.fixture
async def client():
with Client() as client:
await asyncio.sleep(0.1) # server is running in parallel, it will not produce new output without async sleep
response_fn = client.send_command('clear')
await asyncio.sleep(0.1) # server is running in parallel, it will not produce new output without async sleep
flushed = response_fn(0.1)
yield client

@pytest_asyncio.fixture
Expand All @@ -47,14 +41,16 @@ async def aclient():
async def test_echo_command(server, client):
payload = json.dumps({"command": "echo Hello", "command_id": "test"})
client.send_message(payload)
await asyncio.sleep(0.1) # server is running in parallel so we need to wait for it
await asyncio.sleep(0.5) # server is running in parallel so we need to wait for it

data = ""
for chunk in client.stream_response(timeout=0.1):
data += chunk
await asyncio.sleep(0.1) # server is running in parallel, it will not produce new output without async sleep

assert data == '$ echo Hello\nHello\nCompleted test\n'
assert "$ echo Hello\r\n" in data
assert "Hello\r\n" in data
assert "Completed test\r\n" in data

@pytest.mark.asyncio
@pytest.mark.integration
Expand All @@ -66,7 +62,9 @@ async def test_async_echo_command(server, aclient):
async for chunk in aclient.stream_response(timeout=0.1):
data += chunk

assert data == '$ echo Hello\nHello\nCompleted test\n'
assert "$ echo Hello\r\n" in data
assert "Hello\r\n" in data
assert "Completed test\r\n" in data

@pytest.mark.asyncio
@pytest.mark.integration
Expand All @@ -85,7 +83,11 @@ async def test_echo_variable(server, client):
await asyncio.sleep(0.1)

# We collected all the output from the two commands
assert data == '$ export MY=Hello\nCompleted test1\n$ echo $MY\nHello\nCompleted test2\n'
assert "$ export MY=Hello\r\n" in data
assert "Completed test1\r\n" in data
assert "$ echo $MY\r\n" in data
assert "Hello\r\n" in data
assert "Completed test2\r\n" in data

@pytest.mark.asyncio
@pytest.mark.integration
Expand All @@ -101,21 +103,27 @@ async def test_async_echo_variable(server, aclient):
data += chunk

# We collected all the output from the two commands
assert data == '$ export MY=Hello\nCompleted test1\n$ echo $MY\nHello\nCompleted test2\n'
assert "$ export MY=Hello\r\n" in data
assert "Completed test1\r\n" in data
assert "$ echo $MY\r\n" in data
assert "Hello\r\n" in data
assert "Completed test2\r\n" in data


@pytest.mark.asyncio
@pytest.mark.integration
async def test_send_command_with_variable(server, client):
client.send_command("export MY=Hello")
response_accumulator =client.send_command("echo $MY")
response_accumulator = client.send_command("echo $MY")

await asyncio.sleep(0.1) # server is running in parallel so we need to wait for it

data = response_accumulator(0.1)
# We collected all the output from the two commands
expected_pattern = r"\$ export MY=Hello\nCompleted .*\n\$ echo \$MY\nHello\n\n" #TODO: the Completed shows up because we have two commands processed before reading response
assert re.match(expected_pattern, data)
assert "$ export MY=Hello\r\n" in data
assert "$ echo $MY\r\n" in data
assert "Hello\r\n" in data
assert re.search(r"Completed.*\r\n", data, flags=re.MULTILINE)

@pytest.mark.asyncio
@pytest.mark.integration
Expand All @@ -124,10 +132,9 @@ def timeout_handler(error):
assert isinstance(error, TimeoutError)
raise EndOfStream("timeout") # raise error otherwise we get infinite loop

expected_pattern = "$ read -p 'Press enter to continue'\n"
response_accumulator = client.send_command("read -p 'Press enter to continue'")
await asyncio.sleep(0.1) # server is running in parallel so we need to wait for it

data = response_accumulator(0.01, timeout_handler)
# We collected all the output from the two commands
assert data == expected_pattern
assert "$ read -p 'Press enter to continue'\r\n" in data

0 comments on commit d7c2d23

Please sign in to comment.