Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor coverity_services #47

Merged
merged 2 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ plugins:
threshold: "C"
sonar-python:
enabled: true
exclude_patterns:
- example/conf.py
config:
tests_patterns:
- tests/**
Expand Down
81 changes: 44 additions & 37 deletions mlx/coverity_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ def login(self, username, password):
security.tokens.append(token)
self.client.set_options(wsse=security)

def validate_presence(self, url, service_name):
'''Initializes the client attribute while validating the presence of the service'''
try:
self.client = Client(url)
logging.info("Validated presence of %s [%s]", service_name, url)
except URLError:
self.client = None
logging.critical("No such %s [%s]", service_name, url)
raise


class CoverityConfigurationService(Service):
'''
Expand All @@ -150,13 +160,7 @@ def __init__(self, transport, hostname, port, ws_version=DEFAULT_WS_VERSION):
self.checkers = None
url = self.get_ws_url('configurationservice')
logging.getLogger('suds.client').setLevel(logging.CRITICAL)
try:
self.client = Client(url)
logging.info("Validated presence of Coverity Configuration Service [%s]", url)
except URLError:
self.client = None
logging.critical("No such Coverity Configuration Service [%s]", url)
raise
self.validate_presence(url, 'Coverity Configuration Service')

def login(self, username, password):
'''Login to Coverity Configuration service using given username and password'''
Expand Down Expand Up @@ -279,13 +283,7 @@ def __init__(self, config_service):
self.filters = ""
# logging.getLogger('suds.client').setLevel(logging.DEBUG)
url = self.get_ws_url('defectservice')
try:
self.client = Client(url)
logging.info("Validated presence of Coverity Defect Service [%s]", url)
except URLError:
self.client = None
logging.critical("No such Coverity Defect Service [%s]", url)
raise
self.validate_presence(url, 'Coverity Defect Service')

def get_defects(self, project, stream, filters, custom=None):
""" Gets a list of defects for given stream, with some query criteria.
Expand Down Expand Up @@ -318,51 +316,42 @@ def get_defects(self, project, stream, filters, custom=None):
# apply any filter on checker names
if filters['checker']:
self.config_service.get_checkers()
self.handle_filter_attribute(filters['checker'],
self.handle_attribute_filter(filters['checker'],
'Checker',
self.config_service.checkers,
filter_spec.checkerList,
allow_regex=True)

# apply any filter on impact status
if filters['impact']:
self.handle_filter_attribute(filters['impact'], 'Impact', IMPACT_LIST, filter_spec.impactNameList)
self.handle_attribute_filter(filters['impact'], 'Impact', IMPACT_LIST, filter_spec.impactNameList)

# apply any filter on issue kind
if filters['kind']:
self.handle_filter_attribute(filters['kind'], 'Kind', KIND_LIST, filter_spec.issueKindList)
self.handle_attribute_filter(filters['kind'], 'Kind', KIND_LIST, filter_spec.issueKindList)

# apply any filter on classification
if filters['classification']:
self.handle_filter_attribute(filters['classification'],
self.handle_attribute_filter(filters['classification'],
'Classification',
CLASSIFICATION_LIST,
filter_spec.classificationNameList)

# apply any filter on action
if filters['action']:
self.handle_filter_attribute(filters['action'], 'Action', ACTION_LIST, filter_spec.actionNameList)
self.handle_attribute_filter(filters['action'], 'Action', ACTION_LIST, filter_spec.actionNameList)

# apply any filter on Components
if filters['component']:
logging.info('Using Component filter [%s]', filters['component'])
parser = csv.reader([filters['component']])

for fields in parser:
for _, field in enumerate(fields):
field = field.strip()
component_id = self.client.factory.create('componentIdDataObj')
component_id.name = field
filter_spec.componentIdList.append(component_id)
self.filters += ("<Components(%s)> " % (filters['component']))
self.handle_component_filter(filters['component'], filter_spec)

# apply any filter on CWE values
if filters['cwe']:
self.handle_filter_attribute(filters['cwe'], 'CWE', None, filter_spec.cweList)
self.handle_attribute_filter(filters['cwe'], 'CWE', None, filter_spec.cweList)

# apply any filter on CID values
if filters['cid']:
self.handle_filter_attribute(filters['cid'], 'CID', None, filter_spec.cidList)
self.handle_attribute_filter(filters['cid'], 'CID', None, filter_spec.cidList)

# if a special custom attribute value requirement
if custom:
Expand All @@ -387,19 +376,37 @@ def get_defects(self, project, stream, filters, custom=None):
return self.client.service.getMergedDefectsForSnapshotScope(project_id, filter_spec,
page_spec, snapshot_scope)

def handle_filter_attribute(self, attribute, name, *args, **kwargs):
def handle_attribute_filter(self, attribute_values, name, *args, **kwargs):
""" Applies any filter on an attribute's values.

Args:
attribute (str): A CSV list of attribute values to query.
attribute_values (str): A CSV list of attribute values to query.
name (str): String representation of the attribute.
"""
logging.info('Using %s filter [%s]', name, attribute)
validated = self.config_service.add_filter_rqt(name, attribute, *args, **kwargs)
logging.info('Using %s filter [%s]', name, attribute_values)
validated = self.config_service.add_filter_rqt(name, attribute_values, *args, **kwargs)
logging.info('Resolves to [%s]', validated)
if validated:
self.filters += ("<%s(%s)> " % (name, validated))

def handle_component_filter(self, attribute_values, filter_spec):
""" Applies any filter on the component attribute's values.

Args:
attribute_values (str): A CSV list of attribute values to query.
filter_spec (sudsobject.Factory): Object to store filter attributes.
"""
logging.info('Using Component filter [%s]', attribute_values)
parser = csv.reader([attribute_values])

for fields in parser:
for _, field in enumerate(fields):
field = field.strip()
component_id = self.client.factory.create('componentIdDataObj')
component_id.name = field
filter_spec.componentIdList.append(component_id)
self.filters += ("<Components(%s)> " % (attribute_values))

def handle_custom_filter_attribute(self, custom, filter_spec):
""" Handles a custom attribute definition, and adds it to the filter spec if it's valid.

Expand Down Expand Up @@ -433,9 +440,9 @@ def handle_custom_filter_attribute(self, custom, filter_spec):
self.filters += ("<Attrs(%s)> " % custom)

def _append_multiple_values(self, values, filter_map):
# if there are multiple values delimited with comma
'''Append multiple values if there are multiple values delimited with comma'''
for value_fields in csv.reader([values], delimiter=','):
for _, value in enumerate(value_fields):
for value in value_fields:
logging.info(" [%s]", value)

attribute_value_id = self.client.factory.create('attributeValueIdDataObj')
Expand Down