diff --git a/grizzly/common/status.py b/grizzly/common/status.py index de50cef9..28db5268 100644 --- a/grizzly/common/status.py +++ b/grizzly/common/status.py @@ -874,7 +874,7 @@ def __init__( # TODO: make SigInfo dataclass? self.signature_info: Dict[str, Union[bool, str]] = {} self.successes = 0 - self.current_strategy_idx = None + self.current_strategy_idx: Optional[int] = None self._testcase_size_cb = testcase_size_cb self.crash_id = crash_id self.finished_steps: List[ReductionStep] = [] @@ -886,7 +886,7 @@ def __init__( self.tool = tool self._current_size: Optional[int] = None # this holds results from Reporter.submit() - self.last_reports = [] + self.last_reports: List[str] = [] # prepare database if self._db_file: diff --git a/grizzly/reduce/args.py b/grizzly/reduce/args.py index 8a1da391..c5f991c3 100644 --- a/grizzly/reduce/args.py +++ b/grizzly/reduce/args.py @@ -3,6 +3,7 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. """CLI argument parsing for Grizzly reduction. """ +from argparse import Namespace from logging import getLogger from pathlib import Path @@ -19,7 +20,7 @@ class ReduceCommonArgs(ReplayCommonArgs): Takes all arguments defined for `grizzly.replay`, and a few specific to reduction. """ - def __init__(self): + def __init__(self) -> None: """Initialize argument parser.""" super().__init__() @@ -52,11 +53,11 @@ def __init__(self): help="One or more strategies. (default: %(default)s)", ) - def sanity_check(self, args): + def sanity_check(self, args: Namespace) -> None: """Sanity check reducer args. Arguments: - args (argparse.Namespace): Result from `parse_args()`. + args: Result from `parse_args()`. Raises: SystemExit: on error, `ArgumentParser.error()` is called, which will exit. @@ -86,7 +87,7 @@ def sanity_check(self, args): class ReduceArgs(ReduceCommonArgs): # NOTE: If updated changes may also need to be added to ReplayArgs - def __init__(self): + def __init__(self) -> None: super().__init__() self.parser.add_argument("input", type=Path, nargs="+", help=LOCAL_INPUT_HELP) @@ -97,7 +98,7 @@ def __init__(self): " automatically determined.", ) - def sanity_check(self, args): + def sanity_check(self, args: Namespace) -> None: super().sanity_check(args) for test in args.input: @@ -109,7 +110,7 @@ def sanity_check(self, args): class ReduceFuzzManagerIDArgs(ReduceCommonArgs): - def __init__(self): + def __init__(self) -> None: """Initialize argument parser.""" super().__init__() self.parser.add_argument("input", type=int, help="FuzzManager ID to reduce") @@ -135,7 +136,7 @@ def __init__(self): "0 == oldest, n-1 == most recent (default: run all testcases)", ) - def sanity_check(self, args): + def sanity_check(self, args: Namespace) -> None: super().sanity_check(args) if args.no_harness and len(args.test_index) > 1: @@ -145,7 +146,7 @@ def sanity_check(self, args): class ReduceFuzzManagerIDQualityArgs(ReduceFuzzManagerIDArgs): - def __init__(self): + def __init__(self) -> None: """Initialize argument parser.""" super().__init__() self.parser.add_argument( diff --git a/grizzly/reduce/conftest.py b/grizzly/reduce/conftest.py index 00011b91..23763abc 100644 --- a/grizzly/reduce/conftest.py +++ b/grizzly/reduce/conftest.py @@ -16,6 +16,6 @@ def reporter_sequential_strftime(mocker): prefix = mocker.patch("grizzly.common.report.strftime") def report_prefix(_): - return f"{prefix.call_count:0>4d}" + return f"{prefix.call_count:04d}" prefix.side_effect = report_prefix diff --git a/grizzly/reduce/core.py b/grizzly/reduce/core.py index eab6129d..4975cac0 100644 --- a/grizzly/reduce/core.py +++ b/grizzly/reduce/core.py @@ -4,12 +4,14 @@ """`ReduceManager` finds the smallest testcase(s) to reproduce an issue.""" import json import os +from argparse import Namespace from itertools import chain from locale import LC_ALL, setlocale from logging import getLogger from math import ceil, log from pathlib import Path from time import time +from typing import Any, Dict, List, Optional, Set, Tuple from FTB.Signatures.CrashInfo import CrashSignature @@ -25,7 +27,7 @@ ) from ..common.status import STATUS_DB_REDUCE, ReductionStatus from ..common.status_reporter import ReductionStatusReporter -from ..common.storage import TestCaseLoadFailure +from ..common.storage import TestCase, TestCaseLoadFailure from ..common.utils import ( CertificateBundle, ConfigError, @@ -33,8 +35,8 @@ configure_logging, time_limits, ) -from ..replay import ReplayManager -from ..target import Target, TargetLaunchError, TargetLaunchTimeout +from ..replay import ReplayManager, ReplayResult +from ..target import AssetManager, Target, TargetLaunchError, TargetLaunchTimeout from .exceptions import GrizzlyReduceBaseException, NotReproducible from .strategies import STRATEGIES @@ -50,13 +52,11 @@ class ReduceManager: that reproduces a given issue. Attributes: - ignore (list(str)): Classes of results to ignore (see `--ignore`). - server (sapphire.Sapphire): Server instance to serve testcases. - strategies (list(str)): List of strategies to use for reducing - testcases (in order). - target (grizzly.target.Target): Target instance to run testcases. - testcases (list(grizzly.common.storage.TestCase)): List of one or more Grizzly - testcases to reduce. + ignore: Classes of results to ignore (see `--ignore`). + server: Server instance to serve testcases. + strategies: List of strategies to use for reducing testcases (in order). + target: Target instance to run testcases. + testcases: List of one or more Grizzly testcases to reduce. """ ANALYSIS_ITERATIONS = 11 # number of iterations to analyze @@ -75,58 +75,54 @@ class ReduceManager: def __init__( self, - ignore, - server, - target, - testcases, - strategies, - log_path, - any_crash=False, - expect_hang=False, - idle_delay=0, - idle_threshold=0, - reducer_crash_id=None, - relaunch=1, - report_period=None, - report_to_fuzzmanager=False, - signature=None, - signature_desc=None, - static_timeout=False, - tool=None, - use_analysis=True, - use_harness=True, + ignore: Set[str], + server: Sapphire, + target: Target, + testcases: List[TestCase], + strategies: List[str], + log_path: Path, + any_crash: bool = False, + expect_hang: bool = False, + idle_delay: int = 0, + idle_threshold: int = 0, + reducer_crash_id: Optional[int] = None, + relaunch: int = 1, + report_period: Optional[int] = None, + report_to_fuzzmanager: bool = False, + signature: Optional[CrashSignature] = None, + signature_desc: Optional[str] = None, + static_timeout: bool = False, + tool: Optional[str] = None, + use_analysis: bool = True, + use_harness: bool = True, ): """Initialize reduction manager. Many arguments are common with `ReplayManager`. Args: - ignore (list(str)): Value for `self.ignore` attribute. - server (sapphire.Sapphire): Value for `self.server` attribute. - target (grizzly.target.Target): Value for `self.target` attribute. - testcases (list(grizzly.common.storage.TestCase)): - Value for `self.testcases` attribute. - strategies (list(str)): Value for `self.strategies` attribute. - log_path (Path or str): Path to save results when reporting to filesystem. - any_crash (bool): Accept any crash when reducing, not just those matching - the specified or first observed signature. - expect_hang (bool): Attempt to reduce a test that triggers a hang. - idle_delay (int): Number of seconds to wait before polling for idle. - idle_threshold (int): CPU usage threshold to mark the process as idle. - relaunch (int): Maximum number of iterations performed by Runner - before Target should be relaunched. - report_period (int or None): Periodically report best results for - long-running strategies. - report_to_fuzzmanager (bool): Report to FuzzManager rather than filesystem. - signature (FTB.Signatures.CrashInfo.CrashSignature or None): - Signature for accepting crashes. - signature_desc (str): Short description of the given signature. - static_timeout (bool): Use only specified timeouts (`--timeout` and - `--idle-delay`), even if testcase appears to need - less time. - tool (str or None): Override tool when reporting to FuzzManager. - use_analysis (bool): Analyse reliability of testcase before running each - reduction strategy. - use_harness (bool): Whether to allow use of harness when navigating - between testcases. + ignore: Value for `self.ignore` attribute. + server: Value for `self.server` attribute. + target: Value for `self.target` attribute. + testcases: Value for `self.testcases` attribute. + strategies: Value for `self.strategies` attribute. + log_path: Path to save results when reporting to filesystem. + any_crash: Accept any crash when reducing, not just those matching + the specified or first observed signature. + expect_hang: Attempt to reduce a test that triggers a hang. + idle_delay: Number of seconds to wait before polling for idle. + idle_threshold: CPU usage threshold to mark the process as idle. + relaunch: Maximum number of iterations performed by Runner before + Target should be relaunched. + report_period: Periodically report best results for long-running strategies. + report_to_fuzzmanager: Report to FuzzManager rather than filesystem. + signature: Signature for accepting crashes. + signature_desc: Short description of the given signature. + static_timeout: Use only specified timeouts (`--timeout` and + `--idle-delay`), even if testcase appears to need less time. + tool: Override tool when reporting to FuzzManager. + use_analysis: Analyse reliability of testcase before running each + reduction strategy. + use_harness: Whether to allow use of harness when navigating + between testcases. """ self.ignore = ignore self.server = server @@ -137,7 +133,7 @@ def __init__( self._expect_hang = expect_hang self._idle_delay = idle_delay self._idle_threshold = idle_threshold - self._log_path = Path(log_path) if isinstance(log_path, str) else log_path + self._log_path = log_path # these parameters may be overwritten during analysis, so keep a copy of them self._original_relaunch = relaunch self._original_use_harness = use_harness @@ -157,13 +153,13 @@ def __init__( self._use_analysis = use_analysis self._use_harness = use_harness - def __enter__(self): + def __enter__(self) -> "ReduceManager": return self - def __exit__(self, *exc): + def __exit__(self, *exc: Any) -> None: self.cleanup() - def cleanup(self): + def cleanup(self) -> None: """Remove temporary files from disk. Args: @@ -175,7 +171,7 @@ def cleanup(self): for test in self.testcases: test.cleanup() - def update_timeout(self, results): + def update_timeout(self, results: List[ReplayResult]) -> None: """Tune idle/server timeout values based on actual duration of expected results. Expected durations will be updated if the actual duration is much lower. @@ -187,9 +183,8 @@ def update_timeout(self, results): - Target is running under valgrind (`--valgrind`). Arguments: - results (grizzly.replay.ReplayResult): - Observed results. Any given expected results may affect the idle delay - and sapphire timeout. + results: Observed results. Any given expected results may affect the idle + delay and sapphire timeout. Returns: None @@ -209,9 +204,7 @@ def update_timeout(self, results): return durations = list( - chain.from_iterable( - result.durations for result in results if result.expected - ) + chain.from_iterable(x.durations for x in results if x.expected) ) if not durations: # no expected results @@ -222,7 +215,9 @@ def update_timeout(self, results): LOG.debug("Run time %r", run_time) new_idle_delay = max( self.IDLE_DELAY_MIN, - min(run_time * self.IDLE_DELAY_DURATION_MULTIPLIER, self._idle_delay), + min( + round(run_time * self.IDLE_DELAY_DURATION_MULTIPLIER), self._idle_delay + ), ) if new_idle_delay < self._idle_delay: LOG.info("Updating poll delay to: %r", new_idle_delay) @@ -231,17 +226,20 @@ def update_timeout(self, results): # in other words, decrease the timeout if this ran in less than half the timeout new_iter_timeout = max( self.ITER_TIMEOUT_MIN, - min(run_time * self.ITER_TIMEOUT_DURATION_MULTIPLIER, self.server.timeout), + min( + round(run_time * self.ITER_TIMEOUT_DURATION_MULTIPLIER), + self.server.timeout, + ), ) if new_iter_timeout < self.server.timeout: LOG.info("Updating max timeout to: %r", new_iter_timeout) self.server.timeout = new_iter_timeout - def _on_replay_iteration(self): + def _on_replay_iteration(self) -> None: self._status.iterations += 1 self._status.report() - def run_reliability_analysis(self): + def run_reliability_analysis(self) -> Tuple[int, int]: """Run several analysis passes of the current testcase to find `run` parameters. The number of repetitions and minimum number of crashes are calculated to @@ -251,10 +249,10 @@ def run_reliability_analysis(self): None Returns: - tuple(int, int): Values for `repeat` and `min_crashes` resulting from - analysis. + Values for `repeat` and `min_crashes` resulting from analysis. """ self._status.report(force=True) + harness_best = 0 harness_last_crashes = 0 harness_crashes = 0 non_harness_crashes = 0 @@ -327,14 +325,10 @@ def run_reliability_analysis(self): try: crashes = sum(x.count for x in results if x.expected) if crashes and not self._any_crash and self._signature_desc is None: - first_expected = next( - (report for report in results if report.expected), None - ) + first_expected = next((x for x in results if x.expected), None) + assert first_expected self._signature_desc = first_expected.report.short_signature - self.report( - [result for result in results if not result.expected], - testcases, - ) + self.report([x for x in results if not x.expected], testcases) if use_harness: if last_test_only: harness_last_crashes = crashes @@ -428,27 +422,33 @@ def run_reliability_analysis(self): ) return (repeat, min_crashes) - def testcase_size(self): + def testcase_size(self) -> int: """Calculate the current testcase size. Returns: - int: Current size of the testcase(s). + Current size of the testcase(s). """ return sum(tc.data_size for tc in self.testcases) - def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): + def run( + self, + repeat: int = 1, + launch_attempts: int = 3, + min_results: int = 1, + post_launch_delay: int = 0, + ) -> Exit: """Run testcase reduction. Args: - repeat (int): Maximum number of times to run the TestCase. - launch_attempts (int): Number of attempts to launch the browser. - min_results (int): Minimum number of results needed before run can - be considered successful. - post_launch_delay (int): Time in seconds before continuing after the - browser is launched. + repeat: Maximum number of times to run the TestCase. + launch_attempts: Number of attempts to launch the browser. + min_results: Minimum number of results needed before run can be considered + successful. + post_launch_delay: Time in seconds before continuing after the browser + is launched. Returns: - int: One of the Exit enum values. + One of the Exit enum values. """ any_success = False sig_given = self._signature is not None @@ -478,12 +478,12 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): self._status.run_params["relaunch"] = relaunch self._status.run_params["repeat"] = repeat - for strategy_no, strategy in enumerate(self.strategies, start=1): + for strategy_no, strategy_name in enumerate(self.strategies, start=1): self._status.current_strategy_idx = strategy_no LOG.info("") LOG.info( "Using strategy %s (%d/%d)", - strategy, + strategy_name, strategy_no, len(self.strategies), ) @@ -496,24 +496,23 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): signature=self._signature, use_harness=self._use_harness, ) - strategy = STRATEGIES[strategy](self.testcases) + strategy = STRATEGIES[strategy_name](self.testcases) if last_tried is not None: strategy.update_tried(last_tried) last_tried = None strategy_last_report = time() strategy_stats = self._status.measure(strategy.name) - best_results = [] - other_results = {} + best_results: List[ReplayResult] = [] + other_results: Dict[str, Tuple[ReplayResult, List[TestCase]]] = {} try: with replay, strategy, strategy_stats: self._status.report(force=True) for reduction in strategy: keep_reduction = False - results = [] + results: List[ReplayResult] = [] try: - # reduction is a new list of testcases to be - # replayed + # reduction is a new list of testcases to be replayed results = replay.run( reduction, self.server.timeout, @@ -531,7 +530,7 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): # get the first expected result (if any), # and update the strategy first_expected = next( - (report for report in results if report.expected), + (x for x in results if x.expected), None, ) success = first_expected is not None @@ -587,7 +586,7 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): # only save the smallest testcase that has found # each result for result in results: - other_result_exists = bool( + other_result_exists = ( result.report.minor in other_results ) @@ -619,10 +618,7 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): # as the other result other_results[result.report.minor] = ( result, - [ - testcase.clone() - for testcase in reduction - ], + [x.clone() for x in reduction], ) now = time() @@ -682,12 +678,12 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): if self._report_to_fuzzmanager and self._status.last_reports: for crash_id in self._status.last_reports: LOG.info( - "Updating crash %d to %s (Q%d)", + "Updating crash %s to %s (Q%d)", crash_id, Quality.REDUCED.name, Quality.REDUCED, ) - CrashEntry(crash_id).testcase_quality = Quality.REDUCED.value + CrashEntry(int(crash_id)).testcase_quality = Quality.REDUCED.value # it's possible we made it this far without ever setting signature_desc. # this is only possible if --no-analysis is given @@ -708,51 +704,55 @@ def run(self, repeat=1, launch_attempts=3, min_results=1, post_launch_delay=0): return Exit.SUCCESS return Exit.FAILURE - def report(self, results, testcases, update_status=False): + def report( + self, + results: List[ReplayResult], + testcases: List[TestCase], + update_status: bool = False, + ) -> None: """Report results, either to FuzzManager or to filesystem. Arguments: - results (list(ReplayResult)): Results observed during reduction. - testcases (list(TestCase)): Testcases used to trigger results. - update_status (bool): Whether to update status "Latest Reports" + results: Results observed during reduction. + testcases: Testcases used to trigger results. + update_status: Whether to update status "Latest Reports" """ - new_reports = [] + new_reports: List[str] = [] status = self._status.copy() # copy implicitly closes open counters for result in results: - if self._report_to_fuzzmanager: - reporter = FuzzManagerReporter(self._report_tool) - else: - report_dir = "reports" if result.expected else "other_reports" - reporter = FilesystemReporter( - self._log_path / report_dir, major_bucket=False - ) # write reduction stats for expected results if result.expected: (Path(result.report.path) / "reduce_stats.txt").write_text( ReductionStatusReporter([status]).summary() ) if self._report_to_fuzzmanager: + reporter = FuzzManagerReporter(self._report_tool) status.add_to_reporter(reporter, expected=result.expected) - result = reporter.submit(testcases, result.report, force=result.expected) - if result is not None: + else: + report_dir = "reports" if result.expected else "other_reports" + reporter = FilesystemReporter( + self._log_path / report_dir, major_bucket=False + ) + submitted = reporter.submit(testcases, result.report, force=result.expected) + if submitted is not None: if self._report_to_fuzzmanager: - new_reports.append(result) + new_reports.append(str(submitted)) else: - new_reports.append(str(result.resolve())) + new_reports.append(str(submitted.resolve())) # only write new reports if not empty, otherwise previous reports may be # overwritten with an empty list if later reports are ignored if update_status and new_reports: self._status.last_reports = new_reports @classmethod - def main(cls, args): + def main(cls, args: Namespace) -> int: """CLI for `grizzly.reduce`. Arguments: - args (argparse.Namespace): Result from `ReduceArgs.parse_args`. + args: Result from `ReduceArgs.parse_args`. Returns: - int: 0 for success. non-0 indicates a problem. + 0 for success. non-0 indicates a problem. """ # pylint: disable=too-many-return-statements configure_logging(args.log_level) @@ -771,12 +771,12 @@ def main(cls, args): elif args.valgrind: LOG.info("Running with Valgrind. This will be SLOW!") - asset_mgr = None + asset_mgr: Optional[AssetManager] = None certs = None signature = None signature_desc = None - target = None - testcases = [] + target: Optional[Target] = None + testcases: List[TestCase] = [] try: try: @@ -821,6 +821,7 @@ def main(cls, args): rr=args.rr, valgrind=args.valgrind, ) + assert target if env_vars is not None: LOG.debug("adding environment loaded from test case") target.merge_environment(env_vars) @@ -843,7 +844,7 @@ def main(cls, args): with Sapphire(auto_close=1, timeout=timeout, certs=certs) as server: target.reverse(server.port, server.port) with ReduceManager( - args.ignore, + set(args.ignore), server, target, testcases, diff --git a/grizzly/reduce/crash.py b/grizzly/reduce/crash.py index 039515f2..e8b4c2d0 100644 --- a/grizzly/reduce/crash.py +++ b/grizzly/reduce/crash.py @@ -6,8 +6,7 @@ from ..common.fuzzmanager import load_fm_data from ..common.reporter import Quality -from ..common.utils import Exit -from ..main import configure_logging +from ..common.utils import Exit, configure_logging from ..replay.crash import modify_args from .args import ReduceFuzzManagerIDArgs from .core import ReduceManager @@ -45,7 +44,7 @@ def main(args: Namespace) -> int: Exit.ABORT: Quality(crash.testcase_quality), Exit.SUCCESS: Quality.ORIGINAL, Exit.FAILURE: Quality(args.no_repro_quality), - }.get(result, Quality.UNREDUCED) + }.get(Exit(result), Quality.UNREDUCED) # don't ever set things back to REDUCING, default to UNREDUCED in that case. # REDUCING is only used in automation, so ABORT should never happen. if quality == Quality.REDUCING: diff --git a/grizzly/reduce/exceptions.py b/grizzly/reduce/exceptions.py index ff9bff98..62f442db 100644 --- a/grizzly/reduce/exceptions.py +++ b/grizzly/reduce/exceptions.py @@ -8,7 +8,7 @@ class GrizzlyReduceBaseException(Exception): """Base for other Grizzly Reducer specific exceptions.""" - def __init__(self, msg, code=Exit.ERROR): + def __init__(self, msg: str, code: Exit = Exit.ERROR) -> None: super().__init__() self.msg = msg self.code = code @@ -17,5 +17,5 @@ def __init__(self, msg, code=Exit.ERROR): class NotReproducible(GrizzlyReduceBaseException): """Crash was not observed when expected during reduction.""" - def __init__(self, msg): + def __init__(self, msg: str) -> None: super().__init__(msg, code=Exit.FAILURE) diff --git a/grizzly/reduce/strategies/__init__.py b/grizzly/reduce/strategies/__init__.py index 2ded59f7..a5512530 100644 --- a/grizzly/reduce/strategies/__init__.py +++ b/grizzly/reduce/strategies/__init__.py @@ -8,10 +8,8 @@ for cleaning up all testcases that are yielded. Constants: - DEFAULT_STRATEGIES (list(str)): List of strategy names run by default if none are - specified. - STRATEGIES (dict{str: Strategy}): Mapping of available strategy names to - implementing class. + DEFAULT_STRATEGIES: Strategy names run by default if unspecified. + STRATEGIES: Mapping of available strategy names to implementing class. """ from abc import ABC, abstractmethod from hashlib import sha512 @@ -19,10 +17,22 @@ from pathlib import Path from shutil import rmtree from tempfile import mkdtemp -from types import MappingProxyType +from typing import ( + Any, + Dict, + FrozenSet, + Generator, + Iterable, + List, + Set, + Tuple, + Type, + cast, +) from pkg_resources import iter_entry_points +from ...common.storage import TestCase from ...common.utils import grz_tmp LOG = getLogger(__name__) @@ -38,38 +48,6 @@ ) -def _load_strategies(): - """STRATEGIES is created at the end of this file. - - Returns: - mapping: A mapping of strategy names to strategy class. - """ - strategies = {} - for entry_point in iter_entry_points("grizzly_reduce_strategies"): - try: - strategy_cls = entry_point.load() - strategy_cls.sanity_check_cls_attrs() - assert strategy_cls.name == entry_point.name, ( - f"entry_point name mismatch, check setup.py and " - f"{strategy_cls.__name__.name}" - ) - except Exception as exc: # pylint: disable=broad-except - LOG.debug("error loading strategy type %s: %s", entry_point.name, exc) - continue - strategies[entry_point.name] = strategy_cls - for strategy in DEFAULT_STRATEGIES: - assert strategy in strategies, ( - f"Unknown entry in DEFAULT_STRATEGIES: {strategy} " - f"(STRATEGIES: [{','.join(strategies)}])" - ) - return MappingProxyType(strategies) - - -def _contains_dd(path): - data = path.read_bytes() - return b"DDBEGIN" in data and b"DDEND" in data - - class Strategy(ABC): """A strategy is a procedure for repeatedly running a testcase to find the smallest equivalent test. @@ -77,31 +55,29 @@ class Strategy(ABC): Implementers must define these class attributes: Class Attributes: - name (str): The strategy name. + name: The strategy name. """ - name = None + name: str - def __init__(self, testcases): + def __init__(self, testcases: List[TestCase]) -> None: """Initialize strategy instance. Arguments: - testcases (list(grizzly.common.storage.TestCase)): - List of testcases to reduce. The object does not take ownership of the - testcases. + testcases: Testcases to reduce. The object does not take ownership of the + testcases. """ - self._tried = set() # set of tuple(tuple(str(Path), SHA512)) + self._tried: Set[Tuple[Tuple[str, bytes], ...]] = set() self._testcase_root = Path(mkdtemp(prefix="tc_", dir=grz_tmp("reduce"))) self.dump_testcases(testcases) - def _calculate_testcase_hash(self): + def _calculate_testcase_hash(self) -> Tuple[Tuple[str, bytes], ...]: """Calculate hashes of all files in testcase root. Returns: - tuple(tuple(str, str)): A tuple of 2-tuples mapping str(Path) to SHA-512 of - each file in testcase root. + Mapping of file path to SHA-512 of each file in testcase root. """ - result = [] + result: List[Tuple[str, bytes]] = [] for path in self._testcase_root.glob("**/*"): if path.is_file(): tf_hash = sha512() @@ -109,49 +85,51 @@ def _calculate_testcase_hash(self): result.append( (str(path.relative_to(self._testcase_root)), tf_hash.digest()) ) - result = tuple(sorted(result)) + sorted_result = tuple(sorted(result)) if LOG.getEffectiveLevel() == DEBUG: print_hash = sha512() - print_hash.update(repr(result).encode("utf-8", errors="surrogateescape")) - in_tried = result in self._tried + print_hash.update( + repr(sorted_result).encode("utf-8", errors="surrogateescape") + ) + in_tried = sorted_result in self._tried LOG.debug( "Testcase hash: %s (%sin cache)", print_hash.hexdigest()[:32], "" if in_tried else "not ", ) - return result + return sorted_result - def update_tried(self, tried): + def update_tried(self, tried: Iterable[Tuple[Tuple[str, bytes], ...]]) -> None: """Update the list of tried testcase/hash sets. Testcases are hashed with SHA-512 and digested to bytes (`hashlib.sha512(testcase).digest()`) Arguments: - tried (iterable(tuple(tuple(str, str)))): Set of already tried testcase - hashes. + tried: Collection of already tried testcase hashes. Returns: None """ self._tried.update(frozenset(tried)) - def get_tried(self): + def get_tried(self) -> FrozenSet[Tuple[Tuple[str, bytes], ...]]: """Return the set of tried testcase hashes. Testcases are hashed with SHA-512 and digested to bytes (`hashlib.sha512(testcase).digest()`) Returns: - frozenset(tuple(tuple(str, str))): Testcase hashes. + Testcase hashes. """ return frozenset(self._tried) - def dump_testcases(self, testcases, recreate_tcroot=False): + def dump_testcases( + self, testcases: List[TestCase], recreate_tcroot: bool = False + ) -> None: """Dump a testcase list to the testcase root on disk. Arguments: - testcases (list(grizzly.common.storage.TestCase)): list of testcases to dump - recreate_tcroot (bool): if True, delete testcase root and recreate it before - dumping + testcases: Testcases to dump. + recreate_tcroot: if True, delete and recreate tcroot before dumping it. Returns: None @@ -164,22 +142,8 @@ def dump_testcases(self, testcases, recreate_tcroot=False): # NOTE: naming determines load order testcase.dump(self._testcase_root / f"{idx:03d}", include_details=True) - @classmethod - def sanity_check_cls_attrs(cls): - """Sanity check the strategy class implementation. - - This should assert that any required class attributes are defined and correct. - - Raises: - AssertionError: Any required class attributes are missing or wrong type. - - Returns: - None - """ - assert isinstance(cls.name, str) - @abstractmethod - def __iter__(self): + def __iter__(self) -> Generator[List[TestCase], None, None]: """Iterate over potential reductions of testcases according to this strategy. The caller should evaluate each reduction yielded, and call `update` with the @@ -187,43 +151,39 @@ def __iter__(self): each. Yields: - list(grizzly.common.storage.TestCase): list of testcases with reduction - applied + Testcases with reduction applied. """ @abstractmethod - def update(self, success): + def update(self, success: bool) -> None: """Inform the strategy whether or not the last reduction yielded was good. Arguments: - success (bool): Whether or not the last reduction was acceptable. + success: Whether or not the last reduction was acceptable. Returns: None """ - def __enter__(self): + def __enter__(self) -> "Strategy": """Enter a runtime context that will automatically call `cleanup` on exit. Returns: - Strategy: self + self """ return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, *exc: Any) -> None: """Exit the runtime context. `cleanup` is called. Arguments: - exc_type (type or None): Type of exception object currently raised. - exc_val (Exception or None): Exception object currently raised. - exc_tb (traceback or None): Traceback for currently raised exception. Returns: None """ self.cleanup() - def cleanup(self): + def cleanup(self) -> None: """Destroy all resources held by the strategy. Returns: @@ -232,4 +192,34 @@ def cleanup(self): rmtree(self._testcase_root) +def _load_strategies() -> Dict[str, Type[Strategy]]: + """STRATEGIES is created at the end of this file. + + Returns: + A mapping of strategy names to strategy class. + """ + strategies: Dict[str, Type[Strategy]] = {} + for entry_point in iter_entry_points("grizzly_reduce_strategies"): + try: + strategy_cls = cast(Type[Strategy], entry_point.load()) + assert ( + strategy_cls.name == entry_point.name + ), f"entry_point name mismatch, check setup.py and {strategy_cls.__name__}" + except Exception as exc: # pylint: disable=broad-except + LOG.debug("error loading strategy type %s: %s", entry_point.name, exc) + continue + strategies[entry_point.name] = strategy_cls + for strategy in DEFAULT_STRATEGIES: + assert strategy in strategies, ( + f"Unknown entry in DEFAULT_STRATEGIES: {strategy} " + f"(STRATEGIES: [{','.join(strategies)}])" + ) + return strategies + + +def _contains_dd(path: Path) -> bool: + data = path.read_bytes() + return b"DDBEGIN" in data and b"DDEND" in data + + STRATEGIES = _load_strategies() diff --git a/grizzly/reduce/strategies/beautify.py b/grizzly/reduce/strategies/beautify.py index 7aaae9c9..6172c103 100644 --- a/grizzly/reduce/strategies/beautify.py +++ b/grizzly/reduce/strategies/beautify.py @@ -10,6 +10,8 @@ import re from abc import ABC, abstractmethod from logging import getLogger +from pathlib import Path +from typing import Generator, List, Match, Optional, Set, Tuple, cast from lithium.testcases import TestcaseLine @@ -33,15 +35,15 @@ LOG = getLogger(__name__) -def _split_normal_lines(data): +def _split_normal_lines(data: bytes) -> Generator[bytes, None, None]: """Like str.splitlines but only respect \n, \r\n, and \r .. leave other potential line break characters intact. Arguments: - data (bytes): Input line to process. + data: Input line to process. - Returns: - generator[bytes]: Yield lines split from data, not including line endings. + Yields: + Lines split from data, not including line endings. """ for win_line in data.split(b"\r\n"): for mac_line in win_line.split(b"\r"): @@ -54,32 +56,31 @@ class _BeautifyStrategy(Strategy, ABC): Implementers must define these class attributes: Class attributes: - all_extensions (set(str)): Set of all file extensions to beautify. - import_available (bool): Whether or not the beautify module was imported. - import_name (str): The name of the beautify module imported (for error - reporting). - name (str): The strategy name. - native_extension (str): The native file extension for this type. - tag_name (str): Tag name to search for in other (non-native) extensions. + all_extensions: Set of all file extensions to beautify. + import_available: Whether or not the beautify module was imported. + import_name: The name of the beautify module imported (for error reporting). + name: The strategy name. + native_extension: The native file extension for this type. + tag_name: Tag name to search for in other (non-native) extensions. """ - all_extensions = None + all_extensions: Set[str] ignore_files = {TEST_INFO, "prefs.js"} - import_available = None - import_name = None - native_extension = None - tag_name = None + import_available: bool + import_name: str + native_extension: str + tag_name: str - def __init__(self, testcases): + def __init__(self, testcases: List[TestCase]) -> None: """Initialize beautification strategy instance. Arguments: - testcases (list(grizzly.common.storage.TestCase)): - List of testcases to reduce. The object does not take ownership of the - testcases. + testcases: Testcases to reduce. The object does not take ownership of the + testcases. """ + assert self.tag_name is not None super().__init__(testcases) - self._files_to_beautify = [] + self._files_to_beautify: List[Path] = [] for path in self._testcase_root.glob("**/*"): if ( path.is_file() @@ -88,7 +89,7 @@ def __init__(self, testcases): ): if _contains_dd(path): self._files_to_beautify.append(path) - self._current_feedback = None + self._current_feedback: Optional[bool] = None tag_bytes = self.tag_name.encode("ascii") self._re_tag_start = re.compile( rb"<\s*" + tag_bytes + rb".*?>", flags=re.DOTALL | re.IGNORECASE @@ -97,29 +98,11 @@ def __init__(self, testcases): rb"", flags=re.IGNORECASE ) - @classmethod - def sanity_check_cls_attrs(cls): - """Sanity check the strategy class implementation. - - Raises: - AssertionError: Required class attributes are missing or wrong type. - - Returns: - None - """ - super().sanity_check_cls_attrs() - assert isinstance(cls.all_extensions, set) - assert all(isinstance(ext, str) for ext in cls.all_extensions) - assert isinstance(cls.import_available, bool) - assert isinstance(cls.import_name, str) - assert isinstance(cls.native_extension, str) - assert isinstance(cls.tag_name, str) - - def update(self, success): + def update(self, success: bool) -> None: """Inform the strategy whether or not the last beautification yielded was good. Arguments: - success (bool): Whether or not the last beautification was acceptable. + success: Whether or not the last beautification was acceptable. Returns: None @@ -129,27 +112,29 @@ def update(self, success): @classmethod @abstractmethod - def beautify_bytes(cls, data): + def beautify_bytes(cls, data: bytes) -> bytes: """Perform beautification on a code buffer. Arguments: - data (bytes): The code data to be beautified. + data: The data to be beautified. Returns: - bytes: The beautified result. + The beautified result. """ - def _chunks_to_beautify(self, before, to_beautify, file): + def _chunks_to_beautify( + self, before: bytes, to_beautify: bytes, file: Path + ) -> Generator[Tuple[int, int], None, None]: """Iterate over `to_beautify` and find chunks of style/script to beautify. Arguments: - before (bytes): The data preceding `to_beautify`. Used to check whether - `to_beautify` is already in an open