From 0d4c1450c311802cfd02e60c54edd2851e074d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Thu, 20 Feb 2025 11:21:42 +0100 Subject: [PATCH] feat: replaced binwalk with unblob as carver and also updated patool deps and tidied up unpacker installation --- fact_extractor/install.py | 2 +- fact_extractor/install/unpacker.py | 271 ++++++++---------- .../generic_carver/code/generic_carver.py | 225 ++++++--------- .../test/data/carving_test_file | Bin 0 -> 626 bytes .../generic_carver/test/data/fake_xz.bin | Bin 130 -> 0 bytes .../test/test_plugin_generic_carver.py | 52 ++++ .../test_plugin_generic_carver_binwalk.py | 71 ----- requirements-unpackers.txt | 10 +- 8 files changed, 263 insertions(+), 368 deletions(-) create mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file delete mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin create mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver.py delete mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py diff --git a/fact_extractor/install.py b/fact_extractor/install.py index fff7927e..701a1c6d 100755 --- a/fact_extractor/install.py +++ b/fact_extractor/install.py @@ -106,7 +106,7 @@ def main(): with OperateInDirectory(installation_directory): common(distribution) - unpacker(distribution) + unpacker() logging.info('installation complete') diff --git a/fact_extractor/install/unpacker.py b/fact_extractor/install/unpacker.py index 4ed09185..d0c80d87 100644 --- a/fact_extractor/install/unpacker.py +++ b/fact_extractor/install/unpacker.py @@ -22,151 +22,92 @@ BIN_DIR = Path(__file__).parent.parent / 'bin' -DEPENDENCIES = { - # Ubuntu - 'bionic': { - 'apt': [ - # binwalk - 'libqt4-opengl', - 'python3-pyqt4', - 'python3-pyqt4.qtopengl', - 'libcapstone3', - # patool and unpacking backends - 'openjdk-8-jdk', - ] - }, - 'focal': { - 'apt': [ - # binwalk - 
'libqt5opengl5', - 'python3-pyqt5', - 'python3-pyqt5.qtopengl', - 'libcapstone3', - # patool and unpacking backends - 'openjdk-16-jdk', - ] - }, - 'jammy': { - 'apt': [ - # binwalk - 'libqt5opengl5', - 'python3-pyqt5', - 'python3-pyqt5.qtopengl', - 'libcapstone4', - # patool and unpacking backends - 'openjdk-19-jdk', - ] - }, - # Debian - 'buster': { - 'apt': [ - # binwalk - 'libqt4-opengl', - 'python3-pyqt4', - 'python3-pyqt4.qtopengl', - 'libcapstone3', - # patool and unpacking backends - 'openjdk-8-jdk', - # freetz - ] - }, - 'bullseye': { - 'apt': [ - # binwalk - 'libqt5opengl5', - 'python3-pyqt5', - 'python3-pyqt5.qtopengl', - 'libcapstone3', - # patool and unpacking backends - 'openjdk-14-jdk', - ] - }, +APT_DEPENDENCIES = { # Packages common to all platforms - 'common': { - 'apt': [ - 'libjpeg-dev', - 'liblzma-dev', - 'liblzo2-dev', - 'zlib1g-dev', - 'unzip', - 'libffi-dev', - 'libfuzzy-dev', - 'fakeroot', - 'python3-opengl', - # binwalk - 'mtd-utils', - 'gzip', - 'bzip2', - 'tar', - 'arj', - 'lhasa', - 'cabextract', - 'cramfsswap', - 'squashfs-tools', - 'liblzma-dev', - 'liblzo2-dev', - 'xvfb', - 'libcapstone-dev', - # patool - 'arj', - 'cabextract', - 'cpio', - 'flac', - 'gzip', - 'lhasa', - 'libchm-dev', - 'lrzip', - 'lzip', - 'lzop', - 'ncompress', - 'nomarch', - 'rpm2cpio', - 'rzip', - 'sharutils', - 'unace', - 'unadf', - 'unalz', - 'unar', - 'unrar', - 'xdms', - 'zpaq', - # Freetz - 'autoconf', - 'automake', - 'bison', - 'flex', - 'g++', - 'gawk', - 'gcc', - 'gettext', - 'file', - 'libacl1-dev', - 'libcap-dev', - 'libncurses5-dev', - 'libsqlite3-dev', - 'libtool-bin', - 'libzstd-dev', - 'make', - 'pkg-config', - 'subversion', - 'unzip', - 'wget', - # android sparse image - 'simg2img', - # 7z - 'yasm', - ], - 'github': [ - ( - 'rampageX/firmware-mod-kit', - [ - '(cd src && make untrx && make -C tpl-tool/src && make -C yaffs2utils)', - 'cp src/untrx src/yaffs2utils/unyaffs2 src/tpl-tool/src/tpl-tool ../../bin/', - ], - ), - ], - }, + 'libjpeg-dev', + 
'liblzma-dev', + 'liblzo2-dev', + 'zlib1g-dev', + 'unzip', + 'libffi-dev', + 'libfuzzy-dev', + 'fakeroot', + 'python3-opengl', + # patool + 'arc', + 'archmage', + 'arj', + 'binutils', + 'bzip2', + 'cabextract', + 'clzip', + 'cpio', + 'flac', + 'genisoimage', + 'lbzip2', + 'lhasa', + 'libarchive-tools', + 'lrzip', + 'lz4', + 'lzip', + 'lzop', + 'ncompress', + 'nomarch', + # FixMe: p7zip-full installed in 7z plugin + 'pbzip2', + 'pdlzip', + 'plzip', + 'rpm2cpio', + 'rzip', + 'sharutils', + 'tar', + 'unace', + 'unalz', + 'unar', + 'xdms', + 'zip', + 'zopfli', + 'zpaq', + 'zstd', + # Freetz + 'autoconf', + 'automake', + 'bison', + 'flex', + 'g++', + 'gawk', + 'gcc', + 'gettext', + 'file', + 'libacl1-dev', + 'libcap-dev', + 'libncurses5-dev', + 'libsqlite3-dev', + 'libtool-bin', + 'libzstd-dev', + 'make', + 'pkg-config', + 'subversion', + 'wget', + # android sparse image + 'simg2img', + # 7z + 'yasm', + # unblob + 'android-sdk-libsparse-utils', + 'e2fsprogs', + 'libhyperscan-dev', + 'lziprecover', } +GITHUB_DEPENDENCIES = [ + ( + 'rampageX/firmware-mod-kit', + [ + '(cd src && make untrx && make -C tpl-tool/src && make -C yaffs2utils)', + 'cp src/untrx src/yaffs2utils/unyaffs2 src/tpl-tool/src/tpl-tool ../../bin/', + ], + ), +] PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-unpackers.txt' if platform.machine() == 'x86_64': EXTERNAL_DEB_DEPS = [ @@ -200,16 +141,19 @@ ] -def install_dependencies(dependencies): - apt = dependencies.get('apt', []) - github = dependencies.get('github', []) - apt_install_packages(*apt) - pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE)) - for repo in github: +def check_mod_kit_installed() -> bool: + return all((Path(__file__).parent.parent / 'bin' / tool).exists() for tool in ['tpl-tool', 'untrx', 'unyaffs2']) + + +def install_github_dependencies(): + for repo in GITHUB_DEPENDENCIES: + if repo[0].endswith('firmware-mod-kit') and check_mod_kit_installed(): + logging.info('Skipping firmware-mod-kit 
since it is already installed') + continue install_github_project(*repo) -def main(distribution): +def main(): # removes due to compatibility reasons try: apt_remove_packages('python-lzma') @@ -217,8 +161,9 @@ def main(distribution): logging.debug('python-lzma not removed because present already') # install dependencies - install_dependencies(DEPENDENCIES['common']) - install_dependencies(DEPENDENCIES[distribution]) + apt_install_packages(*APT_DEPENDENCIES) + pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE)) + install_github_dependencies() # installing freetz if platform.machine() == 'x86_64': @@ -252,7 +197,7 @@ def _edit_sudoers(): '/bin/chown', ) ) - Path('/tmp/fact_overrides').write_text(f'{sudoers_content}\n') # pylint: disable=unspecified-encoding + Path('/tmp/fact_overrides').write_text(f'{sudoers_content}\n', encoding='utf-8') _, chown_code = execute_shell_command_get_return_code('sudo chown root:root /tmp/fact_overrides') _, mv_code = execute_shell_command_get_return_code('sudo mv /tmp/fact_overrides /etc/sudoers.d/fact_overrides') if not chown_code == mv_code == 0: @@ -278,7 +223,27 @@ def _sha256_hash_file(file_path: Path) -> str: return hashlib.sha256(file_path.read_bytes()).hexdigest() +def _freetz_is_already_installed(): + return all( + (Path(__file__).parent.parent / 'bin' / tool).exists() + for tool in [ + 'find-squashfs', + 'unpack-kernel', + 'freetz_bin_functions', + 'unlzma', + 'sfk', + 'unsquashfs4-avm-be', + 'unsquashfs4-avm-le', + 'unsquashfs3-multi', + ] + ) + + def _install_freetz(): + if _freetz_is_already_installed(): + logging.info('Skipping FREETZ installation (already installed)') + return + logging.info('Installing FREETZ') current_user = getuser() freetz_build_config = Path(__file__).parent / 'freetz.config' diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py index 2d27322e..5a808e46 100644 --- 
a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py +++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py @@ -5,145 +5,100 @@ from __future__ import annotations import logging -import re -import shutil +import traceback +from itertools import chain from pathlib import Path - -from common_helper_process import execute_shell_command - -from helperFunctions import magic +from typing import Iterable + +import structlog +from common_helper_unpacking_classifier import avg_entropy +from unblob.extractor import carve_unknown_chunk, carve_valid_chunk +from unblob.file_utils import File +from unblob.finder import search_chunks +from unblob.handlers import BUILTIN_HANDLERS +from unblob.handlers.compression.zlib import ZlibHandler +from unblob.models import Chunk, HexString, PaddingChunk, TaskResult, UnknownChunk +from unblob.plugins import hookimpl +from unblob.processing import Task, calculate_unknown_chunks, remove_inner_chunks NAME = 'generic_carver' MIME_PATTERNS = ['generic/carver'] -VERSION = '0.8' - -TAR_MAGIC = b'ustar' -BZ2_EOF_MAGIC = [ # the magic string is only aligned to half bytes -> two possible strings - b'\x17\x72\x45\x38\x50\x90', - b'\x77\x24\x53\x85\x09', -] -REAL_SIZE_REGEX = re.compile(r'Physical Size = (\d+)') - - -def unpack_function(file_path, tmp_dir): - """ - file_path specifies the input file. - tmp_dir should be used to store the extracted files. 
- """ - - logging.debug(f'File Type unknown: execute binwalk on {file_path}') - output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}') - - drop_underscore_directory(tmp_dir) - return {'output': output, 'filter_log': ArchivesFilter(tmp_dir).remove_false_positive_archives()} - - -class ArchivesFilter: - def __init__(self, unpack_directory): - self.unpack_directory = Path(unpack_directory) - self.screening_logs = [] - - def remove_false_positive_archives(self) -> str: - for file_path in self.unpack_directory.glob('**/*'): - if not file_path.is_file(): - continue - file_type = magic.from_file(file_path, mime=True) - - if file_type == 'application/x-tar' or self._is_possible_tar(file_type, file_path): - self._remove_invalid_archives(file_path, 'tar -tvf {}', 'does not look like a tar archive') - - elif file_type == 'application/x-xz': - self._remove_invalid_archives(file_path, 'xz -c -d {} | wc -c') - - elif file_type == 'application/gzip': - self._remove_invalid_archives(file_path, 'gzip -c -d {} | wc -c') - - elif file_path.suffix == '7z' or file_type in [ - 'application/x-7z-compressed', - 'application/x-lzma', - 'application/zip', - 'application/zlib', - ]: - self._remove_invalid_archives(file_path, '7z l {}', 'ERROR') - - if file_path.is_file(): - self._remove_trailing_data(file_type, file_path) - - return '\n'.join(self.screening_logs) - - @staticmethod - def _is_possible_tar(file_type: str, file_path: Path) -> bool: - # broken tar archives may be identified as octet-stream by newer versions of libmagic - if file_type == 'application/octet-stream': - with file_path.open(mode='rb') as fp: - fp.seek(0x101) - return fp.read(5) == TAR_MAGIC - return False - - def _remove_invalid_archives(self, file_path: Path, command, search_key=None): - output = execute_shell_command(command.format(file_path)) - - if search_key and search_key in output.replace('\n ', '') or not search_key and _output_is_empty(output): - 
self._remove_file(file_path) - - def _remove_file(self, file_path): - file_path.unlink() - self.screening_logs.append(f'{file_path.name} was removed (invalid archive)') - - def _remove_trailing_data(self, file_type: str, file_path: Path): - trailing_data_index = None - - if file_type in ['application/zip', 'application/zlib']: - trailing_data_index = _find_trailing_data_index_zip(file_path) - - elif file_type == 'application/x-bzip2': - trailing_data_index = _find_trailing_data_index_bz2(file_path) - - if trailing_data_index: - self._resize_file(trailing_data_index, file_path) - - def _resize_file(self, actual_size: int, file_path: Path): - with file_path.open('rb') as fp: - actual_content = fp.read(actual_size) - file_path.write_bytes(actual_content) - self.screening_logs.append(f'Removed trailing data at the end of {file_path.name}') - - -def _output_is_empty(output): - return int((output.split())[-1]) == 0 - - -def _find_trailing_data_index_zip(file_path: Path) -> int | None: - """Archives carved by binwalk often have trailing data at the end. 
7z can determine the actual file size.""" - output = execute_shell_command(f'7z l {file_path}') - if 'There are data after the end of archive' in output: - match = REAL_SIZE_REGEX.search(output) - if match: - return int(match.groups()[0]) - return None - - -def _find_trailing_data_index_bz2(file_path: Path) -> int | None: - output = execute_shell_command(f'bzip2 -t {file_path}') - if 'trailing garbage' in output: - file_content = file_path.read_bytes() - matches = sorted(index for magic in BZ2_EOF_MAGIC if (index := file_content.find(magic)) != -1) - # there may be two matches, but we want the first one (but also not -1 == no match) - if matches: - # 10 is magic string + CRC 32 checksum + padding (see https://en.wikipedia.org/wiki/Bzip2#File_format) - return matches[0] + 10 - return None - - -def drop_underscore_directory(tmp_dir): - extracted_contents = list(Path(tmp_dir).iterdir()) - if not extracted_contents: - return - if len(extracted_contents) != 1 or not extracted_contents[0].name.endswith('.extracted'): - return - for result in extracted_contents[0].iterdir(): - shutil.move(str(result), str(result.parent.parent)) - shutil.rmtree(str(extracted_contents[0])) +VERSION = '1.0.0' + +MIN_FILE_ENTROPY = 0.01 + +# deactivate internal logger of unblob because it can slow down searching chunks +structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.CRITICAL)) + + +# register custom zlib handler to allow carving zlib chunks from inside files +@hookimpl +def unblob_register_handlers(): + yield from [ZlibCarvingHandler] + + +class ZlibCarvingHandler(ZlibHandler): + NAME = 'zlib_carver' + + PATTERNS = [ # noqa: RUF012 + HexString('78 01'), # low compression + HexString('78 9c'), # default compression + HexString('78 da'), # best compression + HexString('78 5e'), # compressed + ] + + +def unpack_function(file_path: str, tmp_dir: str) -> dict: + extraction_dir = Path(tmp_dir) + chunks = [] + filter_report = '' + path = Path(file_path) + + try: + 
with File.from_path(path) as file: + for chunk in _find_chunks(path, file): + if isinstance(chunk, PaddingChunk): + continue + if isinstance(chunk, UnknownChunk): + if _has_low_entropy(file, chunk): + filter_report += ( + f'removed chunk {chunk.start_offset}-{chunk.end_offset} (reason: low entropy)\n' + ) + continue + carve_unknown_chunk(extraction_dir, file, chunk) + else: + carve_valid_chunk(extraction_dir, file, chunk) + chunks.append(chunk.as_report(None).asdict()) + + report = _create_report(chunks) if chunks else 'No valid chunks found.' + if filter_report: + report += f'\nFiltered chunks:\n{filter_report}' + except Exception as error: + report = f'Error {error} during unblob extraction:\n{traceback.format_exc()}' + return {'output': report} + + +def _find_chunks(file_path: Path, file: File) -> Iterable[Chunk]: + task = Task(path=file_path, depth=0, blob_id='') + known_chunks = remove_inner_chunks(search_chunks(file, file.size(), BUILTIN_HANDLERS, TaskResult(task))) + unknown_chunks = calculate_unknown_chunks(known_chunks, file.size()) + yield from chain(known_chunks, unknown_chunks) + + +def _create_report(chunk_list: list[dict]) -> str: + report = ['Extracted chunks:'] + for chunk in sorted(chunk_list, key=lambda c: c['start_offset']): + chunk_type = chunk.get('handler_name', 'unknown') + report.append( + f'start: {chunk["start_offset"]}, end: {chunk["end_offset"]}, size: {chunk["size"]}, type: {chunk_type}' + ) + return '\n'.join(report) + + +def _has_low_entropy(file: File, chunk: UnknownChunk) -> bool: + file.seek(chunk.start_offset) + content = file.read(chunk.size) + return avg_entropy(content) < MIN_FILE_ENTROPY # ----> Do not edit below this line <---- diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file b/fact_extractor/plugins/unpacking/generic_carver/test/data/carving_test_file new file mode 100644 index 0000000000000000000000000000000000000000..ede93aefc97e619807a7e471875e26973785df81 GIT binary patch literal 
626 zcmcc9`rtxFLfG;PyLp43X0PVTa$D%9z4P&b)$7f8Sp3~h8ihD|`j}Q+xoh}hsnAIs zmC2@K=Jp8o!{?x3V_%^ z%*-Iekd~hx8p6rI{C~mxG{%MV(@HD285mh!Ff%ZK37|ArQeqKTNosM4p^-6HfHxzP z95XKCBtW(>Faq(GMi2{ZI4i_(G=l@YS=m5J7=h3eNc(~~0}jlq_N-TzVPLt~(0)e* z>Y5;!tBU`*Ff#C?hX^pV*ccc&MHtw*fqqd`WMtqGVBp|l1O+q$11qB>4@^IZ7UX0| zW5@?W1_ntc1_lu>MuvvWGcLdH++q}EV`NZh00o@2`I1}dcW(132)Mh(3n=~U(?8^N zu(!-e>si2|Qk_~>o|cl|m!9zKsE{gM{;l%7N$R%q7nT{F`SNtG*xYAq_m%(v)%TSR literal 0 HcmV?d00001 diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_xz.bin deleted file mode 100644 index 93cea4cbd468e3483a6efc40bd3c4d4f2ea1dd73..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 130 zcmV-|0Db>frMWt%uVxpffT=)Qirs|qb(Ekst~P`y-U+wF@tDPu;Aw{>vl})&?e|=_ zl+4@_Rck#((|&<@Om6n;eAWb~YYn%v3d`zdax`sNV9RR6O&96Q FilterTest: - with TemporaryDirectory() as temp_dir: - test_file = Path(temp_dir) / filename - source_file = TEST_DATA_DIR / filename - shutil.copyfile(source_file, test_file) - arch_filter = ArchivesFilter(temp_dir) - yield FilterTest(test_file, source_file, arch_filter) - - -@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz']) -def test_remove_false_positives(filename): - with filter_test_setup(filename) as setup: - setup.filter.remove_false_positive_archives() - assert setup.test_file.is_file() is False - - -@pytest.mark.parametrize('filename', ['trailing_data.zip', 'trailing_data.bz2']) -def test_remove_trailing_data(filename): - with filter_test_setup(filename) as setup: - setup.filter.remove_false_positive_archives() - assert setup.filter.screening_logs == [f'Removed trailing data at the end of {filename}'] - assert setup.test_file.stat().st_size < setup.source_file.stat().st_size diff --git a/requirements-unpackers.txt b/requirements-unpackers.txt index 44bd41b3..8056fe40 100644 --- a/requirements-unpackers.txt +++ 
b/requirements-unpackers.txt @@ -2,18 +2,11 @@ pluginbase~=1.0.1 git+https://github.com/fkie-cad/common_helper_unpacking_classifier.git python-magic -patool~=2.2.0 +patool~=3.1.3 # jffs2: jefferson + deps git+https://github.com/sviehb/jefferson.git@v0.4.1 cstruct==2.1 python-lzo==1.14 -# generic_carver: binwalk -# ToDo: pin to fork (?) -git+https://github.com/ReFirmLabs/binwalk@v2.3.2 -pyqtgraph~=0.13.4 -capstone~=5.0.1 -numpy~=1.26.4 -scipy~=1.13.0 # ubi ubi-reader~=0.8.9 # dji / dlink_shrs @@ -39,4 +32,5 @@ uefi-firmware~=1.11 pylibfdt ~= 1.7.1 # xiaomi hdr structlog~=25.1.0 +# unblob unblob~=25.1.8