From c9a8b18808f06bb2e8c7fdb6de5a3dafcbfa7c56 Mon Sep 17 00:00:00 2001 From: crazy hugsy Date: Sat, 20 Jan 2024 10:37:32 -0800 Subject: [PATCH] Use `info proc mapping` (#1046) ## Description Use `info proc mapping` as a first memory layout enumeration technique. Removed `maintenance info sections` which is not about memory layout Restore CI coverage (#1050) --- .github/workflows/coverage.yml | 74 ++++------------- .github/workflows/validate.yml | 8 +- gef.py | 109 +++++++++++++----------- tests/api/gef_memory.py | 147 +++++++++++++++++++++++++++++++++ tests/api/misc.py | 77 +---------------- tests/base.py | 9 +- tests/utils.py | 27 ++++-- 7 files changed, 259 insertions(+), 192 deletions(-) create mode 100644 tests/api/gef_memory.py diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 11a2210f0..a4bc2eb2f 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -1,6 +1,10 @@ name: CI Coverage for PR on: + pull_request_target: + types: + - opened + - synchronize pull_request: types: - opened @@ -32,71 +36,27 @@ jobs: current_score=$(curl --silent https://hugsy.github.io/gef/coverage/gef_py.html | grep pc_cov | sed 's?.*\([^%]*\)%?\1?g') bash scripts/generate-coverage-docs.sh new_score=$(cat docs/coverage/gef_py.html | grep pc_cov | sed 's?.*\([^%]*\)%?\1?g') - diff_score=$(python -c "print(${new_score} - ${current_score})") - commit=${{ github.event.pull_request.head.sha }} - include_tests=$((git diff ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} --compact-summary | egrep --count '^ tests/') || echo 0) - include_docs=$((git diff ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} --compact-summary | egrep --count '^ docs/') || echo 0) - echo "commit=${commit}" >> $GITHUB_OUTPUT - echo "new_coverage_score=${new_score}" >> $GITHUB_OUTPUT - echo "current_coverage_score=${current_score}" >> $GITHUB_OUTPUT - echo "diff_score=${diff_score}" >> $GITHUB_OUTPUT - echo "include_tests=${include_tests}" >> $GITHUB_OUTPUT - echo "include_docs=${include_docs}" >> $GITHUB_OUTPUT - WORDS=("pycharm" "debugpy" "ptvsd" "breakpoint" "pdb") - WORDS_FOUND="" - for word in "${WORDS[@]}"; do - if git diff ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} -- "gef.py" | grep "^+" | grep -q "$word"; then - WORDS_FOUND+="'$word' " - fi - done - echo "words_found=${WORDS_FOUND}" >> $GITHUB_OUTPUT + score_diff=$(python -c "print(f'{${new_score} - ${current_score}:.04f}')") + echo "new_score=${new_score}" >> $GITHUB_OUTPUT + echo "current_score=${current_score}" >> $GITHUB_OUTPUT + echo "score_diff=${score_diff}" >> $GITHUB_OUTPUT - name: Post comment uses: actions/github-script@v7 - env: - COMMIT: ${{ steps.get_coverage.outputs.commit }} - SCORE_OLD: ${{ steps.get_coverage.outputs.current_coverage_score }} - SCORE_NEW: ${{ steps.get_coverage.outputs.new_coverage_score }} - SCORE_DIFF: ${{ steps.get_coverage.outputs.diff_score }} - WORDS_FOUND: ${{ steps.get_coverage.outputs.words_found }} with: script: | - const diff_score = ${{ steps.get_coverage.outputs.diff_score }}; - const tests_changes = ${{ steps.get_coverage.outputs.include_tests }}; - const docs_changes = ${{ steps.get_coverage.outputs.include_docs }}; - const forbiddenWordsString = process.env.WORDS_FOUND || ''; - const forbidden_words = forbiddenWordsString.split(" ").filter(word => word.trim() !== ''); - const comment = `## 🤖 Coverage update for ${process.env.COMMIT} - - * Diff Commit: ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} - * Current vs New Coverage Score: ${process.env.SCORE_OLD}% / ${process.env.SCORE_NEW}% - * Difference: ${process.env.SCORE_DIFF} ${(diff_score >= 0) ? "🟢" : "🔴"} - - To this point, this PR: - * ${(tests_changes > 0) ? "includes" : "**does not** include" } changes to tests - * ${(docs_changes > 0) ? "includes" : "**does not** include" } changes to documentation - * ${(forbidden_words.length === 0 || forbidden_words[0] === '') ? "**does not** include forbidden words" : "includes the forbidden words:" + forbidden_words.join(", ")} + const old_score = ${{ steps.get_coverage.outputs.current_score }}; + const new_score = ${{ steps.get_coverage.outputs.new_score }}; + const score_diff = ${{ steps.get_coverage.outputs.score_diff }}; + const comment = `## 🤖 Coverage update for ${{ github.event.pull_request.head.sha }} ${(score_diff >= 0) ? "🟢" : "🔴"} + + | | Old | New | + |--------|-----|-----| + | Commit | ${{ github.event.pull_request.base.sha }} | ${{ github.event.pull_request.head.sha }} | + | Score | ${old_score}% | ${new_score}% (${score_diff}) | `; try { const { owner, repo, number } = context.issue; await github.rest.issues.createComment({ owner, repo, issue_number: number, body: comment }); - - if(docs_changes > 0) { - await github.rest.issues.addLabels({ - owner: owner, - repo: repo, - issue_number: number, - labels: ['documentation'] - }); - } - - if(tests_changes > 0) { - await github.rest.issues.addLabels({ - owner: owner, - repo: repo, - issue_number: number, - labels: ['automation/ci'] - }); - } } catch (err) { console.log(err); } diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index fac11a895..77ffc457f 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -11,9 +11,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: actions/setup-python@v3 + - uses: actions/setup-python@v5.0.0 with: - python-version: "3.11" + python-version: "3.8" - uses: pre-commit/action@v3.0.0 docs_link_check: @@ -23,9 +23,9 @@ jobs: contents: read steps: - name: checkout - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Check links - uses: lycheeverse/lychee-action@v1.4.1 + uses: lycheeverse/lychee-action@v1.9.1 env: GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} with: diff --git a/gef.py b/gef.py index 3300b1c85..ab76e4f41 100644 --- a/gef.py +++ b/gef.py @@ -682,7 +682,7 @@ def is_executable(self) -> bool: @property def size(self) -> int: if self.page_end is None or self.page_start is None: - return -1 + raise AttributeError return self.page_end - self.page_start @property @@ -691,16 +691,17 @@ def realpath(self) -> str: return self.path if gef.session.remote is None else f"/tmp/gef/{gef.session.remote:d}/{self.path}" def __str__(self) -> str: - return (f"Section(page_start={self.page_start:#x}, page_end={self.page_end:#x}, " - f"permissions={self.permission!s})") + return (f"Section(start={self.page_start:#x}, end={self.page_end:#x}, " + f"perm={self.permission!s})") + + def __repr__(self) -> str: + return str(self) def __eq__(self, other: "Section") -> bool: return other and \ self.page_start == other.page_start and \ - self.page_end == other.page_end and \ - self.offset == other.offset and \ + self.size == other.size and \ self.permission == other.permission and \ - self.inode == other.inode and \ self.path == other.path @@ -6041,15 +6042,16 @@ def do_invoke(self, _: List[str], **kwargs: Any) -> None: # calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None # This prevents some spurious errors being thrown during startup gef.session.remote_initializing = True - gef.session.remote = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) + session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) - dbg(f"[remote] initializing remote session with {gef.session.remote.target} under {gef.session.remote.root}") - if not gef.session.remote.connect(args.pid): - raise EnvironmentError(f"Cannot connect to remote target {gef.session.remote.target}") - if not gef.session.remote.setup(): - raise EnvironmentError(f"Failed to create a proper environment for {gef.session.remote.target}") + dbg(f"[remote] initializing remote session with {session.target} under {session.root}") + if not session.connect(args.pid) or not session.setup(): + gef.session.remote = None + gef.session.remote_initializing = False + raise EnvironmentError("Failed to setup remote target") gef.session.remote_initializing = False + gef.session.remote = session reset_all_caches() gdb.execute("context") return @@ -6060,7 +6062,7 @@ class SkipiCommand(GenericCommand): """Skip N instruction(s) execution""" _cmdline_ = "skipi" - _syntax_ = ("{_cmdline_} [LOCATION] [--n NUM_INSTRUCTIONS]" + _syntax_ = (f"{_cmdline_} [LOCATION] [--n NUM_INSTRUCTIONS]" "\n\tLOCATION\taddress/symbol from where to skip" "\t--n NUM_INSTRUCTIONS\tSkip the specified number of instructions instead of the default 1.") @@ -6095,7 +6097,7 @@ class NopCommand(GenericCommand): aware.""" _cmdline_ = "nop" - _syntax_ = ("{_cmdline_} [LOCATION] [--i ITEMS] [--f] [--n] [--b]" + _syntax_ = (f"{_cmdline_} [LOCATION] [--i ITEMS] [--f] [--n] [--b]" "\n\tLOCATION\taddress/symbol to patch (by default this command replaces whole instructions)" "\t--i ITEMS\tnumber of items to insert (default 1)" "\t--f\tForce patch even when the selected settings could overwrite partial instructions" @@ -10460,10 +10462,12 @@ def read_ascii_string(self, address: int) -> Optional[str]: def maps(self) -> List[Section]: if not self.__maps: self.__maps = self._parse_maps() + if not self.__maps: + raise RuntimeError("Failed to get memory layout") return self.__maps @classmethod - def _parse_maps(cls) -> List[Section]: + def _parse_maps(cls) -> Optional[List[Section]]: """Return the mapped memory sections. If the current arch has its maps method defined, then defer to that to generated maps, otherwise, try to figure it out from procfs, then info sections, then monitor info @@ -10472,12 +10476,12 @@ def _parse_maps(cls) -> List[Section]: return list(gef.arch.maps()) try: - return list(cls.parse_procfs_maps()) + return list(cls.parse_gdb_info_proc_maps()) except: pass try: - return list(cls.parse_gdb_info_sections()) + return list(cls.parse_procfs_maps()) except: pass @@ -10486,7 +10490,6 @@ def _parse_maps(cls) -> List[Section]: except: pass - warn("Cannot get memory map") return None @staticmethod @@ -10496,6 +10499,7 @@ def parse_procfs_maps() -> Generator[Section, None, None]: if not procfs_mapfile: is_remote = gef.session.remote is not None raise FileNotFoundError(f"Missing {'remote ' if is_remote else ''}procfs map file") + with procfs_mapfile.open("r") as fd: for line in fd: line = line.strip() @@ -10521,30 +10525,44 @@ def parse_procfs_maps() -> Generator[Section, None, None]: return @staticmethod - def parse_gdb_info_sections() -> Generator[Section, None, None]: + def parse_gdb_info_proc_maps() -> Generator[Section, None, None]: """Get the memory mapping from GDB's command `maintenance info sections` (limited info).""" - stream = StringIO(gdb.execute("maintenance info sections", to_string=True)) - for line in stream: + if GDB_VERSION < (11, 0): + raise AttributeError("Disregarding old format") + + lines = (gdb.execute("info proc mappings", to_string=True) or "").splitlines() + + # The function assumes the following output format (as of GDB 11+) for `info proc mappings` + # ``` + # process 61789 + # Mapped address spaces: + # + # Start Addr End Addr Size Offset Perms objfile + # 0x555555554000 0x555555558000 0x4000 0x0 r--p /usr/bin/ls + # 0x555555558000 0x55555556c000 0x14000 0x4000 r-xp /usr/bin/ls + # [...] + # ``` + + if len(lines) < 5: + raise AttributeError + + # Format seems valid, iterate to generate sections + for line in lines[4:]: if not line: break - try: - parts = [x for x in line.split()] - addr_start, addr_end = [int(x, 16) for x in parts[1].split("->")] - off = int(parts[3][:-1], 16) - path = parts[4] - perm = Permission.from_info_sections(parts[5:]) - yield Section(page_start=addr_start, - page_end=addr_end, - offset=off, - permission=perm, - path=path) - - except IndexError: - continue - except ValueError: - continue + parts = [x.strip() for x in line.split()] + addr_start, addr_end, offset = [int(x, 16) for x in parts[0:3]] + perm = Permission.from_process_maps(parts[4]) + path = " ".join(parts[5:]) if len(parts) >= 5 else "" + yield Section( + page_start=addr_start, + page_end=addr_end, + offset=offset, + permission=perm, + path=path, + ) return @staticmethod @@ -10560,7 +10578,7 @@ def parse_monitor_info_mem() -> Generator[Section, None, None]: ranges, off, perms = line.split() off = int(off, 16) start, end = [int(s, 16) for s in ranges.split("-")] - except ValueError as e: + except ValueError: continue perm = Permission.from_monitor_info_mem(perms) @@ -10968,7 +10986,6 @@ def original_canary(self) -> Optional[Tuple[int, int]]: canary &= ~0xFF return canary, canary_location - @property def maps(self) -> Optional[pathlib.Path]: """Returns the Path to the procfs entry for the memory mapping.""" @@ -11063,7 +11080,12 @@ def connect(self, pid: int) -> bool: """Connect to remote target. If in extended mode, also attach to the given PID.""" # before anything, register our new hook to download files from the remote target dbg(f"[remote] Installing new objfile handlers") - gef_on_new_unhook(new_objfile_handler) + try: + gef_on_new_unhook(new_objfile_handler) + except SystemError: + # the default objfile handler might already have been removed, ignore failure + pass + gef_on_new_hook(self.remote_objfile_event_handler) # then attempt to connect @@ -11146,13 +11168,6 @@ def __setup_remote(self) -> bool: err(f"'{fpath}' could not be fetched on the remote system.") return False - # makeup a fake mem mapping in case we failed to retrieve it - maps = self.root / f"proc/{self.pid}/maps" - if not maps.exists(): - with maps.open("w") as fd: - fname = self.file.absolute() - mem_range = "00000000-ffffffff" if is_32bit() else "0000000000000000-ffffffffffffffff" - fd.write(f"{mem_range} rwxp 00000000 00:00 0 {fname}\n") return True def remote_objfile_event_handler(self, evt: "gdb.NewObjFileEvent") -> None: diff --git a/tests/api/gef_memory.py b/tests/api/gef_memory.py new file mode 100644 index 000000000..be74ea44e --- /dev/null +++ b/tests/api/gef_memory.py @@ -0,0 +1,147 @@ +""" +`gef.session` test module. +""" + +import pathlib +import random +import pytest + +from tests.base import RemoteGefUnitTestGeneric + +from tests.utils import ( + debug_target, + gdbserver_session, + qemuuser_session, + GDBSERVER_DEFAULT_HOST, +) + + +class GefMemoryApi(RemoteGefUnitTestGeneric): + """`gef.memory` test module.""" + + def setUp(self) -> None: + self._target = debug_target("default") + return super().setUp() + + def test_api_gef_memory_only_running(self): + gdb, gef = self._gdb, self._gef + + with pytest.raises(RuntimeError): + assert gef.memory.maps is None + + gdb.execute("start") + assert gef.memory.maps is not None + + def test_api_gef_memory_parse_info_proc_maps_expected_format(self): + if self.gdb_version < (11, 0): + pytest.skip(f"Skipping test for version {self.gdb_version} (min 10.0)") + + gdb, root = self._gdb, self._conn.root + gdb.execute("start") + + # Check output format + lines = (gdb.execute("info proc mappings", to_string=True) or "").splitlines() + assert len(lines) >= 5 + assert all(map(lambda x: isinstance(x, str), lines)) + for line in lines[4:]: + parts = [x.strip() for x in line.split()] + start_addr = int(parts[0], 16) + end_addr = int(parts[1], 16) + size = int(parts[2], 16) + int(parts[3], 16) + assert end_addr == start_addr + size + assert len(parts[4]) == 4, f"Expected permission string, got {parts[4]}" + Permission = root.eval("Permission") + Permission.from_process_maps(parts[4]) + + # optional objfile + if len(parts) == 5: + continue + + objfile = " ".join(parts[5:]).strip() + if objfile.startswith("/"): + assert pathlib.Path(objfile).exists() + + def test_api_gef_memory_parse_info_proc_maps(self): + gdb, gef, root = self._gdb, self._gef, self._conn.root + gdb.execute("start") + + Section = root.eval("Section") + + if self.gdb_version < (11, 0): + # expect an exception + with pytest.raises(AttributeError): + next(gef.memory.parse_gdb_info_proc_maps()) + + else: + for section in gef.memory.parse_gdb_info_proc_maps(): + assert isinstance(section, Section) + + def test_func_parse_permissions(self): + root = self._conn.root + expected_values = [ + ( + "Permission.from_info_sections('ALLOC LOAD READONLY CODE HAS_CONTENTS')", + "r-x", + ), + ("Permission.from_process_maps('r--')", "r--"), + ("Permission.from_monitor_info_mem('-r-')", "r--"), + ("Permission.from_info_mem('rw')", "rw-"), + ] + for cmd, expected in expected_values: + assert str(root.eval(cmd)) == expected + + def test_func_parse_maps_local_procfs(self): + root, gdb, gef = self._conn.root, self._gdb, self._gef + + with pytest.raises(FileNotFoundError): + root.eval("list(GefMemoryManager.parse_procfs_maps())") + + gdb.execute("start") + + sections = root.eval("list(GefMemoryManager.parse_procfs_maps())") + for section in sections: + assert section.page_start & ~0xFFF + assert section.page_end & ~0xFFF + + # + # The parse maps function should automatically get called when we start + # up, and we should be able to view the maps via the `gef.memory.maps` + # property. So check the alias `gef.memory.maps` + # However, since `gef.memory.maps` has more info, use it as source of + # truth + # + assert section in gef.memory.maps + + @pytest.mark.slow + def test_func_parse_maps_remote_gdbserver(self): + gef, gdb = self._gef, self._gdb + # When in a gef-remote session `parse_gdb_info_proc_maps` should work to + # query the memory maps + while True: + port = random.randint(1025, 65535) + if port != self._port: + break + + with pytest.raises(Exception): + gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + + with gdbserver_session(port=port) as _: + gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + sections = gef.memory.maps + assert len(sections) > 0 + + def test_func_parse_maps_remote_qemu(self): + gdb, gef = self._gdb, self._gef + # When in a gef-remote qemu-user session `parse_gdb_info_proc_maps` + # should work to query the memory maps + while True: + port = random.randint(1025, 65535) + if port != self._port: + break + + with qemuuser_session(port=port) as _: + cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" + gdb.execute(cmd) + sections = gef.memory.maps + assert len(sections) > 0 diff --git a/tests/api/misc.py b/tests/api/misc.py index ebc4abacf..5e47e2d4b 100644 --- a/tests/api/misc.py +++ b/tests/api/misc.py @@ -4,15 +4,12 @@ import pathlib import pytest -import random + from tests.base import RemoteGefUnitTestGeneric from tests.utils import ( debug_target, - gdbserver_session, - qemuuser_session, - GDBSERVER_DEFAULT_HOST, ) @@ -44,78 +41,6 @@ def test_func_parse_address(self): with pytest.raises(Exception): root.eval("parse_address('meh')") - def test_func_parse_permissions(self): - root = self._conn.root - expected_values = [ - ( - "Permission.from_info_sections('ALLOC LOAD READONLY CODE HAS_CONTENTS')", - "r-x", - ), - ("Permission.from_process_maps('r--')", "r--"), - ("Permission.from_monitor_info_mem('-r-')", "r--"), - ("Permission.from_info_mem('rw')", "rw-"), - ] - for cmd, expected in expected_values: - assert str(root.eval(cmd)) == expected - - def test_func_parse_maps_local_procfs(self): - root, gdb, gef = self._conn.root, self._gdb, self._gef - - with pytest.raises(FileNotFoundError): - root.eval("list(GefMemoryManager.parse_procfs_maps())") - - gdb.execute("start") - - sections = root.eval("list(GefMemoryManager.parse_procfs_maps())") - for section in sections: - assert section.page_start & ~0xFFF - assert section.page_end & ~0xFFF - - # The parse maps function should automatically get called when we start - # up, and we should be able to view the maps via the `gef.memory.maps` - # property. So check the alias - assert gef.memory.maps == sections - - def test_func_parse_maps_local_info_section(self): - root, gdb = self._conn.root, self._gdb - gdb.execute("start") - - sections = root.eval("list(GefMemoryManager.parse_gdb_info_sections())") - assert len(sections) > 0 - - @pytest.mark.slow - def test_func_parse_maps_remote_gdbserver(self): - root, gdb = self._conn.root, self._gdb - # When in a gef-remote session `parse_gdb_info_sections` should work to - # query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break - - with pytest.raises(Exception): - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") - - with gdbserver_session(port=port) as _: - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") - sections = root.eval("list(GefMemoryManager.parse_gdb_info_sections())") - assert len(sections) > 0 - - def test_func_parse_maps_remote_qemu(self): - root, gdb = self._conn.root, self._gdb - # When in a gef-remote qemu-user session `parse_gdb_info_sections` - # should work to query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break - - with qemuuser_session(port=port) as _: - cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" - gdb.execute(cmd) - sections = root.eval("list(GefMemoryManager.parse_gdb_info_sections())") - assert len(sections) > 0 - def test_func_show_last_exception(self): gdb = self._gdb gdb.execute("start") diff --git a/tests/base.py b/tests/base.py index 3bf4556a3..e284dd0da 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,9 +1,11 @@ import os import pathlib import random +import re import subprocess import tempfile import time +from typing import Tuple import unittest import rpyc @@ -26,7 +28,6 @@ class RemoteGefUnitTestGeneric(unittest.TestCase): """ def setUp(self) -> None: - attempt = RPYC_MAX_REMOTE_CONNECTION_ATTEMPTS while True: try: @@ -106,3 +107,9 @@ def tearDown(self) -> None: self._conn.close() self._process.terminate() return super().tearDown() + + @property + def gdb_version(self) -> Tuple[int, int]: + res = [int(d) for d in re.search(r"(\d+)\D(\d+)", self._gdb.VERSION).groups()] + assert len(res) >= 2 + return tuple(res) diff --git a/tests/utils.py b/tests/utils.py index fc35c77b1..0254ed080 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -4,18 +4,28 @@ import contextlib import enum +import logging import os import pathlib import platform -import re import struct import subprocess import tempfile import time + from typing import Iterable, List, Optional, Union from urllib.request import urlopen +def which(program: str) -> pathlib.Path: + for path in os.environ["PATH"].split(os.pathsep): + dirname = pathlib.Path(path) + fpath = dirname / program + if os.access(fpath, os.X_OK): + return fpath + raise FileNotFoundError(f"Missing file `{program}`") + + TMPDIR = pathlib.Path(tempfile.gettempdir()) ARCH = (os.getenv("GEF_CI_ARCH") or platform.machine()).lower() BIN_SH = pathlib.Path("/bin/sh") @@ -31,6 +41,11 @@ STRIP_ANSI_DEFAULT = True GDBSERVER_DEFAULT_HOST = "localhost" GDBSERVER_DEFAULT_PORT = 1234 +GDBSERVER_BINARY = which("gdbserver") +assert GDBSERVER_BINARY.exists() + +QEMU_USER_X64_BINARY = which("qemu-x86_64") +assert QEMU_USER_X64_BINARY.exists() GEF_RIGHT_ARROW = " → " @@ -93,11 +108,9 @@ def start_gdbserver( Returns: subprocess.Popen: a Popen object for the gdbserver process. """ - return subprocess.Popen( - ["gdbserver", f"{host}:{port}", exe], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) + cmd = [GDBSERVER_BINARY, f"{host}:{port}", exe] + logging.debug(f"Starting {cmd}") + return subprocess.Popen(cmd) def stop_gdbserver(gdbserver: subprocess.Popen) -> None: @@ -131,7 +144,7 @@ def start_qemuuser( port: int = GDBSERVER_DEFAULT_PORT, ) -> subprocess.Popen: return subprocess.Popen( - ["qemu-x86_64", "-g", str(port), exe], + [QEMU_USER_X64_BINARY, "-g", str(port), exe], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, )