diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 72e96fc7..e00424d4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: files: "^(compliance|test)" stages: [commit] - repo: https://gitlab.com/pycqa/flake8 - rev: 3.9.0 + rev: 3.9.1 hooks: - id: flake8 args: [ diff --git a/CHANGES.md b/CHANGES.md index 0c339b29..3ea49291 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +# [1.16.0](https://github.com/ComplianceAsCode/auditree-framework/releases/tag/v1.16.0) + +- [ADDED] Locker get_empty_evidences method added to return all empty evidence paths. +- [ADDED] Evidence base class has override-able is_empty property. + # [1.15.0](https://github.com/ComplianceAsCode/auditree-framework/releases/tag/v1.15.0) - [FIXED] The evidences context manager now raises an exception when no evidence is found. diff --git a/compliance/__init__.py b/compliance/__init__.py index 9625ad6a..58cab640 100644 --- a/compliance/__init__.py +++ b/compliance/__init__.py @@ -14,4 +14,4 @@ # limitations under the License. """Compliance automation package.""" -__version__ = '1.15.0' +__version__ = '1.16.0' diff --git a/compliance/evidence.py b/compliance/evidence.py index f804552b..d57f3a68 100644 --- a/compliance/evidence.py +++ b/compliance/evidence.py @@ -129,6 +129,13 @@ def content_as_json(self): self._content_as_json = json.loads(self.content) return self._content_as_json + @property + def is_empty(self): + return not self.content or not self.content.strip() or ( + self.extension == 'json' and self.content_as_json != 0 + and not self.content_as_json + ) + def set_content(self, str_content): self._content = str_content if self.extension == 'json': diff --git a/compliance/locker.py b/compliance/locker.py index 67528ab8..77fdea4a 100644 --- a/compliance/locker.py +++ b/compliance/locker.py @@ -22,6 +22,7 @@ import tempfile import time from datetime import datetime as dt, timedelta +from pathlib import PurePath from threading import Lock from urllib.parse import urlparse @@ -254,6 +255,8 @@ def index(self, evidence, checks=None, evidence_used=None): 'ttl': evidence.ttl, 'description': evidence.description } + if evidence.is_empty: + metadata[evidence.name]['empty'] = True tombstones = None if getattr(evidence, 'is_partitioned', False): unpartitioned = self.get_file(evidence.path) @@ -692,14 +695,14 @@ def get_reports_metadata(self): def get_abandoned_evidences(self, threshold=None): """ - Provide a list of evidence where the update ``threshold`` has passed. + Provide a set of evidence where the update ``threshold`` has passed. :param int threshold: the time in seconds after TTL expires that evidence can remain un-updated before it is considered abandoned. The abandoned evidence threshold defaults to 30 days if none is provided. - :returns: a list of abandoned evidence files. + :returns: a set of abandoned evidence file relative paths. """ abandoned_evidence = [] tree = self.repo.head.commit.tree @@ -713,6 +716,26 @@ def get_abandoned_evidences(self, threshold=None): abandoned_evidence.append(f.path) return set(abandoned_evidence) + def get_empty_evidences(self): + """ + Provide a list of evidence paths to empty evidence files. + + Evidence content is deemed empty based on an evidence object's + is_empty property. This information is stored in evidence metadata. + + :returns: a list of empty evidence file relative paths. + """ + empty_evidence = [] + tree = self.repo.head.commit.tree + for idx_file in [f for f in tree.traverse() if is_index_file(f.path)]: + metadata = json.loads(idx_file.data_stream.read()) + for ev_name, ev_meta in metadata.items(): + if ev_meta.get('empty', False): + empty_evidence.append( + str(PurePath(PurePath(idx_file.path).parent, ev_name)) + ) + return empty_evidence + def delete_repo_locally(self): """Remove the local git repository.""" try: diff --git a/test/t_compliance/t_locker/test_locker.py b/test/t_compliance/t_locker/test_locker.py index 06c48b2d..11c2f4e6 100644 --- a/test/t_compliance/t_locker/test_locker.py +++ b/test/t_compliance/t_locker/test_locker.py @@ -243,6 +243,44 @@ def test_locker_as_expected(self): locker.get_abandoned_evidences() ) + def test_empty_evidence(self): + """Test that all empty evidence is identified.""" + with Locker(name=REPO_DIR) as locker: + populated = RawEvidence( + 'populated.json', 'test_category', DAY, 'Populated evidence' + ) + populated.set_content('{"key": "value1"}') + locker.add_evidence(populated) + populated0 = RawEvidence( + 'populated0.json', 'test_category', DAY, 'Populated with zero' + ) + populated0.set_content('0') + locker.add_evidence(populated0) + white_space = RawEvidence( + 'white_space.txt', 'test_category', DAY, 'Whitespace only' + ) + white_space.set_content(' ') + locker.add_evidence(white_space) + empty_dict = RawEvidence( + 'empty_dict.json', 'test_category', DAY, 'Empty dictionary' + ) + empty_dict.set_content('{}') + locker.add_evidence(empty_dict) + empty_list = RawEvidence( + 'empty_list.json', 'test_category', DAY, 'Empty list' + ) + empty_list.set_content('[]') + locker.add_evidence(empty_list) + locker.checkin() + self.assertCountEqual( + locker.get_empty_evidences(), + [ + 'raw/test_category/white_space.txt', + 'raw/test_category/empty_dict.json', + 'raw/test_category/empty_list.json' + ] + ) + def test_add_partitioned_evidence(self): """Test that partitioned evidence is added to locker as expected.""" with Locker(name=REPO_DIR) as locker: