Skip to content

Commit

Permalink
Add type hinting to utils.py
Browse files Browse the repository at this point in the history
  • Loading branch information
tysmith committed Mar 25, 2024
1 parent ed957d1 commit 10d181d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 29 deletions.
2 changes: 1 addition & 1 deletion grizzly/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def __init__(self):
"--version",
"-V",
action="version",
version=__version__ if __version__ else "Unknown - Package not installed.",
version=__version__,
help="Show version number",
)
if system().startswith("Linux"):
Expand Down
65 changes: 37 additions & 28 deletions grizzly/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pathlib import Path
from shutil import rmtree
from tempfile import gettempdir
from typing import Any, Dict, Iterable, Optional, Tuple, Union

from cryptography import x509
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -37,7 +38,7 @@
__version__ = version("grizzly-framework")
except PackageNotFoundError: # pragma: no cover
# package is not installed
__version__ = None
__version__ = "unknown"

DEFAULT_TIME_LIMIT = 30
GRZ_TMP = Path(getenv("GRZ_TMP", gettempdir()), "grizzly")
Expand All @@ -50,14 +51,14 @@
class CertificateBundle:
"""Contains root CA, host CA and private key files."""

def __init__(self, path, root, host, key):
def __init__(self, path: Path, root: Path, host: Path, key: Path) -> None:
self._base = path
self.root = root
self.host = host
self.key = key

@classmethod
def create(cls, path=None):
def create(cls, path: Optional[Path] = None) -> "CertificateBundle":
"""Create certificate files.
Args:
Expand All @@ -71,7 +72,7 @@ def create(cls, path=None):
certs = generate_certificates(path)
return cls(path, certs["root"], certs["host"], certs["key"])

def cleanup(self):
def cleanup(self) -> None:
"""Remove certificate files.
Args:
Expand All @@ -87,7 +88,7 @@ def cleanup(self):
class ConfigError(Exception):
"""Raised to indicate invalid configuration a state"""

def __init__(self, message, exit_code):
def __init__(self, message: str, exit_code: int) -> None:
super().__init__(message)
self.exit_code = exit_code

Expand All @@ -109,11 +110,11 @@ class Exit(IntEnum):
FAILURE = 5


def configure_logging(log_level):
def configure_logging(log_level: int) -> None:
"""Configure log output level and formatting.
Args:
log_level (int): Set log level.
log_level: Set log level.
Returns:
None
Expand All @@ -130,13 +131,13 @@ def configure_logging(log_level):
basicConfig(format=log_fmt, datefmt=date_fmt, level=log_level)


def display_time_limits(time_limit, timeout, no_harness):
def display_time_limits(time_limit: int, timeout: int, no_harness: bool) -> None:
"""Output configuration of time limits and harness.
Args:
time_limit (int): Time in seconds before harness attempts to close current test.
timeout (int): Time in seconds before iteration is considered a timeout.
no_harness (bool): Indicate whether harness will is disabled.
time_limit: Time in seconds before harness attempts to close current test.
timeout: Time in seconds before iteration is considered a timeout.
no_harness: Indicate whether harness will is disabled.
Returns:
None
Expand All @@ -156,13 +157,21 @@ def display_time_limits(time_limit, timeout, no_harness):
LOG.warning("TIMEOUT DISABLED, not recommended for automation")


def grz_tmp(*subdir):
def grz_tmp(*subdir: Union[str, Path]) -> Path:
"""Create (if needed) a temporary working directory in a known location.
Args:
subdir: Nested directories.
Returns:
Path within the temporary working directory.
"""
path = Path(GRZ_TMP, *subdir)
path.mkdir(parents=True, exist_ok=True)
return path


def generate_certificates(cert_dir: Path):
def generate_certificates(cert_dir: Path) -> Dict[str, Path]:
"""Generate a root CA and host certificate.
Taken from https://stackoverflow.com/a/56292132
Expand Down Expand Up @@ -229,24 +238,25 @@ def generate_certificates(cert_dir: Path):


def time_limits(
time_limit,
timeout,
tests=None,
default_limit=DEFAULT_TIME_LIMIT,
timeout_delay=TIMEOUT_DELAY,
):
time_limit: Optional[int],
timeout: Optional[int],
# NOTE: Any should be TestCase, this function should likely live somewhere else
tests: Optional[Iterable[Any]] = None,
default_limit: int = DEFAULT_TIME_LIMIT,
timeout_delay: int = TIMEOUT_DELAY,
) -> Tuple[int, int]:
"""Determine the test time limit and timeout. If time_limit or timeout is None
it is calculated otherwise the provided value is used.
Args:
time_limit (int): Test time limit.
timeout (int): Iteration timeout.
tests (iterable): Testcases that may contain time limit values.
default_limit (int): Value to use as default time limit.
timeout_delay (int): Value to use as delay when calculating timeout.
time_limit: Test time limit.
timeout: Iteration timeout.
tests: Testcases that may contain time limit values.
default_limit: Value to use as default time limit.
timeout_delay: Value to use as delay when calculating timeout.
Returns:
tuple (int, int): Time limit and timeout.
Time limit and timeout.
"""
assert default_limit > 0
assert timeout_delay >= 0
Expand All @@ -259,10 +269,9 @@ def time_limits(
# add small time buffer to duration
test_limits.extend(int(x.duration) + 10 for x in tests if x.duration)
time_limit = max(test_limits)
assert time_limit > 0
assert time_limit is not None and time_limit > 0
# calculate timeout
calc_timeout = timeout is None
if calc_timeout:
if timeout is None:
timeout = time_limit + timeout_delay
elif calc_limit and time_limit > timeout > 0:
LOG.debug("calculated time limit > given timeout, using timeout")
Expand Down

0 comments on commit 10d181d

Please sign in to comment.