Skip to content

Commit

Permalink
Add type hinting to fuzzing session code
Browse files Browse the repository at this point in the history
  • Loading branch information
tysmith committed May 9, 2024
1 parent b20c148 commit 00a85e1
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 87 deletions.
17 changes: 9 additions & 8 deletions grizzly/args.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from argparse import ArgumentParser, HelpFormatter
from argparse import ArgumentParser, HelpFormatter, Namespace
from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING
from os import getenv
from os.path import exists
from pathlib import Path
from platform import system
from typing import List, Optional

from .common.fuzzmanager import FM_CONFIG, ProgramConfiguration
from .common.plugins import scan as scan_plugins
Expand Down Expand Up @@ -36,7 +37,7 @@ class CommonArgs:
IGNORABLE = ("log-limit", "memory", "timeout")
DEFAULT_IGNORE = ("log-limit", "timeout")

def __init__(self):
def __init__(self) -> None:
# log levels for console logging
self._level_map = {
"CRIT": CRITICAL,
Expand Down Expand Up @@ -64,7 +65,7 @@ def __init__(self):

# build 'asset' help string
assets = scan_target_assets()
asset_msg = []
asset_msg: List[str] = []
for target in sorted(assets):
if assets[target]:
asset_msg.append(f"{target}: {', '.join(sorted(assets[target]))}.")
Expand Down Expand Up @@ -248,7 +249,7 @@ def __init__(self):
)

@staticmethod
def is_headless():
def is_headless() -> bool:
if (
system().startswith("Linux")
and not getenv("DISPLAY")
Expand All @@ -257,12 +258,12 @@ def is_headless():
return True
return False

def parse_args(self, argv=None):
def parse_args(self, argv: Optional[List[str]] = None) -> Namespace:
args = self.parser.parse_args(argv)
self.sanity_check(args)
return args

def sanity_check(self, args):
def sanity_check(self, args: Namespace) -> None:
if not args.binary.is_file():
self.parser.error(f"file not found: '{args.binary!s}'")

Expand Down Expand Up @@ -352,7 +353,7 @@ def sanity_check(self, args):


class GrizzlyArgs(CommonArgs):
def __init__(self):
def __init__(self) -> None:
super().__init__()

adapters = scan_plugins("grizzly_adapters")
Expand Down Expand Up @@ -426,7 +427,7 @@ def __init__(self):
" (default: %(default)s) - Use 0 for 'no limit'",
)

def sanity_check(self, args):
def sanity_check(self, args: Namespace) -> None:
super().sanity_check(args)

if args.collect < 1:
Expand Down
24 changes: 13 additions & 11 deletions grizzly/common/iomanager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, List, Optional

from sapphire.server_map import ServerMap

from .storage import TestCase
Expand All @@ -19,34 +21,34 @@ class IOManager:
"tests",
)

def __init__(self, report_size=1):
def __init__(self, report_size: int = 1) -> None:
assert report_size > 0
self.server_map = ServerMap()
# tests will be ordered oldest to newest
self.tests = []
self.tests: List[TestCase] = []
# total number of test cases generated
self._generated = 0
self._report_size = report_size
self._test = None
self._test: Optional[TestCase] = None

def __enter__(self):
def __enter__(self) -> "IOManager":
return self

def __exit__(self, *exc):
def __exit__(self, *exc: Any) -> None:
self.cleanup()

def cleanup(self):
def cleanup(self) -> None:
self.purge()

def commit(self):
def commit(self) -> None:
assert self._test is not None
self.tests.append(self._test)
self._test = None
# manage testcase cache size
if len(self.tests) > self._report_size:
self.tests.pop(0).cleanup()

def create_testcase(self, adapter_name):
def create_testcase(self, adapter_name: str) -> TestCase:
assert self._test is None
self._test = TestCase(self.page_name(), adapter_name)
# reset redirect map
Expand All @@ -57,10 +59,10 @@ def create_testcase(self, adapter_name):
self._generated += 1
return self._test

def page_name(self, offset=0):
return f"test_{self._generated + offset:0>4d}.html"
def page_name(self, offset: int = 0) -> str:
return f"test_{self._generated + offset:04d}.html"

def purge(self):
def purge(self) -> None:
if self._test is not None:
self._test.cleanup()
self._test = None
Expand Down
59 changes: 35 additions & 24 deletions grizzly/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from argparse import Namespace
from logging import DEBUG, getLogger
from os import getpid
from typing import Optional, cast

from sapphire import Sapphire

Expand All @@ -12,6 +14,7 @@
FailedLaunchReporter,
FilesystemReporter,
FuzzManagerReporter,
Reporter,
)
from .common.utils import (
CertificateBundle,
Expand All @@ -20,7 +23,7 @@
display_time_limits,
time_limits,
)
from .session import Session
from .session import LogRate, Session
from .target import Target, TargetLaunchError, TargetLaunchTimeout

__author__ = "Tyson Smith"
Expand All @@ -30,7 +33,7 @@
LOG = getLogger(__name__)


def main(args):
def main(args: Namespace) -> Exit:
configure_logging(args.log_level)
LOG.info("Starting Grizzly (%d)", getpid())

Expand All @@ -45,13 +48,16 @@ def main(args):
elif args.valgrind:
LOG.info("Running with Valgrind. This will be SLOW!")

adapter = None
certs = None
adapter: Optional[Adapter] = None
certs: Optional[CertificateBundle] = None
complete_with_results = False
target = None
target: Optional[Target] = None
try:
LOG.debug("initializing Adapter %r", args.adapter)
adapter = load_plugin(args.adapter, "grizzly_adapters", Adapter)(args.adapter)
adapter = cast(
Adapter,
load_plugin(args.adapter, "grizzly_adapters", Adapter)(args.adapter),
)

# calculate time limit and timeout
time_limit, timeout = time_limits(
Expand All @@ -61,24 +67,27 @@ def main(args):

if adapter.RELAUNCH > 0:
LOG.info("Relaunch (%d) set in Adapter", adapter.RELAUNCH)
relaunch = adapter.RELAUNCH
relaunch: int = adapter.RELAUNCH
else:
relaunch = args.relaunch

if not args.use_http:
certs = CertificateBundle.create()

LOG.debug("initializing the Target %r", args.platform)
target = load_plugin(args.platform, "grizzly_targets", Target)(
args.binary,
args.launch_timeout,
args.log_limit,
args.memory,
certs=certs,
headless=args.headless,
pernosco=args.pernosco,
rr=args.rr,
valgrind=args.valgrind,
target = cast(
Target,
load_plugin(args.platform, "grizzly_targets", Target)(
args.binary,
args.launch_timeout,
args.log_limit,
args.memory,
certs=certs,
headless=args.headless,
pernosco=args.pernosco,
rr=args.rr,
valgrind=args.valgrind,
),
)
# add specified assets
target.asset_mgr.add_batch(args.asset)
Expand All @@ -92,11 +101,13 @@ def main(args):

LOG.debug("initializing the Reporter")
if args.fuzzmanager:
reporter = FuzzManagerReporter(args.tool or f"grizzly-{adapter.name}")
LOG.info("Results will be reported via FuzzManager (%s)", reporter.tool)
tool = args.tool or f"grizzly-{adapter.name}"
reporter: Reporter = FuzzManagerReporter(tool)
LOG.info("Results will be reported via FuzzManager (%s)", tool)
else:
reporter = FilesystemReporter(args.output / "results")
LOG.info("Results will be stored in '%s'", reporter.report_path)
report_path = args.output / "results"
reporter = FilesystemReporter(report_path)
LOG.info("Results will be stored in '%s'", report_path.resolve())
reporter.display_logs = args.smoke_test or reporter.display_logs

if args.limit:
Expand All @@ -123,9 +134,9 @@ def main(args):
report_size=args.collect,
) as session:
if args.log_level == DEBUG or args.verbose:
display_mode = Session.DISPLAY_VERBOSE
log_rate = LogRate.VERBOSE
else:
display_mode = Session.DISPLAY_NORMAL
log_rate = LogRate.NORMAL
session.run(
args.ignore,
time_limit,
Expand All @@ -134,7 +145,7 @@ def main(args):
no_harness=args.no_harness,
result_limit=1 if args.smoke_test else 0,
runtime_limit=args.runtime,
display_mode=display_mode,
log_rate=log_rate,
launch_attempts=args.launch_attempts,
post_launch_delay=args.post_launch_delay,
)
Expand Down
Loading

0 comments on commit 00a85e1

Please sign in to comment.