-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
398 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
# This Source Code Form is subject to the terms of the Mozilla Public | ||
# License, v. 2.0. If a copy of the MPL was not distributed with this | ||
# file, You can obtain one at http://mozilla.org/MPL/2.0/. | ||
import binascii | ||
from base64 import b64decode | ||
from logging import getLogger | ||
from os import environ | ||
from pathlib import Path | ||
from shutil import rmtree | ||
from tempfile import mkdtemp | ||
from zipfile import ZipFile | ||
|
||
from bugsy import Bugsy | ||
from bugsy.errors import BugsyException | ||
from requests.exceptions import ConnectionError as RequestsConnectionError | ||
|
||
from .utils import grz_tmp | ||
|
||
# attachments that can be ignored | ||
IGNORE_EXTS = frozenset({"c", "cpp", "diff", "exe", "log", "patch", "php", "py", "txt"}) | ||
KNOWN_ASSETS = {"prefs": "prefs.js"} | ||
LOG = getLogger(__name__) | ||
|
||
|
||
class BugzillaBug: | ||
__slots__ = ("_bug", "_data") | ||
|
||
def __init__(self, bug): | ||
self._bug = bug | ||
self._data = Path(mkdtemp(prefix=f"bug{bug.id}-", dir=grz_tmp("bugzilla"))) | ||
self._fetch_attachments() | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, *exc): | ||
self.cleanup() | ||
|
||
def _fetch_attachments(self) -> None: | ||
"""Download bug attachments. | ||
Arguments: | ||
None | ||
Returns: | ||
None | ||
""" | ||
for attachment in self._bug.get_attachments(): | ||
if ( | ||
attachment.is_obsolete | ||
or attachment.content_type == "text/x-phabricator-request" | ||
or attachment.file_name.split(".")[-1] in IGNORE_EXTS | ||
): | ||
continue | ||
try: | ||
data = b64decode(attachment.data) | ||
except binascii.Error as exc: | ||
LOG.warning( | ||
"Failed to decode attachment: %r (%s)", attachment.file_name, exc | ||
) | ||
continue | ||
(self._data / attachment.file_name).write_bytes(data) | ||
|
||
def _unpack_archives(self): | ||
"""Unpack and remove archives. | ||
Arguments: | ||
None | ||
Returns: | ||
None | ||
""" | ||
for num, entry in enumerate(self._data.iterdir()): | ||
if entry.suffix.lower() == ".zip" and entry.is_file(): | ||
dst = self._data / f"unpacked_{num:02d}_{entry.stem}" | ||
LOG.debug("unpacking %s to '%s'", entry, dst) | ||
with ZipFile(entry) as zip_fp: | ||
zip_fp.extractall(path=dst) | ||
entry.unlink() | ||
# TODO: add support for other archive types | ||
|
||
def assets(self, ignore=None): | ||
"""Scan files for assets. | ||
Arguments: | ||
ignore (list(str)): Assets not to include in output. | ||
Yields: | ||
tuple(str, Path): Name and path to asset. | ||
""" | ||
for asset, file in KNOWN_ASSETS.items(): | ||
if not ignore or asset not in ignore: | ||
asset_path = self._data / file | ||
if asset_path.is_file(): | ||
yield asset, asset_path | ||
|
||
def cleanup(self): | ||
"""Remove attachment data. | ||
Arguments: | ||
None | ||
Returns: | ||
None | ||
""" | ||
rmtree(self._data) | ||
|
||
@classmethod | ||
def load(cls, bug_id): | ||
"""Load bug information from a Bugzilla instance. | ||
Arguments: | ||
bug_id (int): Bug to load. | ||
Returns: | ||
BugzillaBug | ||
""" | ||
api_key = environ.get("BZ_API_KEY") | ||
# default root matches Bugsy | ||
api_root = environ.get("BZ_API_ROOT", "https://bugzilla.mozilla.org/rest") | ||
bugzilla = Bugsy(api_key=api_key, bugzilla_url=api_root) | ||
try: | ||
return cls(bugzilla.get(bug_id)) | ||
except BugsyException as exc: | ||
LOG.error("%s", exc.msg) | ||
except RequestsConnectionError as exc: | ||
LOG.error("Unable to connect to %r (%s)", bugzilla.bugzilla_url, exc) | ||
return None | ||
|
||
def testcases(self): | ||
"""Create a list of potential test cases. | ||
Arguments: | ||
None | ||
Returns: | ||
list(Path): Files and directories that could potentially be test cases. | ||
""" | ||
# unpack archives | ||
self._unpack_archives() | ||
testcases = list(x for x in self._data.iterdir() if x.is_dir()) | ||
# scan base directory for files, filtering out assets | ||
files = tuple( | ||
x | ||
for x in self._data.iterdir() | ||
if x.is_file() and x.name.lower() not in KNOWN_ASSETS.values() | ||
) | ||
# first, if base directory contains multiple files add it as a single test case | ||
if len(files) > 1: | ||
testcases.append(self._data) | ||
# finally, add each individual file as a potential test case | ||
testcases.extend(files) | ||
return testcases |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
# This Source Code Form is subject to the terms of the Mozilla Public | ||
# License, v. 2.0. If a copy of the MPL was not distributed with this | ||
# file, You can obtain one at http://mozilla.org/MPL/2.0/. | ||
# pylint: disable=protected-access | ||
from base64 import b64encode | ||
from zipfile import ZipFile | ||
|
||
from bugsy import Attachment, Bug, BugsyException | ||
from pytest import mark | ||
from requests.exceptions import ConnectionError as RequestsConnectionError | ||
|
||
from .bugzilla import BugzillaBug | ||
|
||
|
||
def test_bugzilla_01(mocker): | ||
"""test BugzillaBug._fetch_attachments()""" | ||
bug = mocker.Mock(spec=Bug, id=123) | ||
bug.get_attachments.return_value = ( | ||
# ignored obsolete | ||
mocker.Mock(spec=Attachment, is_obsolete=True), | ||
# ignored content type | ||
mocker.Mock( | ||
spec=Attachment, | ||
is_obsolete=False, | ||
content_type="text/x-phabricator-request", | ||
), | ||
# ignored file extension | ||
mocker.Mock( | ||
spec=Attachment, | ||
is_obsolete=False, | ||
content_type="text/plain", | ||
file_name="ignore.txt", | ||
), | ||
# valid test case | ||
mocker.Mock( | ||
spec=Attachment, | ||
is_obsolete=False, | ||
content_type="text/html", | ||
file_name="test.html", | ||
data=b64encode(b"foo"), | ||
), | ||
# corrupted data | ||
mocker.Mock( | ||
spec=Attachment, | ||
is_obsolete=False, | ||
content_type="text/html", | ||
file_name="broken.html", | ||
data=b"bad-b64", | ||
), | ||
) | ||
with BugzillaBug(bug) as bz_bug: | ||
assert len(tuple(bz_bug._data.iterdir())) == 1 | ||
assert (bz_bug._data / "test.html").is_file() | ||
assert (bz_bug._data / "test.html").read_text() == "foo" | ||
|
||
|
||
@mark.parametrize( | ||
"exc", | ||
[ | ||
BugsyException("foo", error_code=101), | ||
RequestsConnectionError(), | ||
], | ||
) | ||
def test_bugzilla_02(mocker, exc): | ||
"""test BugzillaBug.load() - errors""" | ||
bugsy = mocker.patch("grizzly.common.bugzilla.Bugsy", autospec=True) | ||
bugsy.return_value.get.side_effect = exc | ||
bugsy.return_value.bugzilla_url = "foo" | ||
assert BugzillaBug.load(123) is None | ||
|
||
|
||
def test_bugzilla_03(mocker): | ||
"""test BugzillaBug.load()""" | ||
bugsy = mocker.patch("grizzly.common.bugzilla.Bugsy", autospec=True) | ||
bugsy.return_value.get.return_value = mocker.MagicMock(spec=Bug, id=123) | ||
with BugzillaBug.load(123): | ||
pass | ||
|
||
|
||
def test_bugzilla_04(mocker): | ||
"""test BugzillaBug.assets()""" | ||
bug = mocker.Mock(spec=Bug, id=123) | ||
bug.get_attachments.return_value = [] | ||
with BugzillaBug(bug) as bz_bug: | ||
(bz_bug._data / "prefs.js").touch() | ||
(bz_bug._data / "test.html").touch() | ||
# load prefs.js asset | ||
results = tuple(bz_bug.assets()) | ||
assert len(results) == 1 | ||
assert results[0] == ("prefs", bz_bug._data / "prefs.js") | ||
# load ignore prefs.js asset | ||
assert not any(bz_bug.assets(ignore=["prefs"])) | ||
|
||
|
||
@mark.parametrize("archive_count", [0, 1, 2]) | ||
def test_bugzilla_05(mocker, tmp_path, archive_count): | ||
"""test BugzillaBug._unpack_archive()""" | ||
(tmp_path / "test.html").write_text("foo") | ||
bug = mocker.Mock(spec=Bug, id=123) | ||
bug.get_attachments.return_value = [] | ||
with BugzillaBug(bug) as bz_bug: | ||
(bz_bug._data / "not_archive.txt").touch() | ||
for num in range(archive_count): | ||
with ZipFile(bz_bug._data / f"archive{num:02d}.zip", "w") as zfp: | ||
zfp.write(tmp_path / "test.html", arcname="test.html") | ||
bz_bug._unpack_archives() | ||
results = tuple(x for x in bz_bug._data.iterdir() if x.is_dir()) | ||
assert len(results) == archive_count | ||
for num in range(archive_count): | ||
assert not (bz_bug._data / f"archive{num:02d}.zip").is_file() | ||
for path in results: | ||
assert (path / "test.html").is_file() | ||
|
||
|
||
@mark.parametrize( | ||
"files, count", | ||
[ | ||
# no files | ||
((), 0), | ||
# single test case | ||
(("test.html",), 1), | ||
# multiple test cases (add the path and both files individually) | ||
(("a.html", "b.html"), 3), | ||
# test case and asset | ||
(("prefs.js", "test.html"), 1), | ||
# test loading archive | ||
(("archive.zip",), 1), | ||
# test loading archive and standalone test | ||
(("archive.zip", "test.html"), 2), | ||
], | ||
) | ||
def test_bugzilla_06(mocker, tmp_path, files, count): | ||
"""test BugzillaBug.testcases()""" | ||
(tmp_path / "test.html").write_text("foo") | ||
bug = mocker.Mock(spec=Bug, id=123) | ||
bug.get_attachments.return_value = [] | ||
with BugzillaBug(bug) as bz_bug: | ||
for file in files: | ||
if file.endswith("zip"): | ||
with ZipFile(bz_bug._data / file, "w") as zfp: | ||
zfp.write(tmp_path / "test.html", arcname="test.html") | ||
else: | ||
(bz_bug._data / file).touch() | ||
assert len(bz_bug.testcases()) == count |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# This Source Code Form is subject to the terms of the Mozilla Public | ||
# License, v. 2.0. If a copy of the MPL was not distributed with this | ||
# file, You can obtain one at http://mozilla.org/MPL/2.0/. | ||
from logging import getLogger | ||
|
||
from ..common.bugzilla import BugzillaBug | ||
from ..main import configure_logging | ||
from .args import ReplayFuzzBugzillaArgs | ||
from .replay import ReplayManager | ||
|
||
LOG = getLogger(__name__) | ||
|
||
|
||
def main(args): | ||
"""CLI for `grizzly.replay.bugzilla`. | ||
Arguments: | ||
args (argparse.Namespace): Result from `ReplayArgs.parse_args`. | ||
Returns: | ||
int: 0 for success. non-0 indicates a problem. | ||
""" | ||
configure_logging(args.log_level) | ||
bug = BugzillaBug.load(args.input) | ||
if bug is None: | ||
LOG.info("Failed to load Bug %d from Bugzilla", args.input) | ||
return 1 | ||
LOG.info("Loaded Bug %d from Bugzilla", args.input) | ||
with bug: | ||
args.asset.extend( | ||
# favor assets provided via the command line over the bug attachments | ||
bug.assets(ignore=tuple(x[0] for x in args.asset)) | ||
) | ||
testcases = bug.testcases() | ||
if not testcases: | ||
LOG.error("No test case data attached to bug %d", args.input) | ||
return 1 | ||
args.input = testcases | ||
return ReplayManager.main(args) | ||
|
||
|
||
if __name__ == "__main__": | ||
raise SystemExit(main(ReplayFuzzBugzillaArgs().parse_args())) |
Oops, something went wrong.