diff --git a/README.rst b/README.rst index c2a0ddf..daa34a5 100644 --- a/README.rst +++ b/README.rst @@ -95,8 +95,11 @@ Example of custom credentials for the plugin: 'username': 'myusername', 'password': 'mypassword', 'stream': 'some_coverity_stream', + 'snapshot': '1', } +Snapshot is optional. When an empty string is given, the last snapshot is used. + Link to traceability items ========================== diff --git a/example/Makefile b/example/Makefile index 6cc49ad..e7cf00e 100644 --- a/example/Makefile +++ b/example/Makefile @@ -10,7 +10,6 @@ BUILDDIR ?= _build # logging variables DEBUG ?= 0 -LOGLEVEL ?= WARNING # Internal variables. PAPEROPT_a4 = -D latex_paper_size=a4 @@ -50,7 +49,6 @@ clean: -rm -rf $(BUILDDIR)/* html: - export LOGLEVEL=$(LOGLEVEL) $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html @echo @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." diff --git a/example/conf.py b/example/conf.py index 7530203..6c2253b 100644 --- a/example/conf.py +++ b/example/conf.py @@ -15,10 +15,9 @@ import sys import mlx.coverity -from mlx.coverity import __version__, coverity_logging +from mlx.coverity import __version__ import mlx.traceability from decouple import config -import logging pkg_version = __version__ @@ -320,11 +319,3 @@ TRACEABILITY_ITEM_ID_REGEX = r"([A-Z_]+-[A-Z0-9_]+)" TRACEABILITY_ITEM_RELINK = {} -log_level = os.environ.get('LOGLEVEL', None) -if log_level: - try: - numeric_level = getattr(logging, log_level.upper(), None) - coverity_logging.LOGGER.setLevel(level=numeric_level) - except: - raise ValueError(f"Invalid log level: {log_level}") - diff --git a/mlx/coverity/__init__.py b/mlx/coverity/__init__.py index d9fa3e3..caf5b07 100644 --- a/mlx/coverity/__init__.py +++ b/mlx/coverity/__init__.py @@ -5,13 +5,10 @@ "CoverityDefectListDirective", "CoverityDefectService", "ItemElement", - "report_info", - "report_warning", "SphinxCoverityConnector", ] from .__coverity_version__ import __version__ -from .coverity_logging import report_info, report_warning from .coverity import SphinxCoverityConnector from .coverity_services import CoverityDefectService from .coverity_item_element import ItemElement diff --git a/mlx/coverity/coverity.py b/mlx/coverity/coverity.py index 1796c0e..13fd112 100644 --- a/mlx/coverity/coverity.py +++ b/mlx/coverity/coverity.py @@ -9,17 +9,29 @@ from getpass import getpass from urllib.error import URLError, HTTPError +from sphinx.util.logging import getLogger, VERBOSITY_MAP from docutils import nodes from .__coverity_version__ import __version__ -from .coverity_logging import report_info, report_warning from .coverity_services import CoverityDefectService from .coverity_directives.coverity_defect_list import ( CoverityDefect, CoverityDefectListDirective, ) +LOGGER = getLogger("mlx.coverity") + + +def validate_coverity_credentials(config): + """Validate the configuration of coverity_credentials. + + Args: + config (dict): The configuration `coverity_credentials`. + """ + if missing := {"hostname", "username", "password", "stream"}.difference(config): + LOGGER.error(f"Missing mandatory keys from configuration variable 'coverity_credentials' in conf.py: {missing}") + class SphinxCoverityConnector: """ @@ -45,42 +57,43 @@ def initialize_environment(self, app): \\let\@noitemerr\\relax \\makeatother""" + validate_coverity_credentials(app.config.coverity_credentials) self.stream = app.config.coverity_credentials["stream"] self.snapshot = app.config.coverity_credentials.get("snapshot", "") # Login to Coverity and obtain stream information try: self.input_credentials(app.config.coverity_credentials) - report_info("Initialize a session on Coverity server... ", True) + LOGGER.info("Initialize a session on Coverity server... ") self.coverity_service = CoverityDefectService( app.config.coverity_credentials["hostname"], ) self.coverity_service.login( app.config.coverity_credentials["username"], app.config.coverity_credentials["password"] ) - report_info("done") - report_info("Verify the given stream name... ") + LOGGER.info("done") + LOGGER.info("Verify the given stream name... ") self.coverity_service.validate_stream(self.stream) - report_info("done") + LOGGER.info("done") if self.snapshot: - report_info("Verify the given snapshot ID and obtain all enabled checkers... ") + LOGGER.info("Verify the given snapshot ID and obtain all enabled checkers... ") self.snapshot = self.coverity_service.validate_snapshot(self.snapshot) - report_info("done") + LOGGER.info("done") else: self.snapshot = "last()" # Get all column keys - report_info("obtaining all column keys... ") + LOGGER.info("obtaining all column keys... ") self.coverity_service.retrieve_column_keys() - report_info("done") + LOGGER.info("done") # Get all checkers - report_info("obtaining all checkers... ") + LOGGER.info("obtaining all checkers... ") self.coverity_service.retrieve_checkers() - report_info("done") + LOGGER.info("done") except (URLError, HTTPError, Exception, ValueError) as error_info: # pylint: disable=broad-except if isinstance(error_info, EOFError): self.coverity_login_error_msg = "Coverity credentials are not configured." else: self.coverity_login_error_msg = str(error_info) - report_info("failed with: %s" % error_info) + LOGGER.info(f"failed with: {error_info}") self.coverity_login_error = True # ----------------------------------------------------------------------------- @@ -96,7 +109,7 @@ def process_coverity_nodes(self, app, doctree, fromdocname): for node in doctree.traverse(CoverityDefect): top_node = node.create_top_node("Failed to connect to Coverity Server") node.replace_self(top_node) - report_warning("Connection failed: %s" % self.coverity_login_error_msg, fromdocname) + LOGGER.warning(f"Connection failed: {self.coverity_login_error_msg}", location=fromdocname) return # Item matrix: @@ -106,20 +119,21 @@ def process_coverity_nodes(self, app, doctree, fromdocname): # Get items from server try: defects = self.get_filtered_defects(node) - if defects["totalRows"] == -1: - error_message = "There are no defects with the specified filters" - report_warning(error_message, fromdocname, lineno=node["line"]) - else: - report_info("building defects table and/or chart... ", True) - node.perform_replacement(defects, self, app, fromdocname) - report_info("done") - except (URLError, AttributeError, Exception) as err: # pylint: disable=broad-except + except URLError as err: error_message = f"failed to process coverity-list with {err!r}" - report_warning(error_message, fromdocname, lineno=node["line"]) + LOGGER.warning(error_message, location=(fromdocname, node["line"])) top_node = node.create_top_node(node["title"]) top_node += nodes.paragraph(text=error_message) node.replace_self(top_node) continue + else: + if defects["totalRows"] == -1: + error_message = "There are no defects with the specified filters" + LOGGER.warning(error_message, location=(fromdocname, node["line"])) + else: + LOGGER.info("building defects table and/or chart... ") + node.perform_replacement(defects, self, app, fromdocname) + LOGGER.info("done") # ----------------------------------------------------------------------------- # Helper functions of event handlers @@ -150,28 +164,29 @@ def get_filtered_defects(self, node): "rows": [list of dictionaries {"key": , "value": }] } """ - report_info("obtaining defects... ") + LOGGER.info("obtaining defects... ") column_names = set(node["col"]) if "chart_attribute" in node and node["chart_attribute"].upper() in node.column_map: column_names.add(node["chart_attribute"]) defects = self.coverity_service.get_defects(self.stream, node["filters"], column_names, self.snapshot) - report_info("%d received" % (defects["totalRows"])) + LOGGER.info("%d received" % (defects["totalRows"])) return defects # Extension setup def setup(app): """Extension setup""" + + # Set logging level with --verbose (-v) option of Sphinx, + # This option can be given up to three times to get more debug logging output. + LOGGER.setLevel(VERBOSITY_MAP[app.verbosity]) + # Create default configuration. Can be customized in conf.py app.add_config_value( "coverity_credentials", - { - "hostname": "scan.coverity.com", - "username": "reporter", - "password": "coverity", - "stream": "some_stream", - }, + {}, "env", + dict, ) app.add_config_value("TRACEABILITY_ITEM_ID_REGEX", r"([A-Z_]+-[A-Z0-9_]+)", "env") diff --git a/mlx/coverity/coverity_directives/coverity_defect_list.py b/mlx/coverity/coverity_directives/coverity_defect_list.py index 74f01b7..5ee6a8a 100644 --- a/mlx/coverity/coverity_directives/coverity_defect_list.py +++ b/mlx/coverity/coverity_directives/coverity_defect_list.py @@ -3,6 +3,7 @@ from hashlib import sha256 from os import environ, path from pathlib import Path +from sphinx.util.logging import getLogger from docutils import nodes from docutils.parsers.rst import Directive, directives @@ -12,9 +13,10 @@ mpl.use("Agg") import matplotlib.pyplot as plt -from ..coverity_logging import report_info, report_warning from ..coverity_item_element import ItemElement +LOGGER = getLogger("mlx.coverity") + def pct_wrapper(sizes): """Helper function for matplotlib which returns the percentage and the absolute size of the slice. @@ -88,7 +90,7 @@ def perform_replacement(self, defects, connector, app, fromdocname): try: self.fill_table_and_count_attributes(defects["rows"], self.coverity_service.columns, app, fromdocname) except AttributeError as err: - report_info("No issues matching your query or empty stream. %s" % err) + LOGGER.info(f"No issues matching your query or empty stream. {err}") top_node += nodes.paragraph(text="No issues matching your query or empty stream") # don't generate empty pie chart image self.replace_self(top_node) @@ -150,9 +152,9 @@ def initialize_labels(self, labels, docname): attr_values = label.split("+") for attr_val in attr_values: if attr_val in self.chart_labels: - report_warning( - "Attribute value '%s' should be unique in chart option." % attr_val, - docname, + LOGGER.warning( + f"Attribute value {attr_val!r} should be unique in chart option.", + location=docname, ) self.chart_labels[attr_val] = 0 if len(attr_values) > 1: diff --git a/mlx/coverity/coverity_item_element.py b/mlx/coverity/coverity_item_element.py index 314c8a1..5cc7361 100644 --- a/mlx/coverity/coverity_item_element.py +++ b/mlx/coverity/coverity_item_element.py @@ -5,8 +5,10 @@ from docutils import nodes from sphinx.errors import NoUri from urlextract import URLExtract +from sphinx.util.logging import getLogger -from .coverity_logging import report_warning + +LOGGER = getLogger("mlx.coverity") class ItemElement(nodes.General, nodes.Element): @@ -197,7 +199,10 @@ def make_internal_item_ref(app, fromdocname, item_id, cid): return None item_info = env.traceability_collection.get_item(item_id) if not item_info: - report_warning("CID %s: Could not find item ID '%s' in traceability collection." % (cid, item_id), fromdocname) + LOGGER.warning( + f"CID {cid}: Could not find item ID {item_id!r} in traceability collection.", + location=fromdocname + ) return None ref_node = nodes.reference("", "") ref_node["refdocname"] = item_info.docname diff --git a/mlx/coverity/coverity_logging.py b/mlx/coverity/coverity_logging.py deleted file mode 100644 index 31e67e9..0000000 --- a/mlx/coverity/coverity_logging.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Module to provide functions that accommodate logging.""" - -from sphinx.util.logging import getLogger -from logging import WARNING - -LOGGER = getLogger(__name__) -LOGGER.setLevel(WARNING) - - -def report_warning(msg, docname, lineno=None): - """Convenience function for logging a warning - - Args: - msg (str): Message of the warning - docname (str): Name of the document in which the error occurred - lineno (str): Line number in the document on which the error occurred - """ - if lineno is not None: - LOGGER.warning(msg, location=(docname, lineno)) - else: - LOGGER.warning(msg, location=docname) - - -def report_info(msg, nonl=False): - """Convenience function for information printing - - Args: - msg (str): Message of the warning - nonl (bool): True when no new line at end - """ - LOGGER.info(msg, nonl=nonl) diff --git a/mlx/coverity/coverity_services.py b/mlx/coverity/coverity_services.py index f526aac..d3d3941 100644 --- a/mlx/coverity/coverity_services.py +++ b/mlx/coverity/coverity_services.py @@ -9,7 +9,8 @@ import requests from sphinx.util.logging import getLogger -from mlx.coverity import report_info, report_warning + +LOGGER = getLogger("mlx.coverity") # Coverity built in Impact statuses IMPACT_LIST = ["High", "Medium", "Low"] @@ -52,7 +53,6 @@ def __init__(self, hostname): self._api_endpoint = f"https://{hostname}/api/{self.version}" self._checkers = [] self._columns = {} - self.logger = getLogger("mlx.coverity_logging") @property def base_url(self): @@ -102,7 +102,7 @@ def column_keys(self, column_names): elif column_name_lower in self.columns: column_keys.add(self.columns[column_name_lower]) else: - self.logger.warning(f"Invalid column name {column_name!r}") + LOGGER.warning(f"Invalid column name {column_name!r}") return column_keys def login(self, username, password): @@ -136,10 +136,10 @@ def validate_snapshot(self, snapshot): url = f"{self.api_endpoint}/snapshots/{snapshot}" response = self.session.get(url) if response.ok: - report_info(f"Snapshot ID {snapshot} is valid") + LOGGER.info(f"Snapshot ID {snapshot} is valid") valid_snapshot = snapshot else: - report_warning(f"No snapshot found for ID {snapshot}; Continue with using the latest snapshot.", "") + LOGGER.warning(f"No snapshot found for ID {snapshot}, using the latest snapshot instead") valid_snapshot = "last()" return valid_snapshot @@ -219,7 +219,7 @@ def _request(self, url, data=None): err_msg = response.json()["message"] except (requests.exceptions.JSONDecodeError, KeyError): err_msg = response.content.decode() - self.logger.error(err_msg) + LOGGER.error(err_msg) return response.raise_for_status() def assemble_query_filter(self, column_name, filter_values, matcher_type): @@ -247,7 +247,7 @@ def assemble_query_filter(self, column_name, filter_values, matcher_type): matchers.append(matcher) if column_name not in self.columns: - self.logger.warning(f"Invalid column name {column_name!r}; Retrieve column keys first.") + LOGGER.warning(f"Invalid column name {column_name!r}; Retrieve column keys first.") return { "columnKey": self.columns[column_name], @@ -278,7 +278,7 @@ def get_defects(self, stream, filters, column_names, snapshot): "rows": list of [list of dictionaries {"key": , "value": }] } """ - report_info(f"Querying Coverity for defects in stream [{stream}] ...") + LOGGER.info(f"Querying Coverity for defects in stream [{stream}] ...") query_filters = [ { "columnKey": "streams", @@ -325,7 +325,7 @@ def get_defects(self, stream, filters, column_names, snapshot): } defects_data = self.retrieve_issues(data) - report_info("done") + LOGGER.info("done") return defects_data @@ -342,11 +342,11 @@ def handle_attribute_filter(self, attribute_values, name, valid_attributes, allo Returns: set[str]: The attributes values to query with """ - report_info(f"Using {name!r} filter [{attribute_values}]") + LOGGER.info(f"Using {name!r} filter [{attribute_values}]") filter_values = set() for field in attribute_values.split(","): if not valid_attributes or field in valid_attributes: - report_info(f"Classification [{field}] is valid") + LOGGER.info(f"Classification [{field}] is valid") filter_values.add(field) elif allow_regex: pattern = re.compile(field) @@ -354,7 +354,7 @@ def handle_attribute_filter(self, attribute_values, name, valid_attributes, allo if pattern.search(element): filter_values.add(element) else: - self.logger.error(f"Invalid {name} filter: {field}") + LOGGER.error(f"Invalid {name} filter: {field}") return filter_values def handle_component_filter(self, attribute_values): @@ -366,7 +366,7 @@ def handle_component_filter(self, attribute_values): Returns: list[str]: The list of attributes """ - report_info(f"Using 'Component' filter [{attribute_values}]") + LOGGER.info(f"Using 'Component' filter [{attribute_values}]") parser = csv.reader([attribute_values]) filter_values = [] for fields in parser: