Skip to content

Commit

Permalink
fix: minor sles provider fixes (#51)
Browse files Browse the repository at this point in the history
* fix: use the compressed xml files for the sles provider
* chore: adjust type annotations
* use iterparse from defusedxml
* fix: actually skip fixed-in when name or version not known
* chore: remove dead code
---------

Signed-off-by: Weston Steimel <weston.steimel@anchore.com>
  • Loading branch information
westonsteimel authored Jan 30, 2023
1 parent 1af3e39 commit ac953b1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 61 deletions.
19 changes: 12 additions & 7 deletions src/vunnel/providers/sles/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
iter_parse_vulnerability_file,
)
from vunnel.utils.vulnerability import CVSS, CVSSBaseMetrics, FixedIn, Vulnerability
from vunnel.workspace import Workspace

namespace = "sles"

Expand All @@ -44,15 +45,17 @@


class Parser:
__oval_url__ = "https://ftp.suse.com/pub/projects/security/oval/suse.linux.enterprise.server.{}.xml"
__oval_file_name__ = "suse-linux-enterprise-server-{}.xml"
__oval_url__ = "https://ftp.suse.com/pub/projects/security/oval/suse.linux.enterprise.server.{}.xml.gz"
__oval_file_name__ = "suse-linux-enterprise-server-{}.xml.gz"
__oval_dir_path__ = "oval"
__source_dir_path__ = "source"

# this is pretty odd, but there are classmethods that need logging
logger = logging.getLogger("sles-parser")

def __init__(self, workspace, allow_versions, download_timeout=125, logger=None):
def __init__(
self, workspace: Workspace, allow_versions: list[str], download_timeout: int = 125, logger: logging.Logger | None = None
):
self.oval_dir_path = os.path.join(workspace.input_path, self.__source_dir_path__, self.__oval_dir_path__)
self.allow_versions = allow_versions
self.download_timeout = download_timeout
Expand All @@ -65,8 +68,7 @@ def __init__(self, workspace, allow_versions, download_timeout=125, logger=None)
Parser.logger = logger

@utils.retry_with_backoff()
def _download(self, major_version: str, skip_if_exists: bool = False):

def _download(self, major_version: str, skip_if_exists: bool = False) -> str:
if not os.path.exists(self.oval_dir_path):
self.logger.debug(f"creating workspace for OVAL source data at {self.oval_dir_path}")
os.makedirs(self.oval_dir_path)
Expand Down Expand Up @@ -100,7 +102,9 @@ def _download(self, major_version: str, skip_if_exists: bool = False):

with open(oval_file_path, "wb") as fp:
for chunk in r.iter_content(chunk_size=1024):
fp.write(chunk)
if chunk:
fp.write(chunk)
fp.flush()

return oval_file_path

Expand Down Expand Up @@ -289,6 +293,7 @@ def _transform_oval_vulnerabilities(cls, major_version: str, parsed_dict: dict)
"package name and or version invalid, skipping fixed-in for %s",
test_id,
)
continue

