-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
write integration tests for pty-server in a container
- Loading branch information
Showing
4 changed files
with
180 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import json | ||
import time | ||
|
||
import pytest | ||
|
||
from breba_docs.container import container_setup | ||
|
||
from pty_server import AsyncPtyClient | ||
from breba_docs.socket_server.listener import PORT | ||
from pty_server.async_client import STATUS_TIMEOUT, STATUS_COMPLETED, PtyServerResponse | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def container(): | ||
# To Run the container from terminal from breba_docs package dir | ||
# docker run -d -it \ | ||
# -v $(pwd)/breba_docs/socket_server:/usr/src/socket_server \ | ||
# -w /usr/src \ | ||
# -p 44440:44440 \ | ||
# python:3 \ | ||
# /bin/bash | ||
started_container = container_setup(dev=True) | ||
time.sleep(2) | ||
yield started_container | ||
started_container.stop() | ||
started_container.remove() | ||
|
||
async def response_accumulator(response: PtyServerResponse, timeout): | ||
data = "" | ||
async for chunk in response.stream(timeout): | ||
data += chunk | ||
return data | ||
|
||
@pytest.mark.integration | ||
@pytest.mark.asyncio | ||
async def test_execute_command(container): | ||
async with AsyncPtyClient() as client: | ||
response_install = await client.send_command('pip install pexpect') | ||
reponse_text = await response_accumulator(response_install, 2) | ||
assert response_install.status == STATUS_COMPLETED | ||
assert "Successfully installed pexpect" in reponse_text | ||
|
||
response_uninstall = await client.send_command('pip uninstall pexpect') | ||
reponse_text = await response_accumulator(response_uninstall, 2) | ||
assert response_uninstall.status == STATUS_TIMEOUT | ||
assert "Proceed (Y/n)" in reponse_text | ||
|
||
command = {"input": 'Y'} | ||
await client.send_message(json.dumps(command)) | ||
response_text = await response_accumulator(response_uninstall, 2) | ||
assert response_uninstall.status == STATUS_COMPLETED | ||
assert "Successfully uninstalled" in response_text | ||
|
||
|
||
@pytest.mark.integration | ||
@pytest.mark.asyncio | ||
async def test_execute_ampersand_command(container): | ||
async with AsyncPtyClient() as client: | ||
command = 'mkdir test && cd test && pwd && echo "more testing is needed"' | ||
response = await client.send_command(command) | ||
response_text = await response_accumulator(response, 0.01) | ||
assert "/usr/src/test" in response_text | ||
assert "No such file or directory" not in response_text | ||
|
||
|
||
@pytest.mark.integration | ||
@pytest.mark.asyncio | ||
async def test_multiple_connections(container): | ||
async with AsyncPtyClient() as client: | ||
command = 'mkdir test2 && cd test2 && pwd && echo "more testing is needed"' | ||
response = await client.send_command(command) | ||
response_text = await response_accumulator(response, 0.01) | ||
|
||
assert "/usr/src/test" in response_text | ||
assert "No such file or directory" not in response_text | ||
|
||
async with AsyncPtyClient() as client: | ||
command = 'mkdir test3 && cd test3 && pwd && echo "more testing is needed"' | ||
response = await client.send_command(command) | ||
response_text = await response_accumulator(response, 0.01) | ||
assert "/usr/src/test3" in response_text | ||
assert "No such file or directory" not in response_text |