diff --git a/gef.py b/gef.py index 41cb61da6..4799c5e69 100644 --- a/gef.py +++ b/gef.py @@ -3530,7 +3530,7 @@ def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bo return is_target_remote(conn) or is_target_extended_remote(conn) -def is_running_under_qemu() -> bool: +def is_running_in_qemu() -> bool: "See https://www.qemu.org/docs/master/system/gdb.html " if not is_target_remote(): return False @@ -3538,15 +3538,15 @@ def is_running_under_qemu() -> bool: return "ENABLE=" in response -def is_running_under_qemu_user() -> bool: - if not is_running_under_qemu(): +def is_running_in_qemu_user() -> bool: + if not is_running_in_qemu(): return False response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`? return "Text=" in response -def is_running_under_qemu_system() -> bool: - if not is_running_under_qemu(): +def is_running_in_qemu_system() -> bool: + if not is_running_in_qemu(): return False # Use "maintenance packet qqemu.PhyMemMode"? response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" @@ -3554,9 +3554,7 @@ def is_running_under_qemu_system() -> bool: def is_running_in_gdbserver() -> bool: - if is_running_under_qemu(): - return False - return not is_running_under_qemu() + return not is_running_in_qemu() def is_running_in_rr() -> bool: @@ -7447,7 +7445,7 @@ def __init__(self) -> None: def do_invoke(self, _: list[str], **kwargs: Any) -> None: args : argparse.Namespace = kwargs["arguments"] - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): err("Unsupported") return @@ -11345,7 +11343,7 @@ def __repr__(self) -> str: def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): return None if not self._auxiliary_vector: auxiliary_vector = {} @@ -11493,9 +11491,9 @@ def prompt_string(self) -> str: @staticmethod def init() -> "GefRemoteSessionManager.RemoteMode": - if is_running_under_qemu_system(): + if is_running_in_qemu_system(): return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM - if is_running_under_qemu_user(): + if is_running_in_qemu_user(): return GefRemoteSessionManager.RemoteMode.QEMU_USER if is_running_in_rr(): return GefRemoteSessionManager.RemoteMode.RR @@ -11619,7 +11617,7 @@ def connect(self, pid: int) -> bool: def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode - dbg(f"Setting up the {self._mode} session") + info(f"Setting up remote session as '{self._mode}'") match self.mode: case GefRemoteSessionManager.RemoteMode.QEMU_USER: self.__setup_qemu_user() @@ -11826,21 +11824,24 @@ def reset_caches(self) -> None: return +def target_remote_hook(): + # disable the context until the session has been fully established + gef.config["context.enable"] = False + + def target_remote_posthook(): - print(f"{is_target_remote()=}") - print(f"{is_target_remote_or_extended()=}") - print(f"{is_target_extended_remote()=}") - print(f"{is_running_under_qemu()=}") - print(f"{is_running_under_qemu_system()=}") - print(f"{is_running_under_qemu_user()=}") - print(f"{is_running_in_gdbserver()=}") - print(f"{is_running_in_rr()=}") conn = gdb.selected_inferior().connection if not isinstance(conn, gdb.RemoteTargetConnection): raise TypeError("Expected type gdb.RemoteTargetConnection") assert is_target_remote_or_extended(conn), "Target is not remote" gef.session.remote = GefRemoteSessionManager(conn) + # re-enable context + gef.config["context.enable"] = True + + # if here, no exception was thrown, print context + gdb.execute("context") + if __name__ == "__main__": if sys.version_info[0] == 2: @@ -11903,14 +11904,18 @@ def target_remote_posthook(): GefTmuxSetup() - # Initialize `target *remote` post hooks + # Initialize `target *remote` pre/post hooks hook = """ - define target hookpost-{0} - pi target_remote_posthook() + define target hook{1}-{0} + pi target_remote_{1}hook() end """ - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) + # pre-hooks + gdb.execute(hook.format("remote", "")) + gdb.execute(hook.format("extended-remote", "")) + # post-hooks + gdb.execute(hook.format("remote", "post")) + gdb.execute(hook.format("extended-remote", "post")) # restore saved breakpoints (if any) bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute() diff --git a/tests/api/gef_remote.py b/tests/api/gef_remote.py new file mode 100644 index 000000000..fceb96546 --- /dev/null +++ b/tests/api/gef_remote.py @@ -0,0 +1,87 @@ +""" +`target remote/extended-remote` test module. +""" + + +from tests.base import RemoteGefUnitTestGeneric +from tests.utils import debug_target, gdbserver_session, gdbserver_multi_session, get_random_port, qemuuser_session + + +class GefRemoteApi(RemoteGefUnitTestGeneric): + + def setUp(self) -> None: + self._target = debug_target("default") + return super().setUp() + + def test_gef_remote_test_gdbserver(self): + """Test `gdbserver file`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with gdbserver_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_in_gdbserver()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_under_qemu()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + def test_gef_remote_test_gdbserver_multi(self): + """Test `gdbserver --multi file`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with gdbserver_multi_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target extended-remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_target_extended_remote()") + assert _root.eval("is_running_in_gdbserver()") + + assert not _root.eval("is_running_under_qemu()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_under_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + def test_gef_remote_test_qemuuser(self): + """Test `qemu-user -g`""" + _gdb = self._gdb + _root = self._conn.root + port = get_random_port() + + with qemuuser_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_under_qemu()") + assert _root.eval("is_running_under_qemu_user()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_under_qemu_system()") + assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + + # TODO add tests for + # - [ ] qemu-system + # - [ ] rr diff --git a/tests/base.py b/tests/base.py index 868923d3a..e1584eaf7 100644 --- a/tests/base.py +++ b/tests/base.py @@ -10,7 +10,7 @@ import rpyc -from .utils import debug_target +from .utils import debug_target, get_random_port COVERAGE_DIR = os.getenv("COVERAGE_DIR", "") GEF_PATH = pathlib.Path(os.getenv("GEF_PATH", "gef.py")).absolute() @@ -58,7 +58,7 @@ def __setup(self): # # Select a random tcp port for rpyc # - self._port = random.randint(1025, 65535) + self._port = get_random_port() self._commands = "" if COVERAGE_DIR: diff --git a/tests/utils.py b/tests/utils.py index f88d3b304..8b9247758 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,6 +12,7 @@ import subprocess import tempfile import time +import random from typing import Iterable, List, Optional, Union from urllib.request import urlopen @@ -112,6 +113,13 @@ def start_gdbserver( logging.debug(f"Starting {cmd}") return subprocess.Popen(cmd) +def start_gdbserver_multi( + host: str = GDBSERVER_DEFAULT_HOST, + port: int = GDBSERVER_DEFAULT_PORT, +) -> subprocess.Popen: + cmd = [GDBSERVER_BINARY, "--multi", f"{host}:{port}"] + logging.debug(f"Starting {cmd}") + return subprocess.Popen(cmd) def stop_gdbserver(gdbserver: subprocess.Popen) -> None: """Stop the gdbserver and wait until it is terminated if it was @@ -138,6 +146,17 @@ def gdbserver_session( finally: stop_gdbserver(sess) +@contextlib.contextmanager +def gdbserver_multi_session( + port: int = GDBSERVER_DEFAULT_PORT, + host: str = GDBSERVER_DEFAULT_HOST, +): + sess = start_gdbserver_multi(host, port) + try: + time.sleep(1) # forced delay to allow gdbserver to start listening + yield sess + finally: + stop_gdbserver(sess) def start_qemuuser( exe: Union[str, pathlib.Path] = debug_target("default"), @@ -301,3 +320,14 @@ def p32(x: int) -> bytes: def p64(x: int) -> bytes: return struct.pack(" int: + global __available_ports + if len(__available_ports) < 2: + __available_ports = list( range(1024, 65535) ) + idx = random.choice(range(len(__available_ports))) + port = __available_ports[idx] + __available_ports.pop(idx) + return port