fixes.append(
FixedIn(
Expand Down Expand Up @@ -323,7 +328,7 @@ def _transform_oval_vulnerabilities(cls, major_version: str, parsed_dict: dict)

return results

def get(self, skip_if_exists=False):
def get(self, skip_if_exists: bool = False):
parser_factory = OVALParserFactory(
parsers=[
SLESVulnerabilityParser,
Expand Down
47 changes: 27 additions & 20 deletions src/vunnel/utils/oval_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

import copy
import gzip
import logging
import os
import re
Expand Down Expand Up @@ -65,26 +66,32 @@ def parse(dest_file: str, config: Config):

if os.path.exists(dest_file):
processing = False
for event, element in ET.iterparse(dest_file, events=("start", "end")):
# gather definition
if event == "start" and re.search(config.tag_pattern, element.tag).group(1) == "definition":
processing = True
elif event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definition":
try:
_process_definition(element, vuln_dict, config)
except:
logger.exception("Error parsing oval record. Logging error and continuing")
finally:
processing = False

if not processing and event == "end":
# print('Clearing element: {} post event: {}'.format(re.search(tag_pattern, element.tag).group(1), event))
element.clear()

# bail after definitions
if event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definitions":
# print('Stopped parsing')
break
opener = open

if dest_file.endswith(".gz"):
opener = gzip.open

with opener(dest_file, "rb") as f:
for event, element in ET.iterparse(dest_file, events=("start", "end")):
# gather definition
if event == "start" and re.search(config.tag_pattern, element.tag).group(1) == "definition":
processing = True
elif event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definition":
try:
_process_definition(element, vuln_dict, config)
except:
logger.exception("Error parsing oval record. Logging error and continuing")
finally:
processing = False

if not processing and event == "end":
# print('Clearing element: {} post event: {}'.format(re.search(tag_pattern, element.tag).group(1), event))
element.clear()

# bail after definitions
if event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definitions":
# print('Stopped parsing')
break
else:
logger.warning(f"{dest_file} not found, returning empty results")

Expand Down
73 changes: 39 additions & 34 deletions src/vunnel/utils/oval_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import annotations

import enum
import gzip
import logging
import os
import re
Expand All @@ -16,6 +17,8 @@
from collections import defaultdict
from dataclasses import dataclass

from defusedxml.ElementTree import iterparse


class OVALElementEnum(enum.Enum):
"""
Expand Down Expand Up @@ -402,7 +405,7 @@ def iter_parse_vulnerability_file(
oval_file_path: str,
parser_config: OVALParserConfig,
parser_factory: OVALParserFactory,
) -> dict:
) -> defaultdict[enum.Enum, dict[str, Parsed]]:
"""
Starting point for parsing a vulnerability class OVAL file content.
Iteratively parses the file using the parsers supplied by the input factory.
Expand All @@ -412,43 +415,45 @@ def iter_parse_vulnerability_file(
logger = logging.getLogger("oval-v2-parser")

logger.info(f"parsing {oval_file_path}")
parsed_dict = defaultdict(dict)
parsed_dict: defaultdict[enum.Enum, dict[str, Parsed]] = defaultdict(dict)

if os.path.exists(oval_file_path):
ingress = False
for event, xml_element in ET.iterparse(oval_file_path, events=("start", "end")):
# gather definition
if event == "start" and parser_factory.get_oval_element(xml_element, parser_config):
ingress = True
elif event == "end":
# is this an interesting oval element?
oval_element = parser_factory.get_oval_element(xml_element, parser_config)

# is the interesting oval element in ingress?
if oval_element and ingress:
# yes and yes, halt ingress and parse the element
ingress = False
parser = parser_factory.get_parser(oval_element)
if parser:
result = parser.parse(xml_element, parser_config)
if result:
parsed_dict[oval_element][result.identity] = result
opener = open

if oval_file_path.endswith(".gz"):
opener = gzip.open

with opener(oval_file_path, "rb") as f:
for event, xml_element in iterparse(f, events=("start", "end")):
# gather definition
if event == "start" and parser_factory.get_oval_element(xml_element, parser_config):
ingress = True
elif event == "end":
# is this an interesting oval element?
oval_element = parser_factory.get_oval_element(xml_element, parser_config)

# is the interesting oval element in ingress?
if oval_element and ingress:
# yes and yes, halt ingress and parse the element
ingress = False
parser = parser_factory.get_parser(oval_element)
if parser:
result = parser.parse(xml_element, parser_config)
if result:
parsed_dict[oval_element][result.identity] = result
else:
logger.warning("unable to parse %s element", repr(oval_element.value))
else:
logger.warn("unable to parse %s element", repr(oval_element.value))
else:
logger.warn(
"no parser found for oval element %s, skipping",
oval_element,
)
# else:
# # this marks the end of an xml element but does it's not an interesting oval element or may be not the whole element yet
# pass

# clear the element if doesn't need to be processed or done processing
if not ingress and event == "end":
xml_element.clear()

logger.warning(
"no parser found for oval element %s, skipping",
oval_element,
)

# clear the element if doesn't need to be processed or done processing
if not ingress and event == "end":
xml_element.clear()
else:
logger.warn("{} not found, returning empty results".format(oval_file_path))
logger.warning(f"{oval_file_path} not found, returning empty results")

return parsed_dict

0 comments on commit ac953b1

Please sign in to comment.